You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/03/20 19:39:17 UTC
git commit: GIRAPH-537: Fix log messages produced by aggregators
(majakabiljo)
Updated Branches:
refs/heads/trunk f31c62575 -> 40bc599b8
GIRAPH-537: Fix log messages produced by aggregators (majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/40bc599b
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/40bc599b
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/40bc599b
Branch: refs/heads/trunk
Commit: 40bc599b8a7e7caaeb0a284fd2d2103926b79288
Parents: f31c625
Author: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Authored: Wed Mar 20 11:38:18 2013 -0700
Committer: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Committed: Wed Mar 20 11:38:18 2013 -0700
----------------------------------------------------------------------
CHANGELOG | 2 +
.../comm/aggregators/AllAggregatorServerData.java | 34 ++--
.../aggregators/OwnerAggregatorServerData.java | 18 +-
.../giraph/comm/netty/NettyMasterClient.java | 3 +-
.../NettyWorkerAggregatorRequestProcessor.java | 6 +-
.../requests/ByteArrayWithSenderTaskIdRequest.java | 71 +++++++
.../requests/SendAggregatorsToOwnerRequest.java | 12 +-
.../requests/SendAggregatorsToWorkerRequest.java | 10 +-
.../requests/SendWorkerAggregatorsRequest.java | 10 +-
.../org/apache/giraph/utils/ExpectedBarrier.java | 125 ------------
.../apache/giraph/utils/TaskIdsPermitsBarrier.java | 155 +++++++++++++++
.../giraph/worker/WorkerAggregatorHandler.java | 31 +++-
12 files changed, 309 insertions(+), 168 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index e4430f2..462f104 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-537: Fix log messages produced by aggregators (majakabiljo)
+
GIRAPH-480: Add convergence detection to org.apache.giraph.examples.RandomWalkVertex (ssc)
GIRAPH-565: Make an easy way to gather some logs from workers on master (majakabiljo)
http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
index dddd1cb..f38c6cd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
@@ -19,7 +19,8 @@
package org.apache.giraph.comm.aggregators;
import org.apache.giraph.aggregators.Aggregator;
-import org.apache.giraph.utils.ExpectedBarrier;
+import org.apache.giraph.master.MasterInfo;
+import org.apache.giraph.utils.TaskIdsPermitsBarrier;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
@@ -30,6 +31,7 @@ import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentMap;
/**
@@ -65,7 +67,7 @@ public class AllAggregatorServerData {
* (named AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)
* to know how many requests it has to receive.
*/
- private final ExpectedBarrier masterBarrier;
+ private final TaskIdsPermitsBarrier masterBarrier;
/**
* Aggregator data which this worker received from master and which it is
* going to distribute before starting next superstep. Thread-safe.
@@ -78,7 +80,7 @@ public class AllAggregatorServerData {
* (named AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)
* to know how many requests it has to receive.
*/
- private final ExpectedBarrier workersBarrier;
+ private final TaskIdsPermitsBarrier workersBarrier;
/** Progressable used to report progress */
private final Progressable progressable;
@@ -89,8 +91,8 @@ public class AllAggregatorServerData {
*/
public AllAggregatorServerData(Progressable progressable) {
this.progressable = progressable;
- workersBarrier = new ExpectedBarrier(progressable);
- masterBarrier = new ExpectedBarrier(progressable);
+ workersBarrier = new TaskIdsPermitsBarrier(progressable);
+ masterBarrier = new TaskIdsPermitsBarrier(progressable);
}
/**
@@ -154,9 +156,10 @@ public class AllAggregatorServerData {
* arrive from master.
*
* @param requestCount Number of requests which should arrive
+ * @param taskId Task id of master
*/
- public void receivedRequestCountFromMaster(long requestCount) {
- masterBarrier.requirePermits(requestCount);
+ public void receivedRequestCountFromMaster(long requestCount, int taskId) {
+ masterBarrier.requirePermits(requestCount, taskId);
}
/**
@@ -172,19 +175,22 @@ public class AllAggregatorServerData {
* arrive from one of the workers.
*
* @param requestCount Number of requests which should arrive
+ * @param taskId Task id of that worker
*/
- public void receivedRequestCountFromWorker(long requestCount) {
- workersBarrier.requirePermits(requestCount);
+ public void receivedRequestCountFromWorker(long requestCount, int taskId) {
+ workersBarrier.requirePermits(requestCount, taskId);
}
/**
* This function will wait until all aggregator requests from master have
* arrived, and return that data afterwards.
*
+ * @param masterInfo Master info
* @return Iterable through data received from master
*/
- public Iterable<byte[]> getDataFromMasterWhenReady() {
- masterBarrier.waitForRequiredPermits(1);
+ public Iterable<byte[]> getDataFromMasterWhenReady(MasterInfo masterInfo) {
+ masterBarrier.waitForRequiredPermits(
+ Collections.singleton(masterInfo.getTaskId()));
if (LOG.isDebugEnabled()) {
LOG.debug("getDataFromMasterWhenReady: " +
"Aggregator data for distribution ready");
@@ -196,7 +202,7 @@ public class AllAggregatorServerData {
* This function will wait until all aggregator requests from workers have
* arrived, and fill the maps for next superstep when ready.
*
- * @param numberOfWorkers Total number of workers in the job
+ * @param workerIds All workers in the job apart from the current one
* @param previousAggregatedValuesMap Map of values from previous
* superstep to fill out
* @param currentAggregatorMap Map of aggregators for current superstep to
@@ -204,10 +210,10 @@ public class AllAggregatorServerData {
* be set to initial value.
*/
public void fillNextSuperstepMapsWhenReady(
- int numberOfWorkers,
+ Set<Integer> workerIds,
Map<String, Writable> previousAggregatedValuesMap,
Map<String, Aggregator<Writable>> currentAggregatorMap) {
- workersBarrier.waitForRequiredPermits(numberOfWorkers - 1);
+ workersBarrier.waitForRequiredPermits(workerIds);
if (LOG.isDebugEnabled()) {
LOG.debug("fillNextSuperstepMapsWhenReady: Aggregators ready");
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
index 70ff7fe..bd6068a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
@@ -19,7 +19,7 @@
package org.apache.giraph.comm.aggregators;
import org.apache.giraph.aggregators.Aggregator;
-import org.apache.giraph.utils.ExpectedBarrier;
+import org.apache.giraph.utils.TaskIdsPermitsBarrier;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
@@ -30,6 +30,7 @@ import com.google.common.collect.Maps;
import java.util.AbstractMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentMap;
/**
@@ -68,7 +69,7 @@ public class OwnerAggregatorServerData {
* (named AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)
* to know how many requests it has to receive.
*/
- private final ExpectedBarrier workersBarrier;
+ private final TaskIdsPermitsBarrier workersBarrier;
/** Progressable used to report progress */
private final Progressable progressable;
@@ -79,7 +80,7 @@ public class OwnerAggregatorServerData {
*/
public OwnerAggregatorServerData(Progressable progressable) {
this.progressable = progressable;
- workersBarrier = new ExpectedBarrier(progressable);
+ workersBarrier = new TaskIdsPermitsBarrier(progressable);
}
/**
@@ -143,9 +144,10 @@ public class OwnerAggregatorServerData {
* arrive from one of the workers. Thread-safe.
*
* @param requestCount Number of requests which should arrive
+ * @param taskId Task id of that worker
*/
- public void receivedRequestCountFromWorker(long requestCount) {
- workersBarrier.requirePermits(requestCount);
+ public void receivedRequestCountFromWorker(long requestCount, int taskId) {
+ workersBarrier.requirePermits(requestCount, taskId);
}
/**
@@ -153,12 +155,12 @@ public class OwnerAggregatorServerData {
* workers are ready and aggregated, and return final aggregated values
* afterwards.
*
- * @param numberOfWorkers Total number of workers in the job
+ * @param workerIds All workers in the job apart from the current one
* @return Iterable through final aggregated values which this worker owns
*/
public Iterable<Map.Entry<String, Writable>>
- getMyAggregatorValuesWhenReady(int numberOfWorkers) {
- workersBarrier.waitForRequiredPermits(numberOfWorkers - 1);
+ getMyAggregatorValuesWhenReady(Set<Integer> workerIds) {
+ workersBarrier.waitForRequiredPermits(workerIds);
if (LOG.isDebugEnabled()) {
LOG.debug("getMyAggregatorValuesWhenReady: Values ready");
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
index 86ea8dc..319f41a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
@@ -104,7 +104,8 @@ public class NettyMasterClient implements MasterClient {
byte[] aggregatorData =
sendAggregatorCache.removeAggregators(worker.getTaskId());
nettyClient.sendWritableRequest(
- worker.getTaskId(), new SendAggregatorsToOwnerRequest(aggregatorData));
+ worker.getTaskId(), new SendAggregatorsToOwnerRequest(aggregatorData,
+ service.getMasterInfo().getTaskId()));
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
index cd24219..d1cce64 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
@@ -109,7 +109,8 @@ public class NettyWorkerAggregatorRequestProcessor
byte[] aggregatorData =
sendAggregatedValueCache.removeAggregators(worker.getTaskId());
workerClient.sendWritableRequest(worker.getTaskId(),
- new SendWorkerAggregatorsRequest(aggregatorData));
+ new SendWorkerAggregatorsRequest(aggregatorData,
+ serviceWorker.getWorkerInfo().getTaskId()));
}
@Override
@@ -124,7 +125,8 @@ public class NettyWorkerAggregatorRequestProcessor
Iterable<byte[]> aggregatorDataList) throws IOException {
for (byte[] aggregatorData : aggregatorDataList) {
SendAggregatorsToWorkerRequest request =
- new SendAggregatorsToWorkerRequest(aggregatorData);
+ new SendAggregatorsToWorkerRequest(aggregatorData,
+ serviceWorker.getWorkerInfo().getTaskId());
for (WorkerInfo worker : serviceWorker.getWorkerInfoList()) {
if (!isThisWorker(worker)) {
workerClient.sendWritableRequest(worker.getTaskId(), request);
http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/comm/requests/ByteArrayWithSenderTaskIdRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/ByteArrayWithSenderTaskIdRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/ByteArrayWithSenderTaskIdRequest.java
new file mode 100644
index 0000000..d66339e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/ByteArrayWithSenderTaskIdRequest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Abstract request which has a byte array and task id of the sender as its
+ * data
+ */
+public abstract class ByteArrayWithSenderTaskIdRequest
+ extends ByteArrayRequest {
+ /** Task id of the sender of request */
+ private int senderTaskId;
+
+ /**
+ * Constructor
+ *
+ * @param data Request data
+ * @param senderTaskId Sender task id
+ */
+ public ByteArrayWithSenderTaskIdRequest(byte[] data, int senderTaskId) {
+ super(data);
+ this.senderTaskId = senderTaskId;
+ }
+
+ /**
+ * Default constructor
+ */
+ public ByteArrayWithSenderTaskIdRequest() {
+ }
+
+ public int getSenderTaskId() {
+ return senderTaskId;
+ }
+
+ @Override
+ void writeRequest(DataOutput output) throws IOException {
+ super.writeRequest(output);
+ output.writeInt(senderTaskId);
+ }
+
+ @Override
+ void readFieldsRequest(DataInput input) throws IOException {
+ super.readFieldsRequest(input);
+ senderTaskId = input.readInt();
+ }
+
+ @Override
+ public int getSerializedSize() {
+ return super.getSerializedSize() + 4;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
index 21b1b2d..e2681ee 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
@@ -32,16 +32,17 @@ import java.io.IOException;
* Request to send final aggregatd values from master to worker which owns
* the aggregators
*/
-public class SendAggregatorsToOwnerRequest extends ByteArrayRequest
- implements WorkerRequest {
+public class SendAggregatorsToOwnerRequest
+ extends ByteArrayWithSenderTaskIdRequest implements WorkerRequest {
/**
* Constructor
*
* @param data Serialized aggregator data
+ * @param senderTaskId Sender task id
*/
- public SendAggregatorsToOwnerRequest(byte[] data) {
- super(data);
+ public SendAggregatorsToOwnerRequest(byte[] data, int senderTaskId) {
+ super(data, senderTaskId);
}
/**
@@ -62,7 +63,8 @@ public class SendAggregatorsToOwnerRequest extends ByteArrayRequest
if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
LongWritable count = new LongWritable(0);
count.readFields(input);
- aggregatorData.receivedRequestCountFromMaster(count.get());
+ aggregatorData.receivedRequestCountFromMaster(count.get(),
+ getSenderTaskId());
} else {
Class<Aggregator<Writable>> aggregatorClass =
AggregatorUtils.getAggregatorClass(aggregatorClassName);
http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
index 7e84e17..52e4cba 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
@@ -33,15 +33,16 @@ import java.io.IOException;
* other workers
*/
public class SendAggregatorsToWorkerRequest extends
- ByteArrayRequest implements WorkerRequest {
+ ByteArrayWithSenderTaskIdRequest implements WorkerRequest {
/**
* Constructor
*
* @param data Serialized aggregator data
+ * @param senderTaskId Sender task id
*/
- public SendAggregatorsToWorkerRequest(byte[] data) {
- super(data);
+ public SendAggregatorsToWorkerRequest(byte[] data, int senderTaskId) {
+ super(data, senderTaskId);
}
/**
@@ -62,7 +63,8 @@ public class SendAggregatorsToWorkerRequest extends
if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
LongWritable count = new LongWritable(0);
count.readFields(input);
- aggregatorData.receivedRequestCountFromWorker(count.get());
+ aggregatorData.receivedRequestCountFromWorker(count.get(),
+ getSenderTaskId());
} else {
Class<Aggregator<Writable>> aggregatorClass =
AggregatorUtils.getAggregatorClass(aggregatorClassName);
http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
index 264f03a..00a0c26 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
@@ -32,15 +32,16 @@ import java.io.IOException;
* which were computed by one worker's vertices)
*/
public class SendWorkerAggregatorsRequest extends
- ByteArrayRequest implements WorkerRequest {
+ ByteArrayWithSenderTaskIdRequest implements WorkerRequest {
/**
* Constructor
*
* @param data Serialized aggregator data
+ * @param senderTaskId Sender task id
*/
- public SendWorkerAggregatorsRequest(byte[] data) {
- super(data);
+ public SendWorkerAggregatorsRequest(byte[] data, int senderTaskId) {
+ super(data, senderTaskId);
}
/**
@@ -62,7 +63,8 @@ public class SendWorkerAggregatorsRequest extends
AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
LongWritable count = new LongWritable(0);
count.readFields(input);
- aggregatorData.receivedRequestCountFromWorker(count.get());
+ aggregatorData.receivedRequestCountFromWorker(count.get(),
+ getSenderTaskId());
} else {
Writable aggregatedValue =
aggregatorData.createAggregatorInitialValue(aggregatorName);
http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java
deleted file mode 100644
index ccd137c..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.utils;
-
-import org.apache.hadoop.util.Progressable;
-import org.apache.log4j.Logger;
-
-/**
- * User must follow this protocol for concurrent access:
- *
- * (1) an object instance is constructed
- * (2) arbitrarily many times
- * (2a) concurrent calls to requirePermits(), releasePermits() and
- * waitForRequiredPermits() are issued
- * (2b) waitForRequiredPermits() returns
- *
- * Note that the next cycle of calls to requirePermits() or releasePermits()
- * cannot start until the previous call to waitForRequiredPermits()
- * has returned.
- *
- * Methods of this class are thread-safe.
- */
-public class ExpectedBarrier {
- /** Class logger */
- private static final Logger LOG = Logger.getLogger(ExpectedBarrier.class);
- /** Msecs to refresh the progress meter */
- private static final int MSEC_PERIOD = 10000;
- /** Progressable for reporting progress */
- private final Progressable progressable;
- /** Number of times permits were added */
- private long timesRequired = 0;
- /** Number of permits we are currently waiting for */
- private long waitingOnPermits = 0;
- /** Logger */
- private final TimedLogger logger;
-
- /**
- * Constructor
- *
- * @param progressable Progressable for reporting progress
- */
- public ExpectedBarrier(Progressable progressable) {
- this.progressable = progressable;
- logger = new TimedLogger(MSEC_PERIOD, LOG);
- }
-
- /**
- * Wait until permits have been required desired number of times,
- * and all required permits are available
- *
- * @param desiredTimesRequired How many times should permits have been
- * required
- */
- public synchronized void waitForRequiredPermits(
- long desiredTimesRequired) {
- while (timesRequired < desiredTimesRequired || waitingOnPermits > 0) {
- try {
- wait(MSEC_PERIOD);
- } catch (InterruptedException e) {
- throw new IllegalStateException("waitForRequiredPermits: " +
- "InterruptedException occurred");
- }
- progressable.progress();
- if (LOG.isInfoEnabled()) {
- if (timesRequired < desiredTimesRequired) {
- logger.info("waitForRequiredPermits: " +
- "Waiting for times required to be " + desiredTimesRequired +
- " (currently " + timesRequired + ") ");
- } else {
- logger.info("waitForRequiredPermits: " +
- "Waiting for " + waitingOnPermits + " more permits.");
- }
- }
- }
-
- // Reset for the next time to use
- timesRequired = 0;
- waitingOnPermits = 0;
- }
-
- /**
- * Require more permits. This will increase the number of times permits
- * were required. Doesn't wait for permits to become available.
- *
- * @param permits Number of permits to require
- */
- public synchronized void requirePermits(long permits) {
- timesRequired++;
- waitingOnPermits += permits;
- notifyAll();
- }
-
- /**
- * Release one permit.
- */
- public synchronized void releaseOnePermit() {
- releasePermits(1);
- }
-
- /**
- * Release some permits.
- *
- * @param permits Number of permits to release
- */
- public synchronized void releasePermits(long permits) {
- waitingOnPermits -= permits;
- notifyAll();
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/utils/TaskIdsPermitsBarrier.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/TaskIdsPermitsBarrier.java b/giraph-core/src/main/java/org/apache/giraph/utils/TaskIdsPermitsBarrier.java
new file mode 100644
index 0000000..e2c22cc
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/TaskIdsPermitsBarrier.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Sets;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * This barrier is used when we don't know from the start how many events
+ * we are waiting on. Instead we have a set of task ids, and each of those
+ * will, at some point in time, tell us how many events from it we should
+ * expect. The barrier waits for all the tasks to notify it about that
+ * number of events, and then it also waits for all of those events to
+ * happen.
+ *
+ * requirePermits() corresponds to task notifying us how many events from it
+ * to expect, and releasePermits() notifies us about events happening.
+ *
+ * This class is currently used during preparation of aggregators.
+ *
+ * User must follow this protocol for concurrent access:
+ *
+ * (1) an object instance is constructed
+ * (2) arbitrarily many times
+ * (2a) concurrent calls to requirePermits(), releasePermits() and
+ * waitForRequiredPermits() are issued
+ * (2b) waitForRequiredPermits() returns
+ *
+ * Note that the next cycle of calls to requirePermits() or releasePermits()
+ * cannot start until the previous call to waitForRequiredPermits()
+ * has returned.
+ *
+ * Methods of this class are thread-safe.
+ */
+public class TaskIdsPermitsBarrier {
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(TaskIdsPermitsBarrier.class);
+ /** Msecs to refresh the progress meter */
+ private static final int MSEC_PERIOD = 10000;
+ /** Maximum number of task ids to list in the log */
+ private static final int MAX_TASK_IDS_TO_LOG = 10;
+ /** Progressable for reporting progress */
+ private final Progressable progressable;
+ /** Number of permits we are currently waiting for */
+ private long waitingOnPermits = 0;
+ /** Set of task ids which required permits already */
+ private final Set<Integer> arrivedTaskIds = new HashSet<Integer>();
+ /** Logger */
+ private final TimedLogger logger;
+
+ /**
+ * Constructor
+ *
+ * @param progressable Progressable for reporting progress
+ */
+ public TaskIdsPermitsBarrier(Progressable progressable) {
+ this.progressable = progressable;
+ logger = new TimedLogger(MSEC_PERIOD, LOG);
+ }
+
+ /**
+ * Wait until permits have been required by as many tasks as are expected,
+ * and all required permits have been released
+ *
+ * @param expectedTaskIds Set of task ids which we are waiting for permits from
+ */
+ public synchronized void waitForRequiredPermits(
+ Set<Integer> expectedTaskIds) {
+ while (arrivedTaskIds.size() < expectedTaskIds.size() ||
+ waitingOnPermits > 0) {
+ try {
+ wait(MSEC_PERIOD);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("waitForRequiredPermits: " +
+ "InterruptedException occurred");
+ }
+ progressable.progress();
+ if (LOG.isInfoEnabled()) {
+ if (arrivedTaskIds.size() < expectedTaskIds.size()) {
+ String logSuffix = "";
+ if (expectedTaskIds.size() - arrivedTaskIds.size() <=
+ MAX_TASK_IDS_TO_LOG) {
+ Sets.SetView<Integer> difference =
+ Sets.difference(expectedTaskIds, arrivedTaskIds);
+ logSuffix = ", task ids: " + difference;
+ }
+ logger.info("waitForRequiredPermits: " +
+ "Waiting for " +
+ (expectedTaskIds.size() - arrivedTaskIds.size()) +
+ " more tasks to send their aggregator data" +
+ logSuffix);
+ } else {
+ logger.info("waitForRequiredPermits: " +
+ "Waiting for " + waitingOnPermits + " more aggregator requests");
+ }
+ }
+ }
+
+ // Reset for the next time to use
+ arrivedTaskIds.clear();
+ waitingOnPermits = 0;
+ }
+
+ /**
+ * Require more permits. This will mark the given task id as arrived.
+ * Doesn't wait for permits to become available.
+ *
+ * @param permits Number of permits to require
+ * @param taskId Task id which required permits
+ */
+ public synchronized void requirePermits(long permits, int taskId) {
+ arrivedTaskIds.add(taskId);
+ waitingOnPermits += permits;
+ notifyAll();
+ }
+
+ /**
+ * Release one permit.
+ */
+ public synchronized void releaseOnePermit() {
+ releasePermits(1);
+ }
+
+ /**
+ * Release some permits.
+ *
+ * @param permits Number of permits to release
+ */
+ public synchronized void releasePermits(long permits) {
+ waitingOnPermits -= permits;
+ notifyAll();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
index 001cf59..3c18449 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
@@ -31,9 +31,11 @@ import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Map;
+import java.util.Set;
/**
* Handler for aggregators on worker. Provides the aggregated values and
@@ -129,7 +131,8 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
serviceWorker.getServerData().getAllAggregatorData();
// Wait for my aggregators
Iterable<byte[]> dataToDistribute =
- allAggregatorData.getDataFromMasterWhenReady();
+ allAggregatorData.getDataFromMasterWhenReady(
+ serviceWorker.getMasterInfo());
try {
// Distribute my aggregators
requestProcessor.distributeAggregators(dataToDistribute);
@@ -139,7 +142,7 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
}
// Wait for all other aggregators and store them
allAggregatorData.fillNextSuperstepMapsWhenReady(
- serviceWorker.getWorkerInfoList().size(), previousAggregatedValueMap,
+ getOtherWorkerIdsSet(), previousAggregatedValueMap,
currentAggregatorMap);
allAggregatorData.reset();
if (LOG.isDebugEnabled()) {
@@ -154,8 +157,10 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
*/
public void finishSuperstep(
WorkerAggregatorRequestProcessor requestProcessor) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("finishSuperstep: Start finishing aggregators");
+ if (LOG.isInfoEnabled()) {
+ LOG.info("finishSuperstep: Start gathering aggregators, " +
+ "workers will send their aggregated values " +
+ "once they are done with superstep computation");
}
OwnerAggregatorServerData ownerAggregatorData =
serviceWorker.getServerData().getOwnerAggregatorData();
@@ -189,7 +194,7 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
// Wait to receive partial aggregated values from all other workers
Iterable<Map.Entry<String, Writable>> myAggregators =
ownerAggregatorData.getMyAggregatorValuesWhenReady(
- serviceWorker.getWorkerInfoList().size());
+ getOtherWorkerIdsSet());
// Send final aggregated values to master
AggregatedValueOutputStream aggregatorOutput =
@@ -245,6 +250,22 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
}
/**
+ * Get set of all worker task ids except the current one
+ *
+ * @return Set of all other worker task ids
+ */
+ public Set<Integer> getOtherWorkerIdsSet() {
+ Set<Integer> otherWorkers = Sets.newHashSetWithExpectedSize(
+ serviceWorker.getWorkerInfoList().size());
+ for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
+ if (workerInfo.getTaskId() != serviceWorker.getWorkerInfo().getTaskId()) {
+ otherWorkers.add(workerInfo.getTaskId());
+ }
+ }
+ return otherWorkers;
+ }
+
+ /**
* Not thread-safe implementation of {@link WorkerThreadAggregatorUsage}.
* We can use one instance of this object per thread to prevent
* synchronizing on each aggregate() call. In the end of superstep,