You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by jg...@apache.org on 2012/08/07 23:05:24 UTC
svn commit: r1370522 - in /giraph/trunk: CHANGELOG
src/main/java/org/apache/giraph/comm/NettyClient.java
Author: jghoman
Date: Tue Aug 7 21:05:23 2012
New Revision: 1370522
URL: http://svn.apache.org/viewvc?rev=1370522&view=rev
Log:
GIRAPH-287. Add option to limit the number of open requests. Contributed by Maja Kabiljo.
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1370522&r1=1370521&r2=1370522&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Aug 7 21:05:23 2012
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-287: Add option to limit the number of open requests.
+ (Maja Kabiljo via jghoman)
+
GIRAPH-262: Netty optimization to handle requests locally whenever
possible. (aching)
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java?rev=1370522&r1=1370521&r2=1370522&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java Tue Aug 7 21:05:23 2012
@@ -57,6 +57,17 @@ public class NettyClient<I extends Writa
M extends Writable> {
/** Msecs to wait between waiting for all requests to finish */
public static final int WAITING_REQUEST_MSECS = 15000;
+ /** Whether to limit the number of open requests */
+ public static final String LIMIT_NUMBER_OF_OPEN_REQUESTS =
+ "giraph.waitForRequestsConfirmation";
+ /** Default choice about having a limit on number of open requests */
+ public static final boolean LIMIT_NUMBER_OF_OPEN_REQUESTS_DEFAULT = false;
+ /** Maximum number of requests without confirmation we should have */
+ public static final String MAX_NUMBER_OF_OPEN_REQUESTS =
+ "giraph.maxNumberOfOpenRequests";
+ /** Default maximum number of requests without confirmation */
+ public static final int MAX_NUMBER_OF_OPEN_REQUESTS_DEFAULT = 10000;
+
/** Class logger */
private static final Logger LOG = Logger.getLogger(NettyClient.class);
/** Context used to report progress */
@@ -80,6 +91,11 @@ public class NettyClient<I extends Writa
/** Receive buffer size */
private final int receiveBufferSize;
+ /** Do we have a limit on number of open requests */
+ private final boolean limitNumberOfOpenRequests;
+ /** Maximum number of requests without confirmation we can have */
+ private final int maxNumberOfOpenRequests;
+
/**
* Only constructor
*
@@ -96,6 +112,21 @@ public class NettyClient<I extends Writa
receiveBufferSize = conf.getInt(GiraphJob.CLIENT_RECEIVE_BUFFER_SIZE,
GiraphJob.DEFAULT_CLIENT_RECEIVE_BUFFER_SIZE);
+ limitNumberOfOpenRequests = conf.getBoolean(
+ LIMIT_NUMBER_OF_OPEN_REQUESTS,
+ LIMIT_NUMBER_OF_OPEN_REQUESTS_DEFAULT);
+ if (limitNumberOfOpenRequests) {
+ maxNumberOfOpenRequests = conf.getInt(
+ MAX_NUMBER_OF_OPEN_REQUESTS,
+ MAX_NUMBER_OF_OPEN_REQUESTS_DEFAULT);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("NettyClient: Limit number of open requests to " +
+ maxNumberOfOpenRequests);
+ }
+ } else {
+ maxNumberOfOpenRequests = -1;
+ }
+
// Configure the client.
bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
@@ -222,6 +253,10 @@ public class NettyClient<I extends Writa
"sendWritableRequest: No channel exists for " + remoteServer);
}
channel.write(request);
+ if (limitNumberOfOpenRequests &&
+ waitingRequestCount.get() > maxNumberOfOpenRequests) {
+ waitSomeRequests(maxNumberOfOpenRequests);
+ }
}
/**
@@ -230,12 +265,27 @@ public class NettyClient<I extends Writa
* @throws InterruptedException
*/
public void waitAllRequests() {
+ waitSomeRequests(0);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("waitAllRequests: Finished all requests. " +
+ byteCounter.getMetrics());
+ }
+ }
+
+ /**
+ * Wait until at most maxOpenRequests requests are still incomplete
+ *
+ * @param maxOpenRequests Maximum number of requests that may remain
+ * incomplete
+ */
+ private void waitSomeRequests(int maxOpenRequests) {
synchronized (waitingRequestCount) {
- while (waitingRequestCount.get() != 0) {
+ while (waitingRequestCount.get() > maxOpenRequests) {
if (LOG.isInfoEnabled()) {
- LOG.info("waitAllRequests: Waiting interval of " +
- WAITING_REQUEST_MSECS + " msecs and still waiting on " +
- waitingRequestCount + " requests, " + byteCounter.getMetrics());
+ LOG.info("waitSomeRequests: Waiting interval of " +
+ WAITING_REQUEST_MSECS + " msecs, " + waitingRequestCount +
+ " open requests, waiting for it to be <= " + maxOpenRequests +
+ ", " + byteCounter.getMetrics());
}
try {
waitingRequestCount.wait(WAITING_REQUEST_MSECS);
@@ -246,10 +296,6 @@ public class NettyClient<I extends Writa
context.progress();
}
}
- if (LOG.isInfoEnabled()) {
- LOG.info("waitAllRequests: Finished all requests. " +
- byteCounter.getMetrics());
- }
}
/**