You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/03/20 19:39:17 UTC

git commit: GIRAPH-537: Fix log messages produced by aggregators (majakabiljo)

Updated Branches:
  refs/heads/trunk f31c62575 -> 40bc599b8


GIRAPH-537: Fix log messages produced by aggregators (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/40bc599b
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/40bc599b
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/40bc599b

Branch: refs/heads/trunk
Commit: 40bc599b8a7e7caaeb0a284fd2d2103926b79288
Parents: f31c625
Author: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Authored: Wed Mar 20 11:38:18 2013 -0700
Committer: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Committed: Wed Mar 20 11:38:18 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../comm/aggregators/AllAggregatorServerData.java  |   34 ++--
 .../aggregators/OwnerAggregatorServerData.java     |   18 +-
 .../giraph/comm/netty/NettyMasterClient.java       |    3 +-
 .../NettyWorkerAggregatorRequestProcessor.java     |    6 +-
 .../requests/ByteArrayWithSenderTaskIdRequest.java |   71 +++++++
 .../requests/SendAggregatorsToOwnerRequest.java    |   12 +-
 .../requests/SendAggregatorsToWorkerRequest.java   |   10 +-
 .../requests/SendWorkerAggregatorsRequest.java     |   10 +-
 .../org/apache/giraph/utils/ExpectedBarrier.java   |  125 ------------
 .../apache/giraph/utils/TaskIdsPermitsBarrier.java |  155 +++++++++++++++
 .../giraph/worker/WorkerAggregatorHandler.java     |   31 +++-
 12 files changed, 309 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index e4430f2..462f104 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-537: Fix log messages produced by aggregators (majakabiljo)
+
   GIRAPH-480: Add convergence detection to org.apache.giraph.examples.RandomWalkVertex (ssc)
 
   GIRAPH-565: Make an easy way to gather some logs from workers on master (majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
index dddd1cb..f38c6cd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
@@ -19,7 +19,8 @@
 package org.apache.giraph.comm.aggregators;
 
 import org.apache.giraph.aggregators.Aggregator;
-import org.apache.giraph.utils.ExpectedBarrier;
+import org.apache.giraph.master.MasterInfo;
+import org.apache.giraph.utils.TaskIdsPermitsBarrier;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
@@ -30,6 +31,7 @@ import com.google.common.collect.Maps;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
 /**
@@ -65,7 +67,7 @@ public class AllAggregatorServerData {
    * (named AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)
    * to know how many requests it has to receive.
    */
-  private final ExpectedBarrier masterBarrier;
+  private final TaskIdsPermitsBarrier masterBarrier;
   /**
    * Aggregator data which this worker received from master and which it is
    * going to distribute before starting next superstep. Thread-safe.
@@ -78,7 +80,7 @@ public class AllAggregatorServerData {
    * (named AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)
    * to know how many requests it has to receive.
    */
-  private final ExpectedBarrier workersBarrier;
+  private final TaskIdsPermitsBarrier workersBarrier;
   /** Progressable used to report progress */
   private final Progressable progressable;
 
@@ -89,8 +91,8 @@ public class AllAggregatorServerData {
    */
   public AllAggregatorServerData(Progressable progressable) {
     this.progressable = progressable;
-    workersBarrier = new ExpectedBarrier(progressable);
-    masterBarrier = new ExpectedBarrier(progressable);
+    workersBarrier = new TaskIdsPermitsBarrier(progressable);
+    masterBarrier = new TaskIdsPermitsBarrier(progressable);
   }
 
   /**
@@ -154,9 +156,10 @@ public class AllAggregatorServerData {
    * arrive from master.
    *
    * @param requestCount Number of requests which should arrive
+   * @param taskId Task id of master
    */
-  public void receivedRequestCountFromMaster(long requestCount) {
-    masterBarrier.requirePermits(requestCount);
+  public void receivedRequestCountFromMaster(long requestCount, int taskId) {
+    masterBarrier.requirePermits(requestCount, taskId);
   }
 
   /**
@@ -172,19 +175,22 @@ public class AllAggregatorServerData {
    * arrive from one of the workers.
    *
    * @param requestCount Number of requests which should arrive
+   * @param taskId Task id of that worker
    */
-  public void receivedRequestCountFromWorker(long requestCount) {
-    workersBarrier.requirePermits(requestCount);
+  public void receivedRequestCountFromWorker(long requestCount, int taskId) {
+    workersBarrier.requirePermits(requestCount, taskId);
   }
 
   /**
    * This function will wait until all aggregator requests from master have
    * arrived, and return that data afterwards.
    *
+   * @param masterInfo Master info
    * @return Iterable through data received from master
    */
-  public Iterable<byte[]> getDataFromMasterWhenReady() {
-    masterBarrier.waitForRequiredPermits(1);
+  public Iterable<byte[]> getDataFromMasterWhenReady(MasterInfo masterInfo) {
+    masterBarrier.waitForRequiredPermits(
+        Collections.singleton(masterInfo.getTaskId()));
     if (LOG.isDebugEnabled()) {
       LOG.debug("getDataFromMasterWhenReady: " +
           "Aggregator data for distribution ready");
@@ -196,7 +202,7 @@ public class AllAggregatorServerData {
    * This function will wait until all aggregator requests from workers have
    * arrived, and fill the maps for next superstep when ready.
    *
-   * @param numberOfWorkers Total number of workers in the job
+   * @param workerIds All workers in the job apart from the current one
    * @param previousAggregatedValuesMap Map of values from previous
    *                                    superstep to fill out
    * @param currentAggregatorMap Map of aggregators for current superstep to
@@ -204,10 +210,10 @@ public class AllAggregatorServerData {
    *                             be set to initial value.
    */
   public void fillNextSuperstepMapsWhenReady(
-      int numberOfWorkers,
+      Set<Integer> workerIds,
       Map<String, Writable> previousAggregatedValuesMap,
       Map<String, Aggregator<Writable>> currentAggregatorMap) {
-    workersBarrier.waitForRequiredPermits(numberOfWorkers - 1);
+    workersBarrier.waitForRequiredPermits(workerIds);
     if (LOG.isDebugEnabled()) {
       LOG.debug("fillNextSuperstepMapsWhenReady: Aggregators ready");
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
index 70ff7fe..bd6068a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.comm.aggregators;
 
 import org.apache.giraph.aggregators.Aggregator;
-import org.apache.giraph.utils.ExpectedBarrier;
+import org.apache.giraph.utils.TaskIdsPermitsBarrier;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
@@ -30,6 +30,7 @@ import com.google.common.collect.Maps;
 
 import java.util.AbstractMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
 /**
@@ -68,7 +69,7 @@ public class OwnerAggregatorServerData {
    * (named AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)
    * to know how many requests it has to receive.
    */
-  private final ExpectedBarrier workersBarrier;
+  private final TaskIdsPermitsBarrier workersBarrier;
   /** Progressable used to report progress */
   private final Progressable progressable;
 
@@ -79,7 +80,7 @@ public class OwnerAggregatorServerData {
    */
   public OwnerAggregatorServerData(Progressable progressable) {
     this.progressable = progressable;
-    workersBarrier = new ExpectedBarrier(progressable);
+    workersBarrier = new TaskIdsPermitsBarrier(progressable);
   }
 
   /**
@@ -143,9 +144,10 @@ public class OwnerAggregatorServerData {
    * arrive from one of the workers. Thread-safe.
    *
    * @param requestCount Number of requests which should arrive
+   * @param taskId Task id of that worker
    */
-  public void receivedRequestCountFromWorker(long requestCount) {
-    workersBarrier.requirePermits(requestCount);
+  public void receivedRequestCountFromWorker(long requestCount, int taskId) {
+    workersBarrier.requirePermits(requestCount, taskId);
   }
 
   /**
@@ -153,12 +155,12 @@ public class OwnerAggregatorServerData {
    * workers are ready and aggregated, and return final aggregated values
    * afterwards.
    *
-   * @param numberOfWorkers Total number of workers in the job
+   * @param workerIds All workers in the job apart from the current one
    * @return Iterable through final aggregated values which this worker owns
    */
   public Iterable<Map.Entry<String, Writable>>
-  getMyAggregatorValuesWhenReady(int numberOfWorkers) {
-    workersBarrier.waitForRequiredPermits(numberOfWorkers - 1);
+  getMyAggregatorValuesWhenReady(Set<Integer> workerIds) {
+    workersBarrier.waitForRequiredPermits(workerIds);
     if (LOG.isDebugEnabled()) {
       LOG.debug("getMyAggregatorValuesWhenReady: Values ready");
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
index 86ea8dc..319f41a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
@@ -104,7 +104,8 @@ public class NettyMasterClient implements MasterClient {
     byte[] aggregatorData =
         sendAggregatorCache.removeAggregators(worker.getTaskId());
     nettyClient.sendWritableRequest(
-        worker.getTaskId(), new SendAggregatorsToOwnerRequest(aggregatorData));
+        worker.getTaskId(), new SendAggregatorsToOwnerRequest(aggregatorData,
+          service.getMasterInfo().getTaskId()));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
index cd24219..d1cce64 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
@@ -109,7 +109,8 @@ public class NettyWorkerAggregatorRequestProcessor
     byte[] aggregatorData =
         sendAggregatedValueCache.removeAggregators(worker.getTaskId());
     workerClient.sendWritableRequest(worker.getTaskId(),
-        new SendWorkerAggregatorsRequest(aggregatorData));
+        new SendWorkerAggregatorsRequest(aggregatorData,
+            serviceWorker.getWorkerInfo().getTaskId()));
   }
 
   @Override
@@ -124,7 +125,8 @@ public class NettyWorkerAggregatorRequestProcessor
       Iterable<byte[]> aggregatorDataList) throws IOException {
     for (byte[] aggregatorData : aggregatorDataList) {
       SendAggregatorsToWorkerRequest request =
-          new SendAggregatorsToWorkerRequest(aggregatorData);
+          new SendAggregatorsToWorkerRequest(aggregatorData,
+              serviceWorker.getWorkerInfo().getTaskId());
       for (WorkerInfo worker : serviceWorker.getWorkerInfoList()) {
         if (!isThisWorker(worker)) {
           workerClient.sendWritableRequest(worker.getTaskId(), request);

http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/comm/requests/ByteArrayWithSenderTaskIdRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/ByteArrayWithSenderTaskIdRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/ByteArrayWithSenderTaskIdRequest.java
new file mode 100644
index 0000000..d66339e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/ByteArrayWithSenderTaskIdRequest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Abstract request whose data is a byte array together with the task id of
+ * the task which sent the request. The sender task id is serialized after
+ * the fields written by ByteArrayRequest.
+ */
+public abstract class ByteArrayWithSenderTaskIdRequest
+    extends ByteArrayRequest {
+  /** Number of bytes the serialized sender task id takes */
+  private static final int TASK_ID_SIZE_BYTES = Integer.SIZE / Byte.SIZE;
+  /** Task id of the task which sent this request */
+  private int taskIdOfSender;
+
+  /**
+   * Default constructor. The sender task id stays 0 until
+   * readFieldsRequest() sets it.
+   */
+  public ByteArrayWithSenderTaskIdRequest() {
+  }
+
+  /**
+   * Create a request carrying the given data and sender task id.
+   *
+   * @param data Request data
+   * @param senderTaskId Task id of the sender
+   */
+  public ByteArrayWithSenderTaskIdRequest(byte[] data, int senderTaskId) {
+    super(data);
+    taskIdOfSender = senderTaskId;
+  }
+
+  /**
+   * Get the task id of the task which sent this request.
+   *
+   * @return Sender task id
+   */
+  public int getSenderTaskId() {
+    return taskIdOfSender;
+  }
+
+  @Override
+  void readFieldsRequest(DataInput input) throws IOException {
+    super.readFieldsRequest(input);
+    taskIdOfSender = input.readInt();
+  }
+
+  @Override
+  void writeRequest(DataOutput output) throws IOException {
+    super.writeRequest(output);
+    output.writeInt(taskIdOfSender);
+  }
+
+  @Override
+  public int getSerializedSize() {
+    // The sender task id adds one int on top of the byte array request
+    return super.getSerializedSize() + TASK_ID_SIZE_BYTES;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
index 21b1b2d..e2681ee 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
@@ -32,16 +32,17 @@ import java.io.IOException;
  * Request to send final aggregatd values from master to worker which owns
  * the aggregators
  */
-public class SendAggregatorsToOwnerRequest extends ByteArrayRequest
-    implements WorkerRequest {
+public class SendAggregatorsToOwnerRequest
+    extends ByteArrayWithSenderTaskIdRequest implements WorkerRequest {
 
   /**
    * Constructor
    *
    * @param data Serialized aggregator data
+   * @param senderTaskId Sender task id
    */
-  public SendAggregatorsToOwnerRequest(byte[] data) {
-    super(data);
+  public SendAggregatorsToOwnerRequest(byte[] data, int senderTaskId) {
+    super(data, senderTaskId);
   }
 
   /**
@@ -62,7 +63,8 @@ public class SendAggregatorsToOwnerRequest extends ByteArrayRequest
         if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
           LongWritable count = new LongWritable(0);
           count.readFields(input);
-          aggregatorData.receivedRequestCountFromMaster(count.get());
+          aggregatorData.receivedRequestCountFromMaster(count.get(),
+              getSenderTaskId());
         } else {
           Class<Aggregator<Writable>> aggregatorClass =
               AggregatorUtils.getAggregatorClass(aggregatorClassName);

http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
index 7e84e17..52e4cba 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
@@ -33,15 +33,16 @@ import java.io.IOException;
  * other workers
  */
 public class SendAggregatorsToWorkerRequest extends
-    ByteArrayRequest implements WorkerRequest {
+    ByteArrayWithSenderTaskIdRequest implements WorkerRequest {
 
   /**
    * Constructor
    *
    * @param data Serialized aggregator data
+   * @param senderTaskId Sender task id
    */
-  public SendAggregatorsToWorkerRequest(byte[] data) {
-    super(data);
+  public SendAggregatorsToWorkerRequest(byte[] data, int senderTaskId) {
+    super(data, senderTaskId);
   }
 
   /**
@@ -62,7 +63,8 @@ public class SendAggregatorsToWorkerRequest extends
         if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
           LongWritable count = new LongWritable(0);
           count.readFields(input);
-          aggregatorData.receivedRequestCountFromWorker(count.get());
+          aggregatorData.receivedRequestCountFromWorker(count.get(),
+              getSenderTaskId());
         } else {
           Class<Aggregator<Writable>> aggregatorClass =
               AggregatorUtils.getAggregatorClass(aggregatorClassName);

http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
index 264f03a..00a0c26 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
@@ -32,15 +32,16 @@ import java.io.IOException;
  * which were computed by one worker's vertices)
  */
 public class SendWorkerAggregatorsRequest extends
-    ByteArrayRequest implements WorkerRequest {
+    ByteArrayWithSenderTaskIdRequest implements WorkerRequest {
 
   /**
    * Constructor
    *
    * @param data Serialized aggregator data
+   * @param senderTaskId Sender task id
    */
-  public SendWorkerAggregatorsRequest(byte[] data) {
-    super(data);
+  public SendWorkerAggregatorsRequest(byte[] data, int senderTaskId) {
+    super(data, senderTaskId);
   }
 
   /**
@@ -62,7 +63,8 @@ public class SendWorkerAggregatorsRequest extends
             AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
           LongWritable count = new LongWritable(0);
           count.readFields(input);
-          aggregatorData.receivedRequestCountFromWorker(count.get());
+          aggregatorData.receivedRequestCountFromWorker(count.get(),
+              getSenderTaskId());
         } else {
           Writable aggregatedValue =
               aggregatorData.createAggregatorInitialValue(aggregatorName);

http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java
deleted file mode 100644
index ccd137c..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.utils;
-
-import org.apache.hadoop.util.Progressable;
-import org.apache.log4j.Logger;
-
-/**
- * User must follow this protocol for concurrent access:
- *
- * (1) an object instance is constructed
- * (2) arbitrarily many times
- *     (2a) concurrent calls to requirePermits(), releasePermits() and
- *          waitForRequiredPermits() are issued
- *     (2b) waitForRequiredPermits() returns
- *
- * Note that the next cycle of  calls to requirePermits() or releasePermits()
- * cannot start until the previous call to waitForRequiredPermits()
- * has returned.
- *
- * Methods of this class are thread-safe.
- */
-public class ExpectedBarrier {
-  /** Class logger */
-  private static final Logger LOG = Logger.getLogger(ExpectedBarrier.class);
-  /** Msecs to refresh the progress meter */
-  private static final int MSEC_PERIOD = 10000;
-  /** Progressable for reporting progress */
-  private final Progressable progressable;
-  /** Number of times permits were added */
-  private long timesRequired = 0;
-  /** Number of permits we are currently waiting for */
-  private long waitingOnPermits = 0;
-  /** Logger */
-  private final TimedLogger logger;
-
-  /**
-   * Constructor
-   *
-   * @param progressable Progressable for reporting progress
-   */
-  public ExpectedBarrier(Progressable progressable) {
-    this.progressable = progressable;
-    logger = new TimedLogger(MSEC_PERIOD, LOG);
-  }
-
-  /**
-   * Wait until permits have been required desired number of times,
-   * and all required permits are available
-   *
-   * @param desiredTimesRequired How many times should permits have been
-   *                             required
-   */
-  public synchronized void waitForRequiredPermits(
-      long desiredTimesRequired) {
-    while (timesRequired < desiredTimesRequired || waitingOnPermits > 0) {
-      try {
-        wait(MSEC_PERIOD);
-      } catch (InterruptedException e) {
-        throw new IllegalStateException("waitForRequiredPermits: " +
-            "InterruptedException occurred");
-      }
-      progressable.progress();
-      if (LOG.isInfoEnabled()) {
-        if (timesRequired < desiredTimesRequired) {
-          logger.info("waitForRequiredPermits: " +
-              "Waiting for times required to be " + desiredTimesRequired +
-              " (currently " + timesRequired + ") ");
-        } else {
-          logger.info("waitForRequiredPermits: " +
-              "Waiting for " + waitingOnPermits + " more permits.");
-        }
-      }
-    }
-
-    // Reset for the next time to use
-    timesRequired = 0;
-    waitingOnPermits = 0;
-  }
-
-  /**
-   * Require more permits. This will increase the number of times permits
-   * were required. Doesn't wait for permits to become available.
-   *
-   * @param permits Number of permits to require
-   */
-  public synchronized void requirePermits(long permits) {
-    timesRequired++;
-    waitingOnPermits += permits;
-    notifyAll();
-  }
-
-  /**
-   * Release one permit.
-   */
-  public synchronized void releaseOnePermit() {
-    releasePermits(1);
-  }
-
-  /**
-   * Release some permits.
-   *
-   * @param permits Number of permits to release
-   */
-  public synchronized void releasePermits(long permits) {
-    waitingOnPermits -= permits;
-    notifyAll();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/utils/TaskIdsPermitsBarrier.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/TaskIdsPermitsBarrier.java b/giraph-core/src/main/java/org/apache/giraph/utils/TaskIdsPermitsBarrier.java
new file mode 100644
index 0000000..e2c22cc
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/TaskIdsPermitsBarrier.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Sets;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * This barrier is used when we don't know how many events are we waiting on
+ * from the start. Instead we have a set of task ids, and each of those will,
+ * at some point of time, give the information about how many events from it
+ * should we expect. Barrier will be waiting for all the tasks to notify it
+ * about that number of events, and than it will also wait for all the events
+ * to happen.
+ *
+ * requirePermits() corresponds to task notifying us how many events from it
+ * to expect, and releasePermits() notifies us about events happening.
+ *
+ * This class is currently used during preparation of aggregators.
+ *
+ * User must follow this protocol for concurrent access:
+ *
+ * (1) an object instance is constructed
+ * (2) arbitrarily many times
+ *     (2a) concurrent calls to requirePermits(), releasePermits() and
+ *          waitForRequiredPermits() are issued
+ *     (2b) waitForRequiredPermits() returns
+ *
+ * Note that the next cycle of calls to requirePermits() or releasePermits()
+ * cannot start until the previous call to waitForRequiredPermits()
+ * has returned.
+ *
+ * Methods of this class are thread-safe.
+ */
+public class TaskIdsPermitsBarrier {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(TaskIdsPermitsBarrier.class);
+  /** Msecs to refresh the progress meter */
+  private static final int MSEC_PERIOD = 10000;
+  /** Maximum number of task ids to list in the log */
+  private static final int MAX_TASK_IDS_TO_LOG = 10;
+  /** Progressable for reporting progress */
+  private final Progressable progressable;
+  /** Number of permits we are currently waiting for */
+  private long waitingOnPermits = 0;
+  /** Set of task ids which required permits already */
+  private final Set<Integer> arrivedTaskIds = new HashSet<Integer>();
+  /** Logger */
+  private final TimedLogger logger;
+
+  /**
+   * Constructor
+   *
+   * @param progressable Progressable for reporting progress
+   */
+  public TaskIdsPermitsBarrier(Progressable progressable) {
+    this.progressable = progressable;
+    logger = new TimedLogger(MSEC_PERIOD, LOG);
+  }
+
+  /**
+   * Wait until permits have been required desired number of times,
+   * and all required permits are available
+   *
+   * @param expectedTaskIds List of task ids which we are waiting permits from
+   */
+  public synchronized void waitForRequiredPermits(
+      Set<Integer> expectedTaskIds) {
+    while (arrivedTaskIds.size() < expectedTaskIds.size() ||
+        waitingOnPermits > 0) {
+      try {
+        wait(MSEC_PERIOD);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException("waitForRequiredPermits: " +
+            "InterruptedException occurred");
+      }
+      progressable.progress();
+      if (LOG.isInfoEnabled()) {
+        if (arrivedTaskIds.size() < expectedTaskIds.size()) {
+          String logSuffix = "";
+          if (expectedTaskIds.size() - arrivedTaskIds.size() <=
+              MAX_TASK_IDS_TO_LOG) {
+            Sets.SetView<Integer> difference =
+                Sets.difference(expectedTaskIds, arrivedTaskIds);
+            logSuffix = ", task ids: " + difference;
+          }
+          logger.info("waitForRequiredPermits: " +
+              "Waiting for " +
+              (expectedTaskIds.size() - arrivedTaskIds.size()) +
+              " more tasks to send their aggregator data" +
+              logSuffix);
+        } else {
+          logger.info("waitForRequiredPermits: " +
+              "Waiting for " + waitingOnPermits + " more aggregator requests");
+        }
+      }
+    }
+
+    // Reset for the next time to use
+    arrivedTaskIds.clear();
+    waitingOnPermits = 0;
+  }
+
+  /**
+   * Require more permits. This will increase the number of times permits
+   * were required. Doesn't wait for permits to become available.
+   *
+   * @param permits Number of permits to require
+   * @param taskId Task id which required permits
+   */
+  public synchronized void requirePermits(long permits, int taskId) {
+    arrivedTaskIds.add(taskId);
+    waitingOnPermits += permits;
+    notifyAll();
+  }
+
+  /**
+   * Release one permit.
+   */
+  public synchronized void releaseOnePermit() {
+    releasePermits(1);
+  }
+
+  /**
+   * Release some permits.
+   *
+   * @param permits Number of permits to release
+   */
+  public synchronized void releasePermits(long permits) {
+    waitingOnPermits -= permits;
+    notifyAll();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
index 001cf59..3c18449 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
@@ -31,9 +31,11 @@ import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Handler for aggregators on worker. Provides the aggregated values and
@@ -129,7 +131,8 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
         serviceWorker.getServerData().getAllAggregatorData();
     // Wait for my aggregators
     Iterable<byte[]> dataToDistribute =
-        allAggregatorData.getDataFromMasterWhenReady();
+        allAggregatorData.getDataFromMasterWhenReady(
+            serviceWorker.getMasterInfo());
     try {
       // Distribute my aggregators
       requestProcessor.distributeAggregators(dataToDistribute);
@@ -139,7 +142,7 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
     }
     // Wait for all other aggregators and store them
     allAggregatorData.fillNextSuperstepMapsWhenReady(
-        serviceWorker.getWorkerInfoList().size(), previousAggregatedValueMap,
+        getOtherWorkerIdsSet(), previousAggregatedValueMap,
         currentAggregatorMap);
     allAggregatorData.reset();
     if (LOG.isDebugEnabled()) {
@@ -154,8 +157,10 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
    */
   public void finishSuperstep(
       WorkerAggregatorRequestProcessor requestProcessor) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("finishSuperstep: Start finishing aggregators");
+    if (LOG.isInfoEnabled()) {
+      LOG.info("finishSuperstep: Start gathering aggregators, " +
+          "workers will send their aggregated values " +
+          "once they are done with superstep computation");
     }
     OwnerAggregatorServerData ownerAggregatorData =
         serviceWorker.getServerData().getOwnerAggregatorData();
@@ -189,7 +194,7 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
     // Wait to receive partial aggregated values from all other workers
     Iterable<Map.Entry<String, Writable>> myAggregators =
         ownerAggregatorData.getMyAggregatorValuesWhenReady(
-            serviceWorker.getWorkerInfoList().size());
+            getOtherWorkerIdsSet());
 
     // Send final aggregated values to master
     AggregatedValueOutputStream aggregatorOutput =
@@ -245,6 +250,22 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
   }
 
   /**
+   * Get the task ids of all workers except this worker's own task id.
+   * @return Newly created, mutable set of all other worker task ids
+   */
+  public Set<Integer> getOtherWorkerIdsSet() {
+    int myTaskId = serviceWorker.getWorkerInfo().getTaskId();
+    Set<Integer> otherWorkers = Sets.newHashSetWithExpectedSize(
+        serviceWorker.getWorkerInfoList().size());
+    for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
+      if (workerInfo.getTaskId() != myTaskId) {
+        otherWorkers.add(workerInfo.getTaskId());
+      }
+    }
+    return otherWorkers;
+  }
+
+  /**
    * Not thread-safe implementation of {@link WorkerThreadAggregatorUsage}.
    * We can use one instance of this object per thread to prevent
    * synchronizing on each aggregate() call. In the end of superstep,