You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ed...@apache.org on 2016/05/20 22:14:29 UTC
[5/5] git commit: updated refs/heads/trunk to 6256a76
Integrating out-of-core mechanism with credit-based flow-control and data generation tethering
Summary: This diff integrates out-of-core infrastructure with credit-based flow control and adds the ability to tether the rate of data generation/processing. Data generation/processing rate is controlled by changing the number of active processing (input/compute) threads. This diff also implements a new (and more performant) adaptive out-of-core policy.
Test Plan:
mvn clean verify
all snapshot tests including ones with large data pass
Running adaptive out-of-core on large graph with very limited memory does not fail.
This diff should prevent *any* reasonable job from failing!
Reviewers: maja.kabiljo, sergey.edunov, avery.ching, dionysis.logothetis
Reviewed By: dionysis.logothetis
Subscribers: ramesh-muthusamy
Differential Revision: https://reviews.facebook.net/D55479
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/6256a761
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/6256a761
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/6256a761
Branch: refs/heads/trunk
Commit: 6256a761d61a5b27a05878da2449ce8537d60c99
Parents: 4321e44
Author: Sergey Edunov <ed...@fb.com>
Authored: Fri May 20 15:14:08 2016 -0700
Committer: Sergey Edunov <ed...@fb.com>
Committed: Fri May 20 15:14:08 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/giraph/comm/ServerData.java | 14 +-
.../flow_control/CreditBasedFlowControl.java | 400 ++++++-
.../giraph/comm/flow_control/FlowControl.java | 19 +-
.../comm/flow_control/NoOpFlowControl.java | 14 +-
.../comm/flow_control/StaticFlowControl.java | 32 +-
.../apache/giraph/comm/netty/NettyClient.java | 39 +-
.../handler/AddressRequestIdGenerator.java | 53 -
.../netty/handler/RequestServerHandler.java | 6 +-
.../netty/handler/ResponseClientHandler.java | 4 +-
.../netty/handler/TaskRequestIdGenerator.java | 51 +
.../giraph/comm/requests/RequestType.java | 4 +-
.../giraph/comm/requests/SendResumeRequest.java | 80 ++
.../apache/giraph/conf/GiraphConfiguration.java | 4 -
.../org/apache/giraph/conf/GiraphConstants.java | 18 +-
.../org/apache/giraph/counters/GiraphStats.java | 34 +-
.../apache/giraph/edge/AbstractEdgeStore.java | 23 +-
.../apache/giraph/graph/ComputeCallable.java | 62 +-
.../org/apache/giraph/graph/GlobalStats.java | 30 +
.../apache/giraph/graph/GraphTaskManager.java | 66 +-
.../apache/giraph/master/BspServiceMaster.java | 8 +
.../apache/giraph/metrics/AggregatedMetric.java | 46 +-
.../giraph/metrics/AggregatedMetricDouble.java | 50 +
.../giraph/metrics/AggregatedMetricLong.java | 50 +
.../giraph/metrics/AggregatedMetrics.java | 62 +-
.../giraph/metrics/ValueWithHostname.java | 18 +-
.../giraph/metrics/WorkerSuperstepMetrics.java | 62 +
.../apache/giraph/ooc/FixedOutOfCoreEngine.java | 147 ---
.../giraph/ooc/FixedOutOfCoreIOScheduler.java | 211 ----
.../giraph/ooc/FixedPartitionsOracle.java | 139 +++
.../org/apache/giraph/ooc/OutOfCoreEngine.java | 382 +++++-
.../apache/giraph/ooc/OutOfCoreIOCallable.java | 71 +-
.../apache/giraph/ooc/OutOfCoreIOScheduler.java | 182 ++-
.../giraph/ooc/OutOfCoreIOStatistics.java | 360 ++++++
.../org/apache/giraph/ooc/OutOfCoreOracle.java | 131 +++
.../giraph/ooc/SimpleGCMonitoringOracle.java | 355 ++++++
.../apache/giraph/ooc/ThresholdBasedOracle.java | 364 ++++++
.../giraph/ooc/data/DiskBackedEdgeStore.java | 22 +-
.../giraph/ooc/data/DiskBackedMessageStore.java | 43 +-
.../ooc/data/DiskBackedPartitionStore.java | 46 +-
.../giraph/ooc/data/MetaPartitionManager.java | 677 +++++------
.../giraph/ooc/data/OutOfCoreDataManager.java | 42 +-
.../org/apache/giraph/ooc/io/IOCommand.java | 47 +-
.../giraph/ooc/io/LoadPartitionIOCommand.java | 20 +-
.../giraph/ooc/io/StoreDataBufferIOCommand.java | 18 +-
.../ooc/io/StoreIncomingMessageIOCommand.java | 13 +-
.../giraph/ooc/io/StorePartitionIOCommand.java | 21 +-
.../org/apache/giraph/ooc/io/WaitIOCommand.java | 8 +-
.../giraph/utils/AdjustableSemaphore.java | 6 +
.../apache/giraph/worker/BspServiceWorker.java | 5 +
.../giraph/worker/EdgeInputSplitsCallable.java | 10 +
.../giraph/worker/InputSplitsCallable.java | 20 +-
.../worker/VertexInputSplitsCallable.java | 10 +
.../giraph/partition/TestPartitionStores.java | 6 +-
.../java/org/apache/giraph/TestOutOfCore.java | 4 +-
src/site/xdoc/options.xml | 1095 ------------------
55 files changed, 3593 insertions(+), 2111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index be34820..69fbfee 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -44,7 +44,6 @@ import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
import org.apache.giraph.ooc.data.DiskBackedMessageStore;
import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
-import org.apache.giraph.ooc.FixedOutOfCoreEngine;
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionStore;
@@ -149,16 +148,7 @@ public class ServerData<I extends WritableComparable,
PartitionStore<I, V, E> inMemoryPartitionStore =
new SimplePartitionStore<I, V, E>(conf, context);
if (GiraphConstants.USE_OUT_OF_CORE_GRAPH.get(conf)) {
- int maxPartitionsInMemory =
- GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf);
- if (maxPartitionsInMemory == 0) {
- throw new IllegalStateException("ServerData: Adaptive " +
- "out-of-core engine is not supported yet! Number of partitions in" +
- " memory should be greater than 0.");
- } else {
- oocEngine = new FixedOutOfCoreEngine(conf, service,
- maxPartitionsInMemory);
- }
+ oocEngine = new OutOfCoreEngine(conf, service);
partitionStore =
new DiskBackedPartitionStore<I, V, E>(inMemoryPartitionStore,
conf, context, service, oocEngine);
@@ -312,7 +302,7 @@ public class ServerData<I extends WritableComparable,
currentMessageStore = nextCurrentMessageStore;
incomingMessageStore = nextIncomingMessageStore;
if (oocEngine != null) {
- oocEngine.getMetaPartitionManager().resetMessages();
+ oocEngine.reset();
oocEngine.getSuperstepLock().writeLock().unlock();
}
currentMessageStore.finalizeStore();
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
index ff82dd1..36d4f20 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
@@ -18,18 +18,36 @@
package org.apache.giraph.comm.flow_control;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.giraph.comm.netty.NettyClient;
import org.apache.giraph.comm.netty.handler.AckSignalFlag;
+import org.apache.giraph.comm.requests.SendResumeRequest;
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.conf.IntConfOption;
import org.apache.giraph.utils.AdjustableSemaphore;
+import org.apache.giraph.utils.CallableFactory;
+import org.apache.giraph.utils.LogStacktraceCallable;
+import org.apache.giraph.utils.ThreadUtils;
import org.apache.log4j.Logger;
import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -38,9 +56,27 @@ import static com.google.common.base.Preconditions.checkState;
import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
/**
- * Representation of credit-based flow control policy where each worker has a
- * constant user-defined credit. The number of open requests to a particular
- * worker cannot be more than its specified credit.
+ * Representation of credit-based flow control policy. With this policy there
+ * can be limited number of open requests from any worker x to any other worker
+ * y. This number is called 'credit'. Let's denote this number by C{x->y}. This
+ * implementation assumes that for a particular worker W, all values of C{x->W}
+ * are the same. Let's denote this value by CR_W. CR_W may change due to other
+ * reasons (e.g. memory pressure observed in an out-of-core mechanism). However,
+ * CR_W is always in range [0, MAX_CR], where MAX_CR is a user-defined constant.
+ * Note that MAX_CR should be representable by at most 14 bits.
+ *
+ * In this implementation, the value of CR_W is announced to other workers along
+ * with every ACK response envelope going out of W.
+ * Therefore, for non-zero values of CR_W, other workers know the instant value
+ * of CR_W, hence they can control the number of open requests they have to W.
+ * However, it is possible that W announces 0 as CR_W. In this case, other
+ * workers stop opening more requests to W, hence they will not get any new
+ * response envelope from W. This means other workers should be notified with
+ * a dedicated request to resume sending more requests once CR_W becomes
+ * non-zero. In this implementation, once CR_W is announced as 0 to a particular
+ * worker U, we keep U in a set, so later on we can send a 'resume signal' to U
+ * once CR_W becomes non-zero. Sending resume signals is done through a
+ * separate thread.
*/
public class CreditBasedFlowControl implements FlowControl {
/**
@@ -69,28 +105,87 @@ public class CreditBasedFlowControl implements FlowControl {
private final int unsentWaitMsecs;
/** Waiting interval for checking outstanding requests msecs */
private final int waitingRequestMsecs;
- /** Maximum number of open requests we can have for each worker */
- private final int maxOpenRequestsPerWorker;
+ /**
+ * Maximum number of open requests each worker can have to this worker at each
+ * moment (CR_W -defined above- for this worker)
+ */
+ private volatile short maxOpenRequestsPerWorker;
/** Total number of unsent, cached requests */
private final AtomicInteger aggregateUnsentRequests = new AtomicInteger(0);
/**
- * Map of requests permits per worker. Key in the map is the worker id and the
- * value is the semaphore to control the number of open requests for the
- * particular worker. Basically, the number of available permits on this
- * semaphore is the credit available for the worker.
+ * Map of requests permits per worker. Keys in the map are worker ids and
+ * values are pairs (X, Y) where:
+ * X: is the semaphore to control the number of open requests for a
+ * particular worker. Basically, the number of available permits on a
+ * semaphore is the credit available for the worker associated with that
+ * semaphore.
+ * Y: is the timestamp of the latest message (resume signal or ACK response)
+ * that changed the number of permits in the semaphore.
+ * The idea behind keeping a timestamp is to avoid any issue that may happen
+ * due to out-of-order message delivery. For example, consider this scenario:
+ * an ACK response is sent to a worker announcing the credit is 0. Later on,
+ * a resume signal announcing a non-zero credit is sent to the same worker.
+ * Now, if the resume signal arrives before the ACK message, the worker
+ * would incorrectly assume credit value of 0, and would avoid sending any
+ * messages, which may lead to a live-lock.
+ *
+ * The timestamp value is simply the request id generated by NettyClient.
+ * These ids are generated in consecutive order, hence simulating the concept
+ * of timestamp. However, the timestamp value should be sent along with
+ * any ACK response envelope. The ACK response envelope is already very small
+ * (maybe 10-20 bytes). So, the timestamp value should not add much overhead
+ * to it. Instead of sending the whole long value request id (8 bytes) as the
+ * timestamp, we can simply send the least significant 2 bytes of it. This is
+ * a valid timestamp, as the credit value can be 0x3FFF (=16383) at most. This
+ * means there will be at most 0x3FFF messages on the fly at each moment,
+ * which means that the timestamp value sent by all messages in fly will fall
+ * into a range of size 0x3FFF. So, it is enough to only consider timestamp
+ * values twice as big as the mentioned range to be able to accurately
+ * determine ordering even when values wrap around. This means we only need to
+ * consider 15 least significant bits of request ids as timestamp values.
+ *
+ * The ACK response value contains following information (from least
+ * significant to most significant):
+ * - 16 bits timestamp
+ * - 14 bits credit value
+ * - 1 bit specifying whether one end of communication is master and hence
+ * credit based flow control should be ignored
+ * - 1 bit response flag
*/
- private final ConcurrentMap<Integer, AdjustableSemaphore>
+ private final ConcurrentMap<Integer, Pair<AdjustableSemaphore, Integer>>
perWorkerOpenRequestMap = Maps.newConcurrentMap();
/** Map of unsent cached requests per worker */
private final ConcurrentMap<Integer, Deque<WritableRequest>>
perWorkerUnsentRequestMap = Maps.newConcurrentMap();
/**
+ * Set of workers that should be notified to resume sending more requests if
+ * the credit becomes non-zero
+ */
+ private final Set<Integer> workersToResume = Sets.newHashSet();
+ /**
+ * Resume signals are not using any credit, so they should be treated
+ * differently than normal requests. Resume signals should be ignored in
+ * accounting for credits in credit-based flow control. The following map
+ * keeps sets of request ids, for resume signals sent to other workers that
+ * are still "open". The set of request ids used for resume signals for a
+ * worker is important so we can determine if a received response is for a
+ * resume signal or not.
+ */
+ private final Map<Integer, Set<Long>> resumeRequestsId =
+ Maps.newConcurrentMap();
+ /**
* Semaphore to control number of cached unsent requests. Maximum number of
* permits of this semaphore should be equal to MAX_NUM_OF_UNSENT_REQUESTS.
*/
private final Semaphore unsentRequestPermit;
/** Netty client used for sending requests */
private final NettyClient nettyClient;
+ /**
+ * Result of execution for the thread responsible for sending resume signals
+ */
+ private final Future<Void> resumeThreadResult;
+ /** Whether we are shutting down the execution */
+ private volatile boolean shouldTerminate;
/**
* Constructor
@@ -100,31 +195,92 @@ public class CreditBasedFlowControl implements FlowControl {
public CreditBasedFlowControl(ImmutableClassesGiraphConfiguration conf,
NettyClient nettyClient) {
this.nettyClient = nettyClient;
- maxOpenRequestsPerWorker = MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
+ maxOpenRequestsPerWorker =
+ (short) MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
checkState(maxOpenRequestsPerWorker < 0x4000 &&
maxOpenRequestsPerWorker > 0, "NettyClient: max number of open " +
"requests should be in range (0, " + 0x4FFF + ")");
unsentRequestPermit = new Semaphore(MAX_NUM_OF_UNSENT_REQUESTS.get(conf));
unsentWaitMsecs = UNSENT_CACHE_WAIT_INTERVAL.get(conf);
waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
+ shouldTerminate = false;
+ CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
+ @Override
+ public Callable<Void> newCallable(int callableId) {
+ return new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ while (true) {
+ synchronized (workersToResume) {
+ if (shouldTerminate) {
+ break;
+ }
+ for (Integer workerId : workersToResume) {
+ if (maxOpenRequestsPerWorker != 0) {
+ sendResumeSignal(workerId);
+ } else {
+ break;
+ }
+ }
+ try {
+ workersToResume.wait();
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("call: caught exception " +
+ "while waiting for resume-sender thread to be notified!",
+ e);
+ }
+ }
+ }
+ return null;
+ }
+ };
+ }
+ };
+
+ ExecutorService executor = Executors.newSingleThreadExecutor(
+ ThreadUtils.createThreadFactory("resume-sender"));
+ resumeThreadResult = executor.submit(new LogStacktraceCallable<>(
+ callableFactory.newCallable(0)));
+ executor.shutdown();
+ }
+
+ /**
+ * Send resume signal request to a given worker
+ *
+ * @param workerId id of the worker to send the resume signal to
+ */
+ private void sendResumeSignal(int workerId) {
+ WritableRequest request = new SendResumeRequest(maxOpenRequestsPerWorker);
+ Long resumeId = nettyClient.doSend(workerId, request);
+ checkState(resumeId != null);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("sendResumeSignal: sending signal to worker " + workerId +
+ " with credit=" + maxOpenRequestsPerWorker + ", ID=" +
+ (resumeId & 0xFFFF));
+ }
+ resumeRequestsId.get(workerId).add(resumeId);
}
@Override
public void sendRequest(int destTaskId, WritableRequest request) {
- AdjustableSemaphore openRequestPermit =
+ Pair<AdjustableSemaphore, Integer> pair =
perWorkerOpenRequestMap.get(destTaskId);
// Check if this is the first time sending a request to a worker. If so, we
// should the worker id to necessary bookkeeping data structure.
- if (openRequestPermit == null) {
- openRequestPermit = new AdjustableSemaphore(maxOpenRequestsPerWorker);
- AdjustableSemaphore temp = perWorkerOpenRequestMap.putIfAbsent(destTaskId,
- openRequestPermit);
- perWorkerUnsentRequestMap
- .putIfAbsent(destTaskId, new ArrayDeque<WritableRequest>());
+ if (pair == null) {
+ pair = new MutablePair<>(
+ new AdjustableSemaphore(maxOpenRequestsPerWorker), -1);
+ Pair<AdjustableSemaphore, Integer> temp =
+ perWorkerOpenRequestMap.putIfAbsent(destTaskId, pair);
+ perWorkerUnsentRequestMap.putIfAbsent(
+ destTaskId, new ArrayDeque<WritableRequest>());
+ resumeRequestsId.putIfAbsent(
+ destTaskId, Sets.<Long>newConcurrentHashSet());
if (temp != null) {
- openRequestPermit = temp;
+ pair = temp;
}
}
+ AdjustableSemaphore openRequestPermit = pair.getLeft();
// Try to reserve a spot for the request amongst the open requests of
// the destination worker.
boolean shouldSend = openRequestPermit.tryAcquire();
@@ -198,8 +354,8 @@ public class CreditBasedFlowControl implements FlowControl {
* @param response response received
* @return true iff credit should be ignored, false otherwise
*/
- private boolean shouldIgnoreCredit(short response) {
- return ((short) ((response >> 14) & 1)) == 1;
+ private boolean shouldIgnoreCredit(int response) {
+ return ((short) ((response >> (14 + 16)) & 1)) == 1;
}
/**
@@ -208,8 +364,18 @@ public class CreditBasedFlowControl implements FlowControl {
* @param response response received
* @return credit from the received response
*/
- private short getCredit(short response) {
- return (short) (response & 0x3FFF);
+ private short getCredit(int response) {
+ return (short) ((response >> 16) & 0x3FFF);
+ }
+
+ /**
+ * Get the timestamp from a response
+ *
+ * @param response response received
+ * @return timestamp from the received response
+ */
+ private int getTimestamp(int response) {
+ return response & 0xFFFF;
}
/**
@@ -219,15 +385,73 @@ public class CreditBasedFlowControl implements FlowControl {
* @return AckSignalFlag coming with the response
*/
@Override
- public AckSignalFlag getAckSignalFlag(short response) {
- return AckSignalFlag.values()[(response >> 15) & 1];
+ public AckSignalFlag getAckSignalFlag(int response) {
+ return AckSignalFlag.values()[(response >> (16 + 14 + 1)) & 1];
}
@Override
- public short calculateResponse(AckSignalFlag flag, int taskId) {
+ public int calculateResponse(AckSignalFlag flag, int taskId) {
boolean ignoreCredit = nettyClient.masterInvolved(taskId);
- return (short) ((flag.ordinal() << 15) |
- ((ignoreCredit ? 1 : 0) << 14) | (maxOpenRequestsPerWorker & 0x3FFF));
+ if (!ignoreCredit && maxOpenRequestsPerWorker == 0) {
+ synchronized (workersToResume) {
+ workersToResume.add(taskId);
+ }
+ }
+ int timestamp = (int) (nettyClient.getNextRequestId(taskId) & 0xFFFF);
+ return (flag.ordinal() << (16 + 14 + 1)) |
+ ((ignoreCredit ? 1 : 0) << (16 + 14)) |
+ (maxOpenRequestsPerWorker << 16) |
+ timestamp;
+ }
+
+ @Override
+ public void shutdown() {
+ synchronized (workersToResume) {
+ shouldTerminate = true;
+ workersToResume.notifyAll();
+ }
+ try {
+ resumeThreadResult.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IllegalStateException("shutdown: caught exception while" +
+ "getting result of resume-sender thread");
+ }
+ }
+
+ @Override
+ public void logInfo() {
+ if (LOG.isInfoEnabled()) {
+ // Count how many unsent requests each task has
+ Map<Integer, Integer> unsentRequestCounts = Maps.newHashMap();
+ for (Map.Entry<Integer, Deque<WritableRequest>> entry :
+ perWorkerUnsentRequestMap.entrySet()) {
+ unsentRequestCounts.put(entry.getKey(), entry.getValue().size());
+ }
+ ArrayList<Map.Entry<Integer, Integer>> sorted =
+ Lists.newArrayList(unsentRequestCounts.entrySet());
+ Collections.sort(sorted, new Comparator<Map.Entry<Integer, Integer>>() {
+ @Override
+ public int compare(Map.Entry<Integer, Integer> entry1,
+ Map.Entry<Integer, Integer> entry2) {
+ int value1 = entry1.getValue();
+ int value2 = entry2.getValue();
+ return (value1 < value2) ? 1 : ((value1 == value2) ? 0 : -1);
+ }
+ });
+ StringBuilder message = new StringBuilder();
+ message.append("logInfo: ").append(aggregateUnsentRequests.get())
+ .append(" unsent requests in total. ");
+ int itemsToPrint = Math.min(10, sorted.size());
+ for (int i = 0; i < itemsToPrint; ++i) {
+ message.append(sorted.get(i).getValue())
+ .append(" unsent requests for taskId=")
+ .append(sorted.get(i).getKey()).append(" (credit=")
+ .append(perWorkerOpenRequestMap.get(sorted.get(i).getKey())
+ .getKey().getMaxPermits())
+ .append("), ");
+ }
+ LOG.info(message);
+ }
}
@Override
@@ -240,7 +464,7 @@ public class CreditBasedFlowControl implements FlowControl {
try {
aggregateUnsentRequests.wait(waitingRequestMsecs);
} catch (InterruptedException e) {
- throw new IllegalStateException("waitSomeRequests: failed while " +
+ throw new IllegalStateException("waitAllRequests: failed while " +
"waiting on open/cached requests");
}
}
@@ -257,21 +481,52 @@ public class CreditBasedFlowControl implements FlowControl {
}
@Override
- public void messageAckReceived(int taskId, short response) {
- boolean shouldIgnoreCredit = shouldIgnoreCredit(response);
+ public void messageAckReceived(int taskId, long requestId, int response) {
+ boolean ignoreCredit = shouldIgnoreCredit(response);
short credit = getCredit(response);
- AdjustableSemaphore openRequestPermit =
- perWorkerOpenRequestMap.get(taskId);
- openRequestPermit.release();
- if (!shouldIgnoreCredit) {
- openRequestPermit.setMaxPermits(credit);
+ int timestamp = getTimestamp(response);
+ MutablePair<AdjustableSemaphore, Integer> pair =
+ (MutablePair<AdjustableSemaphore, Integer>)
+ perWorkerOpenRequestMap.get(taskId);
+ AdjustableSemaphore openRequestPermit = pair.getLeft();
+ // Release a permit on open requests if we received ACK of a request other
+ // than a Resume request (resume requests are always sent regardless of
+ // number of open requests)
+ if (!resumeRequestsId.get(taskId).remove(requestId)) {
+ openRequestPermit.release();
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("messageAckReceived: ACK of resume received from " + taskId +
+ " timestamp=" + timestamp);
+ }
+ if (!ignoreCredit) {
+ synchronized (pair) {
+ if (compareTimestamps(timestamp, pair.getRight()) > 0) {
+ pair.setRight(timestamp);
+ openRequestPermit.setMaxPermits(credit);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("messageAckReceived: received out-of-order messages." +
+ "Received timestamp=" + timestamp + " and current timestamp=" +
+ pair.getRight());
+ }
+ }
}
- Deque<WritableRequest> requestDeque =
- perWorkerUnsentRequestMap.get(taskId);
// Since we received a response and we changed the credit of the sender
// client, we may be able to send some more requests to the sender
// client. So, we try to send as much request as we can to the sender
// client.
+ trySendCachedRequests(taskId);
+ }
+
+ /**
+ * Try to send as many cached requests as possible to a given worker
+ *
+ * @param taskId id of the worker to send cached requests to
+ */
+ private void trySendCachedRequests(int taskId) {
+ Deque<WritableRequest> requestDeque =
+ perWorkerUnsentRequestMap.get(taskId);
+ AdjustableSemaphore openRequestPermit =
+ perWorkerOpenRequestMap.get(taskId).getLeft();
while (true) {
WritableRequest request;
synchronized (requestDeque) {
@@ -285,6 +540,7 @@ public class CreditBasedFlowControl implements FlowControl {
break;
}
}
+ unsentRequestPermit.release();
// At this point, we have a request, and we reserved a credit for the
// sender client. So, we send the request to the client and update
// the state.
@@ -294,7 +550,73 @@ public class CreditBasedFlowControl implements FlowControl {
aggregateUnsentRequests.notifyAll();
}
}
- unsentRequestPermit.release();
}
}
+
+ /**
+ * Update the max credit that is announced to other workers
+ *
+ * @param newCredit new credit
+ */
+ public void updateCredit(short newCredit) {
+ newCredit = (short) Math.max(0, Math.min(0x3FFF, newCredit));
+ // Check whether we should send resume signals to some workers
+ if (maxOpenRequestsPerWorker == 0 && newCredit != 0) {
+ maxOpenRequestsPerWorker = newCredit;
+ synchronized (workersToResume) {
+ workersToResume.notifyAll();
+ }
+ } else {
+ maxOpenRequestsPerWorker = newCredit;
+ }
+ }
+
+ /**
+ * Compare two timestamps accounting for wrap around. Note that the timestamp
+ * values should be in a range that fits into 14 bits values. This means if
+ * the difference of the two given timestamp is large, we are dealing with one
+ * value being wrapped around.
+ *
+ * @param ts1 first timestamp
+ * @param ts2 second timestamp
+ * @return positive value if first timestamp is later than second timestamp,
+ * negative otherwise
+ */
+ private int compareTimestamps(int ts1, int ts2) {
+ int diff = ts1 - ts2;
+ if (Math.abs(diff) < 0x7FFF) {
+ return diff;
+ } else {
+ return -diff;
+ }
+ }
+
+ /**
+ * Process a resume signal that came from a given worker
+ *
+ * @param clientId id of the worker that sent the signal
+ * @param credit the credit value sent along with the resume signal
+ * @param requestId timestamp (request id) of the resume signal
+ */
+ public void processResumeSignal(int clientId, short credit, long requestId) {
+ int timestamp = (int) (requestId & 0xFFFF);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("processResumeSignal: resume signal from " + clientId +
+ " with timestamp=" + timestamp);
+ }
+ MutablePair<AdjustableSemaphore, Integer> pair =
+ (MutablePair<AdjustableSemaphore, Integer>)
+ perWorkerOpenRequestMap.get(clientId);
+ synchronized (pair) {
+ if (compareTimestamps(timestamp, pair.getRight()) > 0) {
+ pair.setRight(timestamp);
+ pair.getLeft().setMaxPermits(credit);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("processResumeSignal: received out-of-order messages. " +
+ "Received timestamp=" + timestamp + " and current timestamp=" +
+ pair.getRight());
+ }
+ }
+ trySendCachedRequests(clientId);
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java
index 4eda193..4072af7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java
@@ -45,9 +45,10 @@ public interface FlowControl {
* Notify the flow control policy that an open request is completed.
*
* @param taskId id of the task to which the open request is completed
- * @param response the response heard from the task
+ * @param requestId id of the open request which is completed
+ * @param response the response heard from the client
*/
- void messageAckReceived(int taskId, short response);
+ void messageAckReceived(int taskId, long requestId, int response);
/**
* Decode the acknowledgement signal from the response after an open request
@@ -56,7 +57,7 @@ public interface FlowControl {
* @param response the response heard after completion of a request
* @return the Acknowledgement signal decoded from the response
*/
- AckSignalFlag getAckSignalFlag(short response);
+ AckSignalFlag getAckSignalFlag(int response);
/**
* There may be requests in possession of the flow control mechanism, as the
@@ -79,5 +80,15 @@ public interface FlowControl {
* @param taskId id of the task the acknowledgement is for
* @return the response to piggyback along with the acknowledgement message
*/
- short calculateResponse(AckSignalFlag flag, int taskId);
+ int calculateResponse(AckSignalFlag flag, int taskId);
+
+ /**
+ * Shutdown the flow control policy
+ */
+ void shutdown();
+
+ /**
+ * Log the status of the flow control
+ */
+ void logInfo();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java
index d50fe92..c97c967 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java
@@ -45,10 +45,10 @@ public class NoOpFlowControl implements FlowControl {
}
@Override
- public void messageAckReceived(int taskId, short response) { }
+ public void messageAckReceived(int taskId, long requestId, int response) { }
@Override
- public AckSignalFlag getAckSignalFlag(short response) {
+ public AckSignalFlag getAckSignalFlag(int response) {
return AckSignalFlag.values()[response];
}
@@ -61,7 +61,13 @@ public class NoOpFlowControl implements FlowControl {
}
@Override
- public short calculateResponse(AckSignalFlag alreadyDone, int taskId) {
- return (short) alreadyDone.ordinal();
+ public int calculateResponse(AckSignalFlag alreadyDone, int taskId) {
+ return alreadyDone.ordinal();
}
+
+ @Override
+ public void shutdown() { }
+
+ @Override
+ public void logInfo() { }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java
index 1fc43a7..6d67afd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java
@@ -31,6 +31,8 @@ import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
import org.apache.log4j.Logger;
+import java.util.concurrent.atomic.AtomicInteger;
+
import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
/**
@@ -69,6 +71,8 @@ public class StaticFlowControl implements
private final Object requestSpotAvailable = new Object();
/** Counter for time spent waiting on too many open requests */
private Counter timeWaitingOnOpenRequests;
+ /** Number of threads waiting on too many open requests */
+ private final AtomicInteger numWaitingThreads = new AtomicInteger(0);
/**
* Constructor
@@ -108,11 +112,12 @@ public class StaticFlowControl implements
}
/**
- * Ensure that at most maxOpenRequests are not complete. Periodically,
- * check the state of every request. If we find the connection failed,
- * re-establish it and re-send the request.
+ * Ensure that at most numberOfRequestsToProceed are not complete.
+ * Periodically, check the state of every request. If we find the connection
+ * failed, re-establish it and re-send the request.
*/
private void waitSomeRequests() {
+ numWaitingThreads.getAndIncrement();
while (nettyClient.getNumberOfOpenRequests() > numberOfRequestsToProceed) {
// Wait for requests to complete for some time
synchronized (requestSpotAvailable) {
@@ -129,23 +134,36 @@ public class StaticFlowControl implements
}
nettyClient.logAndSanityCheck();
}
+ numWaitingThreads.getAndDecrement();
}
@Override
- public void messageAckReceived(int taskId, short response) {
+ public void messageAckReceived(int taskId, long requestId, int response) {
synchronized (requestSpotAvailable) {
requestSpotAvailable.notifyAll();
}
}
@Override
- public AckSignalFlag getAckSignalFlag(short response) {
+ public AckSignalFlag getAckSignalFlag(int response) {
return AckSignalFlag.values()[response];
}
@Override
- public short calculateResponse(AckSignalFlag alreadyDone, int taskId) {
- return (short) alreadyDone.ordinal();
+ public int calculateResponse(AckSignalFlag alreadyDone, int clientId) {
+ return alreadyDone.ordinal();
+ }
+
+ @Override
+ public void shutdown() { }
+
+ @Override
+ public void logInfo() {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("logInfo: " + numWaitingThreads.get() + " threads waiting " +
+ "until number of open requests falls below " +
+ numberOfRequestsToProceed);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index c185fdc..217dba6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -23,7 +23,7 @@ import org.apache.giraph.comm.flow_control.FlowControl;
import org.apache.giraph.comm.flow_control.NoOpFlowControl;
import org.apache.giraph.comm.flow_control.StaticFlowControl;
import org.apache.giraph.comm.netty.handler.AckSignalFlag;
-import org.apache.giraph.comm.netty.handler.AddressRequestIdGenerator;
+import org.apache.giraph.comm.netty.handler.TaskRequestIdGenerator;
import org.apache.giraph.comm.netty.handler.ClientRequestId;
import org.apache.giraph.comm.netty.handler.RequestEncoder;
import org.apache.giraph.comm.netty.handler.RequestInfo;
@@ -174,12 +174,12 @@ public class NettyClient {
/** Waiting interval for checking outstanding requests msecs */
private final int waitingRequestMsecs;
/** Timed logger for printing request debugging */
- private final TimedLogger requestLogger = new TimedLogger(15 * 1000, LOG);
+ private final TimedLogger requestLogger;
/** Worker executor group */
private final EventLoopGroup workerGroup;
- /** Address request id generator */
- private final AddressRequestIdGenerator addressRequestIdGenerator =
- new AddressRequestIdGenerator();
+ /** Task request id generator */
+ private final TaskRequestIdGenerator taskRequestIdGenerator =
+ new TaskRequestIdGenerator();
/** Task info */
private final TaskInfo myTaskInfo;
/** Maximum thread pool size */
@@ -245,6 +245,7 @@ public class NettyClient {
waitTimeBetweenConnectionRetriesMs =
WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS.get(conf);
waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
+ requestLogger = new TimedLogger(waitingRequestMsecs, LOG);
maxPoolSize = GiraphConstants.NETTY_CLIENT_THREADS.get(conf);
maxResolveAddressAttempts = MAX_RESOLVE_ADDRESS_ATTEMPTS.get(conf);
@@ -621,6 +622,7 @@ public class NettyClient {
if (LOG.isInfoEnabled()) {
LOG.info("stop: Halting netty client");
}
+ flowControl.shutdown();
// Close connections asynchronously, in a Netty-approved
// way, without cleaning up thread pools until all channels
// in addressChannelMap are closed (success or failure)
@@ -730,14 +732,16 @@ public class NettyClient {
*
* @param destTaskId destination to send the request to
* @param request request itself
+ * @return request id generated for sending the request
*/
- public void doSend(int destTaskId, WritableRequest request) {
+ public Long doSend(int destTaskId, WritableRequest request) {
InetSocketAddress remoteServer = taskIdAddressMap.get(destTaskId);
if (clientRequestIdRequestInfoMap.isEmpty()) {
inboundByteCounter.resetAll();
outboundByteCounter.resetAll();
}
boolean registerRequest = true;
+ Long requestId = null;
/*if_not[HADOOP_NON_SECURE]*/
if (request.getType() == RequestType.SASL_TOKEN_MESSAGE_REQUEST) {
registerRequest = false;
@@ -748,8 +752,8 @@ public class NettyClient {
RequestInfo newRequestInfo = new RequestInfo(remoteServer, request);
if (registerRequest) {
request.setClientId(myTaskInfo.getTaskId());
- request.setRequestId(
- addressRequestIdGenerator.getNextRequestId(remoteServer));
+ requestId = taskRequestIdGenerator.getNextRequestId(destTaskId);
+ request.setRequestId(requestId);
ClientRequestId clientRequestId =
new ClientRequestId(destTaskId, request.getRequestId());
RequestInfo oldRequestInfo = clientRequestIdRequestInfoMap.putIfAbsent(
@@ -769,6 +773,7 @@ public class NettyClient {
ChannelFuture writeFuture = channel.write(request);
newRequestInfo.setWriteFuture(writeFuture);
writeFuture.addListener(logErrorListener);
+ return requestId;
}
/**
@@ -779,7 +784,7 @@ public class NettyClient {
* @param response Actual response
* @param shouldDrop Drop the message?
*/
- public void messageReceived(int senderId, long requestId, short response,
+ public void messageReceived(int senderId, long requestId, int response,
boolean shouldDrop) {
if (shouldDrop) {
synchronized (clientRequestIdRequestInfoMap) {
@@ -806,7 +811,7 @@ public class NettyClient {
requestInfo + ". Waiting on " +
clientRequestIdRequestInfoMap.size() + " requests");
}
- flowControl.messageAckReceived(senderId, response);
+ flowControl.messageAckReceived(senderId, requestId, response);
// Help #waitAllRequests() to finish faster
synchronized (clientRequestIdRequestInfoMap) {
clientRequestIdRequestInfoMap.notifyAll();
@@ -862,8 +867,7 @@ public class NettyClient {
LOG.info("logInfoAboutOpenRequests: Waiting interval of " +
waitingRequestMsecs + " msecs, " +
clientRequestIdRequestInfoMap.size() +
- " open requests, " + flowControl.getNumberOfUnsentRequests() +
- " cached unsent requests, " + inboundByteCounter.getMetrics() + "\n" +
+ " open requests, " + inboundByteCounter.getMetrics() + "\n" +
outboundByteCounter.getMetrics());
if (clientRequestIdRequestInfoMap.size() < MAX_REQUESTS_TO_LIST) {
@@ -907,6 +911,7 @@ public class NettyClient {
.append(", ");
}
LOG.info(message);
+ flowControl.logInfo();
}
}
@@ -1022,6 +1027,16 @@ public class NettyClient {
}
/**
+ * Generate and get the next request id to be used for a given worker
+ *
+ * @param taskId id of the worker to generate the next request id
+ * @return request id
+ */
+ public Long getNextRequestId(int taskId) {
+ return taskRequestIdGenerator.getNextRequestId(taskId);
+ }
+
+ /**
* @return number of open requests
*/
public int getNumberOfOpenRequests() {
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AddressRequestIdGenerator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AddressRequestIdGenerator.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AddressRequestIdGenerator.java
deleted file mode 100644
index 8ba5b96..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AddressRequestIdGenerator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.netty.handler;
-
-import com.google.common.collect.Maps;
-import java.net.InetSocketAddress;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Generate different request ids based on the address of the well known
- * port on the workers. Thread-safe.
- */
-public class AddressRequestIdGenerator {
- /** Address request generator map */
- private final ConcurrentMap<InetSocketAddress, AtomicLong>
- addressRequestGeneratorMap = Maps.newConcurrentMap();
-
- /**
- * Get the next request id for a given destination. Thread-safe.
- *
- * @param address Address of the worker (consistent during a superstep)
- * @return Valid request id
- */
- public Long getNextRequestId(InetSocketAddress address) {
- AtomicLong requestGenerator = addressRequestGeneratorMap.get(address);
- if (requestGenerator == null) {
- requestGenerator = new AtomicLong(0);
- AtomicLong oldRequestGenerator =
- addressRequestGeneratorMap.putIfAbsent(address, requestGenerator);
- if (oldRequestGenerator != null) {
- requestGenerator = oldRequestGenerator;
- }
- }
- return requestGenerator.getAndIncrement();
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
index df50e2a..7bb4464 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
@@ -44,7 +44,7 @@ import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUES
public abstract class RequestServerHandler<R> extends
ChannelInboundHandlerAdapter {
/** Number of bytes in the encoded response */
- public static final int RESPONSE_BYTES = 14;
+ public static final int RESPONSE_BYTES = 16;
/** Time class to use */
private static Time TIME = SystemTime.get();
/** Class logger */
@@ -137,9 +137,9 @@ public abstract class RequestServerHandler<R> extends
ByteBuf buffer = ctx.alloc().buffer(RESPONSE_BYTES);
buffer.writeInt(myTaskInfo.getTaskId());
buffer.writeLong(request.getRequestId());
- short signal =
+ int signal =
flowControl.calculateResponse(alreadyDone, request.getClientId());
- buffer.writeShort(signal);
+ buffer.writeInt(signal);
ctx.write(buffer);
// NettyServer is bootstrapped with auto-read set to true by default. After
// the first request is processed, we set auto-read to false. This prevents
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
index 54cb201..12dde3b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
@@ -64,11 +64,11 @@ public class ResponseClientHandler extends ChannelInboundHandlerAdapter {
ByteBuf buf = (ByteBuf) msg;
int senderId = -1;
long requestId = -1;
- short response = -1;
+ int response = -1;
try {
senderId = buf.readInt();
requestId = buf.readLong();
- response = buf.readShort();
+ response = buf.readInt();
} catch (IndexOutOfBoundsException e) {
throw new IllegalStateException(
"channelRead: Got IndexOutOfBoundsException ", e);
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/TaskRequestIdGenerator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/TaskRequestIdGenerator.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/TaskRequestIdGenerator.java
new file mode 100644
index 0000000..b172e9a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/TaskRequestIdGenerator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty.handler;
+
+import com.google.common.collect.Maps;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Generate different request ids based on the task id. Thread-safe.
+ */
+public class TaskRequestIdGenerator {
+ /** Task request generator map */
+ private final ConcurrentMap<Integer, AtomicLong>
+ taskRequestGeneratorMap = Maps.newConcurrentMap();
+
+ /**
+ * Get the next request id for a given destination. Thread-safe.
+ *
+ * @param taskId id of the task(consistent during a superstep)
+ * @return Valid request id
+ */
+ public Long getNextRequestId(Integer taskId) {
+ AtomicLong requestGenerator = taskRequestGeneratorMap.get(taskId);
+ if (requestGenerator == null) {
+ requestGenerator = new AtomicLong(0);
+ AtomicLong oldRequestGenerator =
+ taskRequestGeneratorMap.putIfAbsent(taskId, requestGenerator);
+ if (oldRequestGenerator != null) {
+ requestGenerator = oldRequestGenerator;
+ }
+ }
+ return requestGenerator.getAndIncrement();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
index bebac28..627c2af 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
@@ -64,7 +64,9 @@ else[HADOOP_NON_SECURE]*/
/** Send request for input split from worker to master */
ASK_FOR_INPUT_SPLIT_REQUEST(AskForInputSplitRequest.class),
/** Send request with granted input split from master to workers */
- REPLY_WITH_INPUT_SPLIT_REQUEST(ReplyWithInputSplitRequest.class);
+ REPLY_WITH_INPUT_SPLIT_REQUEST(ReplyWithInputSplitRequest.class),
+ /** Send request to resume sending messages (used in flow-control) */
+ SEND_RESUME_REQUEST(SendResumeRequest.class);
/** Class of request which this type corresponds to */
private final Class<? extends WritableRequest> requestClass;
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendResumeRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendResumeRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendResumeRequest.java
new file mode 100644
index 0000000..0e5e8bb
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendResumeRequest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.flow_control.CreditBasedFlowControl;
+import org.apache.giraph.comm.flow_control.FlowControl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Send to a worker a signal to resume sending messages to sender worker. This
+ * type of request is used in adaptive credit-based flow control, where a
+ * worker (W) may assign credit value of 0 to some worker (U), so that U would
+ * stop sending messages to W. Later on, W may want to notify U to continue
+ * sending messages to W. Along with the resume signal, W also announces a new
+ * credit value to U.
+ */
+public class SendResumeRequest extends WritableRequest
+ implements WorkerRequest {
+ /** credit value */
+ private short credit;
+
+ /** Constructor used for reflection only */
+ public SendResumeRequest() { }
+
+ /**
+ * Constructor
+ *
+ * @param credit credit value
+ */
+ public SendResumeRequest(short credit) {
+ checkState(credit > 0);
+ this.credit = credit;
+ }
+
+ @Override
+ public void doRequest(ServerData serverData) {
+ FlowControl flowControl =
+ serverData.getServiceWorker().getWorkerClient().getFlowControl();
+ checkState(flowControl != null);
+ ((CreditBasedFlowControl) flowControl).processResumeSignal(getClientId(),
+ credit, getRequestId());
+ }
+
+ @Override
+ public RequestType getType() {
+ return RequestType.SEND_RESUME_REQUEST;
+ }
+
+ @Override
+ void readFieldsRequest(DataInput input) throws IOException {
+ credit = input.readShort();
+ }
+
+ @Override
+ void writeRequest(DataOutput output) throws IOException {
+ output.writeShort(credit);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 78bd5ef..df79b7f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -957,10 +957,6 @@ public class GiraphConfiguration extends Configuration
return NUM_COMPUTE_THREADS.get(this);
}
- public int getNumOocThreads() {
- return NUM_OOC_THREADS.get(this);
- }
-
/**
* Set the number of input split threads
*
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index b5bb9ed..17e030f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -71,6 +71,8 @@ import org.apache.giraph.mapping.translate.TranslateEdge;
import org.apache.giraph.master.DefaultMasterCompute;
import org.apache.giraph.master.MasterCompute;
import org.apache.giraph.master.MasterObserver;
+import org.apache.giraph.ooc.OutOfCoreOracle;
+import org.apache.giraph.ooc.ThresholdBasedOracle;
import org.apache.giraph.partition.GraphPartitionerFactory;
import org.apache.giraph.partition.HashPartitionerFactory;
import org.apache.giraph.partition.Partition;
@@ -995,10 +997,16 @@ public interface GiraphConstants {
new BooleanConfOption("giraph.useOutOfCoreGraph", false,
"Enable out-of-core graph.");
- /** Number of threads participating in swapping graph/messages to disk. */
- IntConfOption NUM_OOC_THREADS =
- new IntConfOption("giraph.numOutOfCoreThreads", 1,
- "Number of threads participating in swapping data to disk.");
+ /**
+ * Out-of-core oracle that is to be used for adaptive out-of-core engine. If
+ * the `MAX_PARTITIONS_IN_MEMORY` is already set, this will be over-written
+ * to be `FixedPartitionsOracle`.
+ */
+ ClassConfOption<OutOfCoreOracle> OUT_OF_CORE_ORACLE =
+ ClassConfOption.create("giraph.outOfCoreOracle",
+ ThresholdBasedOracle.class, OutOfCoreOracle.class,
+ "Out-of-core oracle that is to be used for adaptive out-of-core " +
+ "engine");
/** Maximum number of partitions to hold in memory for each worker. */
IntConfOption MAX_PARTITIONS_IN_MEMORY =
@@ -1006,8 +1014,6 @@ public interface GiraphConstants {
"Maximum number of partitions to hold in memory for each worker. By" +
" default it is set to 0 (for adaptive out-of-core mechanism");
-
-
/** Directory to write YourKit snapshots to */
String YOURKIT_OUTPUT_DIR = "giraph.yourkit.outputDir";
/** Default directory to write YourKit snapshots to */
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java
index d96b474..0cb8486 100644
--- a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java
@@ -47,7 +47,7 @@ public class GiraphStats extends HadoopCountersBase {
= "Aggregate sent messages";
/** aggregate sent messages bytes counter name */
public static final String AGGREGATE_SENT_MESSAGE_BYTES_NAME
- = "Aggregate sent message message bytes";
+ = "Aggregate sent message bytes";
/** workers counter name */
public static final String CURRENT_WORKERS_NAME = "Current workers";
/** current master partition task counter name */
@@ -56,6 +56,12 @@ public class GiraphStats extends HadoopCountersBase {
/** last checkpointed superstep counter name */
public static final String LAST_CHECKPOINTED_SUPERSTEP_NAME =
"Last checkpointed superstep";
+ /** aggregate bytes loaded from local disks in out-of-core */
+ public static final String OOC_BYTES_LOADED_NAME =
+ "Aggregate bytes loaded from local disks (out-of-core)";
+ /** aggregate bytes stored to local disks in out-of-core */
+ public static final String OOC_BYTES_STORED_NAME =
+ "Aggregate bytes stored to local disks (out-of-core)";
/** Singleton instance for everyone to use */
private static GiraphStats INSTANCE;
@@ -82,8 +88,12 @@ public class GiraphStats extends HadoopCountersBase {
private static final int AGG_SENT_MESSAGES = 9;
/** Aggregate sent message bytes counter */
private static final int AGG_SENT_MESSAGE_BYTES = 10;
+ /** Aggregate OOC loaded bytes counter */
+ private static final int OOC_BYTES_LOADED = 11;
+ /** Aggregate OOC stored bytes counter */
+ private static final int OOC_BYTES_STORED = 12;
/** Number of counters in this class */
- private static final int NUM_COUNTERS = 11;
+ private static final int NUM_COUNTERS = 13;
/** All the counters stored */
private final GiraphHadoopCounter[] counters;
@@ -111,6 +121,8 @@ public class GiraphStats extends HadoopCountersBase {
getCounter(AGGREGATE_SENT_MESSAGES_NAME);
counters[AGG_SENT_MESSAGE_BYTES] =
getCounter(AGGREGATE_SENT_MESSAGE_BYTES_NAME);
+ counters[OOC_BYTES_LOADED] = getCounter(OOC_BYTES_LOADED_NAME);
+ counters[OOC_BYTES_STORED] = getCounter(OOC_BYTES_STORED_NAME);
}
/**
@@ -230,6 +242,24 @@ public class GiraphStats extends HadoopCountersBase {
return counters[LAST_CHECKPOINTED_SUPERSTEP];
}
+ /**
+ * Get OOCBytesLoaded counter
+ *
+ * @return OOCBytesLoaded counter
+ */
+ public GiraphHadoopCounter getAggregateOOCBytesLoaded() {
+ return counters[OOC_BYTES_LOADED];
+ }
+
+ /**
+ * Get OOCBytesStored counter
+ *
+ * @return OOCBytesStored counter
+ */
+ public GiraphHadoopCounter getAggregateOOCBytesStored() {
+ return counters[OOC_BYTES_STORED];
+ }
+
@Override
public Iterator<GiraphHadoopCounter> iterator() {
return Arrays.asList(counters).iterator();
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
index 0f3d668..104cae2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
@@ -23,6 +23,7 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.utils.CallableFactory;
import org.apache.giraph.utils.ProgressableUtils;
@@ -78,6 +79,8 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
* from the one used during computation.
*/
protected boolean useInputOutEdges;
+ /** Whether we spilled edges on disk */
+ private boolean hasEdgesOnDisk = false;
/**
* Constructor.
@@ -169,6 +172,9 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
Map<K, OutEdges<I, E>> edges = transientEdges.remove(partitionId);
if (edges != null) {
output.writeInt(edges.size());
+ if (edges.size() > 0) {
+ hasEdgesOnDisk = true;
+ }
for (Map.Entry<K, OutEdges<I, E>> edge : edges.entrySet()) {
writeVertexKey(edge.getKey(), output);
edge.getValue().write(output);
@@ -242,7 +248,7 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
@Override
public void moveEdgesToVertices() {
final boolean createSourceVertex = configuration.getCreateSourceVertex();
- if (transientEdges.isEmpty()) {
+ if (transientEdges.isEmpty() && !hasEdgesOnDisk) {
if (LOG.isInfoEnabled()) {
LOG.info("moveEdgesToVertices: No edges to move");
}
@@ -264,6 +270,10 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
public Void call() throws Exception {
Integer partitionId;
I representativeVertexId = configuration.createVertexId();
+ OutOfCoreEngine oocEngine = service.getServerData().getOocEngine();
+ if (oocEngine != null) {
+ oocEngine.processingThreadStart();
+ }
while (true) {
Partition<I, V, E> partition =
service.getPartitionStore().getNextPartition();
@@ -280,7 +290,15 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
Iterator<Et> iterator =
getPartitionEdgesIterator(partitionEdges);
// process all vertices in given partition
+ int count = 0;
while (iterator.hasNext()) {
+ // If out-of-core mechanism is used, check whether this thread
+ // can stay active or it should temporarily suspend and stop
+ // processing and generating more data for the moment.
+ if (oocEngine != null &&
+ (++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) {
+ oocEngine.activeThreadCheckIn();
+ }
Et entry = iterator.next();
I vertexId = getVertexId(entry, representativeVertexId);
OutEdges<I, E> outEdges = convertInputToComputeEdges(
@@ -320,6 +338,9 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
// partition after modifying it.
service.getPartitionStore().putPartition(partition);
}
+ if (oocEngine != null) {
+ oocEngine.processingThreadFinish();
+ }
return null;
}
};
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index d59d044..78c1ec3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -31,6 +31,7 @@ import org.apache.giraph.io.SimpleVertexWriter;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.MetricNames;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
+import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionStats;
import org.apache.giraph.partition.PartitionStore;
@@ -99,6 +100,12 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
private final Counter messageBytesSentCounter;
/** Compute time per partition */
private final Histogram histogramComputePerPartition;
+ /** GC time per compute thread */
+ private final Histogram histogramGCTimePerThread;
+ /** Wait time per compute thread */
+ private final Histogram histogramWaitTimePerThread;
+ /** Processing time per compute thread */
+ private final Histogram histogramProcessingTimePerThread;
/**
* Constructor
@@ -125,6 +132,11 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
metrics.getCounter(MetricNames.MESSAGE_BYTES_SENT);
histogramComputePerPartition = metrics.getUniformHistogram(
MetricNames.HISTOGRAM_COMPUTE_PER_PARTITION);
+ histogramGCTimePerThread = metrics.getUniformHistogram("gc-per-thread-ms");
+ histogramWaitTimePerThread =
+ metrics.getUniformHistogram("wait-per-thread-ms");
+ histogramProcessingTimePerThread =
+ metrics.getUniformHistogram("processing-per-thread-ms");
}
@Override
@@ -148,17 +160,32 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
List<PartitionStats> partitionStatsList = Lists.newArrayList();
PartitionStore<I, V, E> partitionStore = serviceWorker.getPartitionStore();
+ OutOfCoreEngine oocEngine = serviceWorker.getServerData().getOocEngine();
+ GraphTaskManager<I, V, E> taskManager = serviceWorker.getGraphTaskManager();
+ if (oocEngine != null) {
+ oocEngine.processingThreadStart();
+ }
+ long timeWaiting = 0;
+ long timeProcessing = 0;
+ long timeDoingGC = 0;
while (true) {
long startTime = System.currentTimeMillis();
+ long startGCTime = taskManager.getSuperstepGCTime();
Partition<I, V, E> partition = partitionStore.getNextPartition();
+ long timeDoingGCWhileWaiting =
+ taskManager.getSuperstepGCTime() - startGCTime;
+ timeDoingGC += timeDoingGCWhileWaiting;
+ timeWaiting += System.currentTimeMillis() - startTime -
+ timeDoingGCWhileWaiting;
if (partition == null) {
break;
}
-
+ long startProcessingTime = System.currentTimeMillis();
+ startGCTime = taskManager.getSuperstepGCTime();
try {
serviceWorker.getServerData().resolvePartitionMutation(partition);
PartitionStats partitionStats =
- computePartition(computation, partition);
+ computePartition(computation, partition, oocEngine);
partitionStatsList.add(partitionStats);
long partitionMsgs = workerClientRequestProcessor.resetMessageCount();
partitionStats.addMessagesSentCount(partitionMsgs);
@@ -180,10 +207,17 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
} finally {
partitionStore.putPartition(partition);
}
+ long timeDoingGCWhileProcessing =
+ taskManager.getSuperstepGCTime() - startGCTime;
+ timeDoingGC += timeDoingGCWhileProcessing;
+ timeProcessing += System.currentTimeMillis() - startProcessingTime -
+ timeDoingGCWhileProcessing;
histogramComputePerPartition.update(
System.currentTimeMillis() - startTime);
}
-
+ histogramGCTimePerThread.update(timeDoingGC);
+ histogramWaitTimePerThread.update(timeWaiting);
+ histogramProcessingTimePerThread.update(timeProcessing);
computation.postSuperstep();
// Return VertexWriter after the usage
@@ -194,7 +228,12 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
Time.NS_PER_SECOND_AS_FLOAT;
LOG.info("call: Computation took " + seconds + " secs for " +
partitionStatsList.size() + " partitions on superstep " +
- graphState.getSuperstep() + ". Flushing started");
+ graphState.getSuperstep() + ". Flushing started (time waiting on " +
+ "partitions was " +
+ String.format("%.2f s", timeWaiting / 1000.0) + ", time processing " +
+ "partitions was " + String.format("%.2f s", timeProcessing / 1000.0) +
+ ", time spent on gc was " +
+ String.format("%.2f s", timeDoingGC / 1000.0) + ")");
}
try {
workerClientRequestProcessor.flush();
@@ -211,6 +250,9 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
} catch (IOException e) {
throw new IllegalStateException("call: Flushing failed.", e);
}
+ if (oocEngine != null) {
+ oocEngine.processingThreadFinish();
+ }
return partitionStatsList;
}
@@ -219,17 +261,27 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
*
* @param computation Computation to use
* @param partition Partition to compute
+ * @param oocEngine out-of-core engine
* @return Partition stats for this computed partition
*/
private PartitionStats computePartition(
Computation<I, V, E, M1, M2> computation,
- Partition<I, V, E> partition) throws IOException, InterruptedException {
+ Partition<I, V, E> partition, OutOfCoreEngine oocEngine)
+ throws IOException, InterruptedException {
PartitionStats partitionStats =
new PartitionStats(partition.getId(), 0, 0, 0, 0, 0);
long verticesComputedProgress = 0;
// Make sure this is thread-safe across runs
synchronized (partition) {
+ int count = 0;
for (Vertex<I, V, E> vertex : partition) {
+ // If out-of-core mechanism is used, check whether this thread
+ // can stay active or it should temporarily suspend and stop
+ // processing and generating more data for the moment.
+ if (oocEngine != null &&
+ (++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) {
+ oocEngine.activeThreadCheckIn();
+ }
Iterable<M1> messages = messageStore.getVertexMessages(vertex.getId());
if (vertex.isHalted() && !Iterables.isEmpty(messages)) {
vertex.wakeUp();
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java b/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
index 499c862..dab3c2f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
@@ -42,6 +42,10 @@ public class GlobalStats implements Writable {
private long messageBytesCount = 0;
/** Whether the computation should be halted */
private boolean haltComputation = false;
+ /** Bytes of data stored to disk in the last superstep */
+ private long oocStoreBytesCount = 0;
+ /** Bytes of data loaded from disk in the last superstep */
+ private long oocLoadBytesCount = 0;
/**
* Master's decision on whether we should checkpoint and
* what to do next.
@@ -88,6 +92,14 @@ public class GlobalStats implements Writable {
haltComputation = value;
}
+  /**
+   * Get the number of bytes of data stored to disk in the last superstep.
+   *
+   * @return bytes stored to disk
+   */
+  public long getOocStoreBytesCount() {
+    return this.oocStoreBytesCount;
+  }
+
+  /**
+   * Get the number of bytes of data loaded from disk in the last superstep.
+   *
+   * @return bytes loaded from disk
+   */
+  public long getOocLoadBytesCount() {
+    return this.oocLoadBytesCount;
+  }
+
public CheckpointStatus getCheckpointStatus() {
return checkpointStatus;
}
@@ -97,6 +109,24 @@ public class GlobalStats implements Writable {
}
/**
+   * Add bytes loaded from disk to the global stats.
+   *
+   * @param bytes number of bytes to be added
+   */
+  public void addOocLoadBytesCount(long bytes) {
+    oocLoadBytesCount += bytes;
+  }
+
+  /**
+   * Add bytes stored to disk to the global stats.
+   *
+   * @param bytes number of bytes to be added
+   */
+  public void addOocStoreBytesCount(long bytes) {
+    oocStoreBytesCount += bytes;
+  }
+
+ /**
* Add messages to the global stats.
*
* @param messageCount Number of messages to be added.
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 19ac615..87d5248 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -19,6 +19,8 @@
package org.apache.giraph.graph;
import java.io.IOException;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
import java.net.URL;
import java.net.URLDecoder;
import java.util.ArrayList;
@@ -29,6 +31,8 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import com.sun.management.GarbageCollectionNotificationInfo;
+import com.yammer.metrics.core.Counter;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -46,6 +50,7 @@ import org.apache.giraph.metrics.GiraphTimer;
import org.apache.giraph.metrics.GiraphTimerContext;
import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
+import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.partition.PartitionStats;
import org.apache.giraph.partition.PartitionStore;
@@ -70,6 +75,11 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
+import javax.management.Notification;
+import javax.management.NotificationEmitter;
+import javax.management.NotificationListener;
+import javax.management.openmbean.CompositeData;
+
/**
* The Giraph-specific business logic for a single BSP
* compute node in whatever underlying type of cluster
@@ -110,6 +120,8 @@ end[PURE_YARN]*/
"time-to-first-message-ms";
/** Name of metric for time from first message till last message flushed */
public static final String TIMER_COMMUNICATION_TIME = "communication-time-ms";
+ /** Name of metric for time spent doing GC per superstep in msec */
+ public static final String TIMER_SUPERSTEP_GC_TIME = "superstep-gc-time-ms";
/** Class logger */
private static final Logger LOG = Logger.getLogger(GraphTaskManager.class);
@@ -156,6 +168,8 @@ end[PURE_YARN]*/
private GiraphTimerContext communicationTimerContext;
/** Timer for WorkerContext#preSuperstep() */
private GiraphTimer wcPreSuperstepTimer;
+ /** Counter of aggregated time (msec) spent in GC in a superstep */
+ private Counter gcTimeMetric;
/** The Hadoop Mapper#Context for this job */
private final Mapper<?, ?, ?, ?>.Context context;
/** is this GraphTaskManager the master? */
@@ -293,7 +307,9 @@ end[PURE_YARN]*/
return;
}
preLoadOnWorkerObservers();
+ GiraphTimerContext superstepTimerContext = superstepTimer.time();
finishedSuperstepStats = serviceWorker.setup();
+ superstepTimerContext.stop();
if (collectInputSuperstepStats(finishedSuperstepStats)) {
return;
}
@@ -304,8 +320,7 @@ end[PURE_YARN]*/
// main superstep processing loop
while (!finishedSuperstepStats.allVerticesHalted()) {
final long superstep = serviceWorker.getSuperstep();
- GiraphTimerContext superstepTimerContext =
- getTimerForThisSuperstep(superstep);
+ superstepTimerContext = getTimerForThisSuperstep(superstep);
GraphState graphState = new GraphState(superstep,
finishedSuperstepStats.getVertexCount(),
finishedSuperstepStats.getEdgeCount(),
@@ -619,6 +634,7 @@ end[PURE_YARN]*/
LOG.info("setup: Starting up BspServiceWorker...");
}
serviceWorker = new BspServiceWorker<I, V, E>(context, this);
+ installGCMonitoring();
if (LOG.isInfoEnabled()) {
LOG.info("setup: Registering health of this worker...");
}
@@ -626,6 +642,45 @@ end[PURE_YARN]*/
}
/**
+   * Install GC monitoring. This method intercepts every GC notification,
+   * logs the GC, adds its duration to the per-superstep GC time counter, and
+   * notifies the out-of-core engine (if one is used) about the GC.
+   * <p>
+   * GC beans that do not implement {@link NotificationEmitter} (possible on
+   * non-HotSpot JVMs) are skipped with a warning instead of failing setup.
+   */
+  private void installGCMonitoring() {
+    List<GarbageCollectorMXBean> mxBeans = ManagementFactory
+        .getGarbageCollectorMXBeans();
+    final OutOfCoreEngine oocEngine =
+        serviceWorker.getServerData().getOocEngine();
+    // The listener is stateless, so a single instance serves every bean.
+    NotificationListener listener = new NotificationListener() {
+      @Override
+      public void handleNotification(Notification notification,
+                                     Object handle) {
+        if (!GarbageCollectionNotificationInfo.GARBAGE_COLLECTION_NOTIFICATION
+            .equals(notification.getType())) {
+          return;
+        }
+        GarbageCollectionNotificationInfo info =
+            GarbageCollectionNotificationInfo.from(
+                (CompositeData) notification.getUserData());
+        long duration = info.getGcInfo().getDuration();
+        if (LOG.isInfoEnabled()) {
+          LOG.info("installGCMonitoring: name = " + info.getGcName() +
+              ", action = " + info.getGcAction() + ", cause = " +
+              info.getGcCause() + ", duration = " + duration + "ms");
+        }
+        // This runs on a JMX notification thread. gcTimeMetric is assigned
+        // during superstep-metrics initialization, which may not have
+        // happened yet when the first GC is reported; read it once and
+        // skip counting rather than throwing an NPE.
+        Counter gcCounter = gcTimeMetric;
+        if (gcCounter != null) {
+          gcCounter.inc(duration);
+        }
+        if (oocEngine != null) {
+          oocEngine.gcCompleted(info);
+        }
+      }
+    };
+    for (GarbageCollectorMXBean gcBean : mxBeans) {
+      if (!(gcBean instanceof NotificationEmitter)) {
+        LOG.warn("installGCMonitoring: GC bean " + gcBean.getName() +
+            " does not emit notifications; its GC time will not be tracked");
+        continue;
+      }
+      ((NotificationEmitter) gcBean)
+          .addNotificationListener(listener, null, null);
+    }
+  }
+
+ /**
* Initialize the root logger and appender to the settings in conf.
*/
private void initializeAndConfigureLogging() {
@@ -680,6 +735,7 @@ end[PURE_YARN]*/
TIMER_TIME_TO_FIRST_MSG, TimeUnit.MICROSECONDS);
communicationTimer = new GiraphTimer(superstepMetrics,
TIMER_COMMUNICATION_TIME, TimeUnit.MILLISECONDS);
+ gcTimeMetric = superstepMetrics.getCounter(TIMER_SUPERSTEP_GC_TIME);
wcPreSuperstepTimer = new GiraphTimer(superstepMetrics,
"worker-context-pre-superstep", TimeUnit.MILLISECONDS);
}
@@ -1000,6 +1056,12 @@ end[PURE_YARN]*/
return conf;
}
+  /**
+   * Get the time spent in GC, as recorded by the GC listener into the
+   * superstep GC time counter.
+   *
+   * @return GC time in msec for the current superstep, or 0 if the
+   *         superstep metrics have not been initialized yet
+   */
+  public long getSuperstepGCTime() {
+    // Read the field once: it is reassigned when superstep metrics reset.
+    Counter gcCounter = gcTimeMetric;
+    return gcCounter == null ? 0 : gcCounter.count();
+  }
/**
* Default handler for uncaught exceptions.
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index cf8d1bd..8372bd3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -942,6 +942,10 @@ public class BspServiceMaster<I extends WritableComparable,
workerFinishedInfoObj.getString(
JSONOBJ_METRICS_KEY)),
workerMetrics);
+ globalStats.addOocLoadBytesCount(
+ workerMetrics.getBytesLoadedFromDisk());
+ globalStats.addOocStoreBytesCount(
+ workerMetrics.getBytesStoredOnDisk());
aggregatedMetrics.add(workerMetrics, hostnamePartitionId);
}
} catch (JSONException e) {
@@ -2050,5 +2054,9 @@ public class BspServiceMaster<I extends WritableComparable,
gs.getAggregateSentMessages().increment(globalStats.getMessageCount());
gs.getAggregateSentMessageBytes()
.increment(globalStats.getMessageBytesCount());
+ gs.getAggregateOOCBytesLoaded()
+ .increment(globalStats.getOocLoadBytesCount());
+ gs.getAggregateOOCBytesStored()
+ .increment(globalStats.getOocStoreBytesCount());
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetric.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetric.java b/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetric.java
index 9d782c4..f99043f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetric.java
+++ b/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetric.java
@@ -20,41 +20,26 @@ package org.apache.giraph.metrics;
/**
* An aggregator over metrics from multiple hosts. Computes min, max, and mean.
+ *
+ * @param <T> value type
*/
-public class AggregatedMetric {
+public abstract class AggregatedMetric<T extends Number> {
/** Minimum value seen with the host that it came from */
- private ValueWithHostname min;
+ protected ValueWithHostname<T> min;
/** Maximum value seen with the host that it came from */
- private ValueWithHostname max;
+ protected ValueWithHostname<T> max;
/** Total of all the values seen */
- private long sum;
+ protected T sum;
/** Number of values seen */
- private long count;
-
- /**
- * Create new aggregated metric.
- */
- public AggregatedMetric() {
- min = new ValueWithHostname(Long.MAX_VALUE);
- max = new ValueWithHostname(Long.MIN_VALUE);
- }
+ protected long count;
/**
* Add another item to the aggregation.
*
- * @param value long value to add
+ * @param value value to add
* @param hostnamePartitionId String hostname it came from
*/
- public void addItem(long value, String hostnamePartitionId) {
- if (value < min.getValue()) {
- min.set(value, hostnamePartitionId);
- }
- if (value > max.getValue()) {
- max.set(value, hostnamePartitionId);
- }
- sum += value;
- count++;
- }
+ public abstract void addItem(T value, String hostnamePartitionId);
/**
* Whether this AggregatedMetric has any data.
@@ -70,7 +55,7 @@ public class AggregatedMetric {
*
* @return ValueWithHostname for minimum
*/
- public ValueWithHostname min() {
+ public ValueWithHostname<T> min() {
return min;
}
@@ -79,16 +64,16 @@ public class AggregatedMetric {
*
* @return ValueWithHostname for maximum
*/
- public ValueWithHostname max() {
+ public ValueWithHostname<T> max() {
return max;
}
/**
* Get total of all the values seen
*
- * @return long total of values seen
+ * @return total of values seen
*/
- public long sum() {
+ public T sum() {
return sum;
}
@@ -97,9 +82,7 @@ public class AggregatedMetric {
*
* @return computed average of all the values
*/
- public double mean() {
- return sum / (double) count;
- }
+ public abstract double mean();
/**
* Get number of values seen
@@ -110,3 +93,4 @@ public class AggregatedMetric {
return count;
}
}
+
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetricDouble.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetricDouble.java b/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetricDouble.java
new file mode 100644
index 0000000..abd0ff6
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetricDouble.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.metrics;
+
+/**
+ * Aggregator over metrics with double values. Tracks the minimum and maximum
+ * (each with the host it came from), the sum, and the number of values seen.
+ */
+public final class AggregatedMetricDouble extends AggregatedMetric<Double> {
+  /**
+   * Constructor. Seeds min and max with the most extreme finite doubles so
+   * that values added later replace them.
+   */
+  public AggregatedMetricDouble() {
+    min = new ValueWithHostname<>(Double.MAX_VALUE);
+    // Double.MIN_VALUE is the smallest *positive* double, so seeding max
+    // with it would make max ignore every value <= 0. -Double.MAX_VALUE is
+    // the double analogue of Long.MIN_VALUE.
+    max = new ValueWithHostname<>(-Double.MAX_VALUE);
+    sum = 0.0;
+  }
+
+  /**
+   * Add another value to the aggregation, updating min, max, sum and count.
+   *
+   * @param value value to add
+   * @param hostnamePartitionId hostname/partition identifier it came from
+   */
+  @Override
+  public void addItem(Double value, String hostnamePartitionId) {
+    if (value < min.getValue()) {
+      min.set(value, hostnamePartitionId);
+    }
+    if (value > max.getValue()) {
+      max.set(value, hostnamePartitionId);
+    }
+    sum += value;
+    count++;
+  }
+
+  /**
+   * Get the mean of all values seen.
+   *
+   * @return arithmetic mean of the values seen (NaN if none were added)
+   */
+  @Override
+  public double mean() {
+    return sum / count;
+  }
+}