You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by he...@apache.org on 2017/03/27 18:22:45 UTC
git commit: updated refs/heads/trunk to 2173d87
Repository: giraph
Updated Branches:
refs/heads/trunk a1d546f7a -> 2173d87cf
JIRA-1137
closes #26
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/2173d87c
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/2173d87c
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/2173d87c
Branch: refs/heads/trunk
Commit: 2173d87cf4aa84937abe438d0f836c8fdd3ba1c1
Parents: a1d546f
Author: Hassan Eslami <he...@apache.org>
Authored: Mon Mar 27 13:22:09 2017 -0500
Committer: Hassan Eslami <ha...@wirelessprv-10-193-225-240.near.illinois.edu>
Committed: Mon Mar 27 13:22:09 2017 -0500
----------------------------------------------------------------------
.../flow_control/CreditBasedFlowControl.java | 68 +++++++++++++++-----
.../ooc/policy/MemoryEstimatorOracle.java | 11 ++--
.../giraph/ooc/policy/ThresholdBasedOracle.java | 11 ++--
.../org/apache/giraph/utils/ThreadUtils.java | 18 ++++++
4 files changed, 79 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/2173d87c/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
index 18cf017..1e06925 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
@@ -23,6 +23,7 @@ import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.giraph.comm.netty.NettyClient;
@@ -32,6 +33,7 @@ import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.conf.IntConfOption;
import org.apache.giraph.utils.AdjustableSemaphore;
+import org.apache.giraph.utils.ThreadUtils;
import org.apache.log4j.Logger;
import java.util.ArrayDeque;
@@ -41,6 +43,8 @@ import java.util.Comparator;
import java.util.Deque;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -166,6 +170,14 @@ public class CreditBasedFlowControl implements FlowControl {
private final ConcurrentMap<Integer, Set<Long>> resumeRequestsId =
Maps.newConcurrentMap();
/**
+ * Queue of the cached requests to be sent out. The queue keeps pairs of
+ * (destination id, request). The thread-safe blocking queue is used here for
+ * the sake of simplicity. The blocking queue should be bounded (with bounds
+ * no more than user-defined max number of unsent/cached requests) to avoid
+ * excessive memory footprint.
+ */
+ private final BlockingQueue<Pair<Integer, WritableRequest>> toBeSent;
+ /**
* Semaphore to control number of cached unsent requests. Maximum number of
* permits of this semaphore should be equal to MAX_NUM_OF_UNSENT_REQUESTS.
*/
@@ -180,7 +192,7 @@ public class CreditBasedFlowControl implements FlowControl {
* @param exceptionHandler Exception handler
*/
public CreditBasedFlowControl(ImmutableClassesGiraphConfiguration conf,
- NettyClient nettyClient,
+ final NettyClient nettyClient,
Thread.UncaughtExceptionHandler
exceptionHandler) {
this.nettyClient = nettyClient;
@@ -189,10 +201,15 @@ public class CreditBasedFlowControl implements FlowControl {
checkState(maxOpenRequestsPerWorker < 0x4000 &&
maxOpenRequestsPerWorker > 0, "NettyClient: max number of open " +
"requests should be in range (0, " + 0x4FFF + ")");
- unsentRequestPermit = new Semaphore(MAX_NUM_OF_UNSENT_REQUESTS.get(conf));
+ int maxUnsentRequests = MAX_NUM_OF_UNSENT_REQUESTS.get(conf);
+ unsentRequestPermit = new Semaphore(maxUnsentRequests);
+ this.toBeSent = new ArrayBlockingQueue<Pair<Integer, WritableRequest>>(
+ maxUnsentRequests);
unsentWaitMsecs = UNSENT_CACHE_WAIT_INTERVAL.get(conf);
waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
- Thread thread = new Thread(new Runnable() {
+
+ // Thread to handle/send resume signals when necessary
+ ThreadUtils.startThread(new Runnable() {
@Override
public void run() {
while (true) {
@@ -214,11 +231,31 @@ public class CreditBasedFlowControl implements FlowControl {
}
}
}
- });
- thread.setUncaughtExceptionHandler(exceptionHandler);
- thread.setName("resume-sender");
- thread.setDaemon(true);
- thread.start();
+ }, "resume-sender", exceptionHandler);
+
+ // Thread to handle/send cached requests
+ ThreadUtils.startThread(new Runnable() {
+ @Override
+ public void run() {
+ while (true) {
+ Pair<Integer, WritableRequest> pair = null;
+ try {
+ pair = toBeSent.take();
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("run: failed while waiting to " +
+ "take an element from the request queue!", e);
+ }
+ int taskId = pair.getLeft();
+ WritableRequest request = pair.getRight();
+ nettyClient.doSend(taskId, request);
+ if (aggregateUnsentRequests.decrementAndGet() == 0) {
+ synchronized (aggregateUnsentRequests) {
+ aggregateUnsentRequests.notifyAll();
+ }
+ }
+ }
+ }
+ }, "cached-req-sender", exceptionHandler);
}
/**
@@ -510,13 +547,14 @@ public class CreditBasedFlowControl implements FlowControl {
}
unsentRequestPermit.release();
// At this point, we have a request, and we reserved a credit for the
- // sender client. So, we send the request to the client and update
- // the state.
- nettyClient.doSend(taskId, request);
- if (aggregateUnsentRequests.decrementAndGet() == 0) {
- synchronized (aggregateUnsentRequests) {
- aggregateUnsentRequests.notifyAll();
- }
+ // sender client. So, we put the request in a queue to be sent to the
+ // client.
+ try {
+ toBeSent.put(
+ new ImmutablePair<Integer, WritableRequest>(taskId, request));
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("trySendCachedRequests: failed while" +
+ "waiting to put element in send queue!", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/2173d87c/giraph-core/src/main/java/org/apache/giraph/ooc/policy/MemoryEstimatorOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/MemoryEstimatorOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/MemoryEstimatorOracle.java
index fd6172c..871ef6c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/MemoryEstimatorOracle.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/MemoryEstimatorOracle.java
@@ -29,6 +29,7 @@ import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.ooc.command.IOCommand;
import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
import org.apache.giraph.ooc.command.WaitIOCommand;
+import org.apache.giraph.utils.ThreadUtils;
import org.apache.giraph.worker.EdgeInputSplitsCallable;
import org.apache.giraph.worker.VertexInputSplitsCallable;
import org.apache.giraph.worker.WorkerProgress;
@@ -171,7 +172,7 @@ public class MemoryEstimatorOracle implements OutOfCoreOracle {
final long checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf);
- Thread thread = new Thread(new Runnable() {
+ ThreadUtils.startThread(new Runnable() {
@Override
public void run() {
while (true) {
@@ -211,12 +212,8 @@ public class MemoryEstimatorOracle implements OutOfCoreOracle {
}
}
}
- });
- thread.setUncaughtExceptionHandler(oocEngine.getServiceWorker()
- .getGraphTaskManager().createUncaughtExceptionHandler());
- thread.setName("ooc-memory-checker");
- thread.setDaemon(true);
- thread.start();
+ }, "ooc-memory-checker", oocEngine.getServiceWorker().getGraphTaskManager()
+ .createUncaughtExceptionHandler());
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/2173d87c/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java
index 2dd2c10..2f1ba7a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java
@@ -26,6 +26,7 @@ import org.apache.giraph.conf.LongConfOption;
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.ooc.command.IOCommand;
import org.apache.giraph.utils.MemoryUtils;
+import org.apache.giraph.utils.ThreadUtils;
import org.apache.log4j.Logger;
import static com.google.common.base.Preconditions.checkState;
@@ -170,7 +171,7 @@ public class ThresholdBasedOracle implements OutOfCoreOracle {
this.oocEngine = oocEngine;
this.lastMajorGCTime = 0;
- final Thread thread = new Thread(new Runnable() {
+ ThreadUtils.startThread(new Runnable() {
@Override
public void run() {
while (true) {
@@ -207,12 +208,8 @@ public class ThresholdBasedOracle implements OutOfCoreOracle {
}
}
}
- });
- thread.setUncaughtExceptionHandler(oocEngine.getServiceWorker()
- .getGraphTaskManager().createUncaughtExceptionHandler());
- thread.setName("memory-checker");
- thread.setDaemon(true);
- thread.start();
+ }, "memory-checker", oocEngine.getServiceWorker().getGraphTaskManager().
+ createUncaughtExceptionHandler());
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/2173d87c/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
index 0971402..21e8e49 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
@@ -106,6 +106,24 @@ public class ThreadUtils {
}
/**
+ * Start thread with specified name, runnable and exception handler, and make
+ * it daemon
+ *
+ * @param runnable Runnable to execute
+ * @param threadName Name of the thread
+ * @param handler Exception handler
+ * @return Thread
+ */
+ public static Thread startThread(Runnable runnable, String threadName,
+ Thread.UncaughtExceptionHandler handler) {
+ Thread thread = new Thread(runnable, threadName);
+ thread.setUncaughtExceptionHandler(handler);
+ thread.setDaemon(true);
+ thread.start();
+ return thread;
+ }
+
+ /**
* Sleep for specified milliseconds, logging and ignoring interrupted
* exceptions
*