You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/09/25 21:46:09 UTC
svn commit: r1390102 - in /giraph/trunk: CHANGELOG
src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
src/main/java/org/apache/giraph/graph/BspServiceWorker.java
src/main/java/org/apache/giraph/graph/partition/Partition.java
Author: aching
Date: Tue Sep 25 19:46:09 2012
New Revision: 1390102
URL: http://svn.apache.org/viewvc?rev=1390102&view=rev
Log:
GIRAPH-274: Jobs still failing due to tasks timeout during
INPUT_SUPERSTEP. (nitayj via aching)
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1390102&r1=1390101&r2=1390102&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Sep 25 19:46:09 2012
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-274: Jobs still failing due to tasks timeout during
+ INPUT_SUPERSTEP. (nitayj via aching)
+
GIRAPH-337: Make a specific Giraph configuration for Class caching
and specific Giraph configuration. (aching)
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java?rev=1390102&r1=1390101&r2=1390102&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java Tue Sep 25 19:46:09 2012
@@ -161,6 +161,8 @@ public class NettyWorkerServer<I extends
vertexIndex, originalVertex, mutations,
serverData.getCurrentMessageStore().
hasMessagesForVertex(vertexIndex));
+ service.getGraphMapper().getGraphState().getContext().progress();
+
if (LOG.isDebugEnabled()) {
LOG.debug("prepareSuperstep: Resolved vertex index " +
vertexIndex + " with original vertex " +
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1390102&r1=1390101&r2=1390102&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Tue Sep 25 19:46:09 2012
@@ -286,6 +286,7 @@ public class BspServiceWorker<I extends
transferRegulator = null; // don't need this anymore
return null;
}
+ getContext().progress();
// Wait for either a reservation to go away or a notification that
// an InputSplit has finished.
context.progress();
@@ -463,6 +464,7 @@ public class BspServiceWorker<I extends
if (readerVertex.getValue() == null) {
readerVertex.setValue(getConfiguration().createVertexValue());
}
+ readerVertex.setGraphState(getGraphMapper().getGraphState());
PartitionOwner partitionOwner =
workerGraphPartitioner.getPartitionOwner(
readerVertex.getId());
@@ -1111,8 +1113,10 @@ public class BspServiceWorker<I extends
for (Partition<I, V, E, M> partition :
getPartitionStore().getPartitions()) {
for (Vertex<I, V, E, M> vertex : partition.getVertices()) {
+ getContext().progress();
vertexWriter.writeVertex(vertex);
}
+ getContext().progress();
}
vertexWriter.close(getContext());
}
@@ -1230,6 +1234,7 @@ public class BspServiceWorker<I extends
(verticesOutputStream.getPos() - startPos) +
", partition = " + partition.toString());
}
+ getContext().progress();
}
// Metadata is buffered and written at the end since it's small and
// needs to know how many partitions this worker owns
@@ -1342,6 +1347,7 @@ public class BspServiceWorker<I extends
partitionOwner);
}
getPartitionStore().addPartition(partition);
+ getContext().progress();
++loadedPartitions;
} catch (IOException e) {
throw new RuntimeException(
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java?rev=1390102&r1=1390101&r2=1390102&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java Tue Sep 25 19:46:09 2012
@@ -154,9 +154,9 @@ public class Partition<I extends Writabl
int vertices = input.readInt();
for (int i = 0; i < vertices; ++i) {
Vertex<I, V, E, M> vertex = conf.createVertex();
+ vertex.getContext().progress();
vertex.readFields(input);
- if (vertexMap.put(vertex.getId(),
- (Vertex<I, V, E, M>) vertex) != null) {
+ if (vertexMap.put(vertex.getId(), vertex) != null) {
throw new IllegalStateException(
"readFields: " + this +
" already has same id " + vertex);
@@ -168,6 +168,7 @@ public class Partition<I extends Writabl
public void write(DataOutput output) throws IOException {
output.writeInt(vertexMap.size());
for (Vertex vertex : vertexMap.values()) {
+ vertex.getContext().progress();
vertex.write(output);
}
}