You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/09/25 21:46:09 UTC

svn commit: r1390102 - in /giraph/trunk: CHANGELOG src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java src/main/java/org/apache/giraph/graph/BspServiceWorker.java src/main/java/org/apache/giraph/graph/partition/Partition.java

Author: aching
Date: Tue Sep 25 19:46:09 2012
New Revision: 1390102

URL: http://svn.apache.org/viewvc?rev=1390102&view=rev
Log:
GIRAPH-274: Jobs still failing due to tasks timeout during
INPUT_SUPERSTEP. (nitayj via aching)

Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1390102&r1=1390101&r2=1390102&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Sep 25 19:46:09 2012
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-274: Jobs still failing due to tasks timeout during
+  INPUT_SUPERSTEP. (nitayj via aching)
+
   GIRAPH-337: Make a specific Giraph configuration for Class caching
   and specific Giraph configuration. (aching)
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java?rev=1390102&r1=1390101&r2=1390102&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java Tue Sep 25 19:46:09 2012
@@ -161,6 +161,8 @@ public class NettyWorkerServer<I extends
           vertexIndex, originalVertex, mutations,
           serverData.getCurrentMessageStore().
               hasMessagesForVertex(vertexIndex));
+      service.getGraphMapper().getGraphState().getContext().progress();
+
       if (LOG.isDebugEnabled()) {
         LOG.debug("prepareSuperstep: Resolved vertex index " +
             vertexIndex + " with original vertex " +

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1390102&r1=1390101&r2=1390102&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Tue Sep 25 19:46:09 2012
@@ -286,6 +286,7 @@ public class BspServiceWorker<I extends 
         transferRegulator = null; // don't need this anymore
         return null;
       }
+      getContext().progress();
       // Wait for either a reservation to go away or a notification that
       // an InputSplit has finished.
       context.progress();
@@ -463,6 +464,7 @@ public class BspServiceWorker<I extends 
       if (readerVertex.getValue() == null) {
         readerVertex.setValue(getConfiguration().createVertexValue());
       }
+      readerVertex.setGraphState(getGraphMapper().getGraphState());
       PartitionOwner partitionOwner =
           workerGraphPartitioner.getPartitionOwner(
               readerVertex.getId());
@@ -1111,8 +1113,10 @@ public class BspServiceWorker<I extends 
     for (Partition<I, V, E, M> partition :
         getPartitionStore().getPartitions()) {
       for (Vertex<I, V, E, M> vertex : partition.getVertices()) {
+        getContext().progress();
         vertexWriter.writeVertex(vertex);
       }
+      getContext().progress();
     }
     vertexWriter.close(getContext());
   }
@@ -1230,6 +1234,7 @@ public class BspServiceWorker<I extends 
             (verticesOutputStream.getPos() - startPos) +
             ", partition = " + partition.toString());
       }
+      getContext().progress();
     }
     // Metadata is buffered and written at the end since it's small and
     // needs to know how many partitions this worker owns
@@ -1342,6 +1347,7 @@ public class BspServiceWorker<I extends 
                     partitionOwner);
           }
           getPartitionStore().addPartition(partition);
+          getContext().progress();
           ++loadedPartitions;
         } catch (IOException e) {
           throw new RuntimeException(

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java?rev=1390102&r1=1390101&r2=1390102&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java Tue Sep 25 19:46:09 2012
@@ -154,9 +154,9 @@ public class Partition<I extends Writabl
     int vertices = input.readInt();
     for (int i = 0; i < vertices; ++i) {
       Vertex<I, V, E, M> vertex = conf.createVertex();
+      vertex.getContext().progress();
       vertex.readFields(input);
-      if (vertexMap.put(vertex.getId(),
-          (Vertex<I, V, E, M>) vertex) != null) {
+      if (vertexMap.put(vertex.getId(), vertex) != null) {
         throw new IllegalStateException(
             "readFields: " + this +
             " already has same id " + vertex);
@@ -168,6 +168,7 @@ public class Partition<I extends Writabl
   public void write(DataOutput output) throws IOException {
     output.writeInt(vertexMap.size());
     for (Vertex vertex : vertexMap.values()) {
+      vertex.getContext().progress();
       vertex.write(output);
     }
   }