You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by jg...@apache.org on 2012/08/07 23:05:24 UTC

svn commit: r1370522 - in /giraph/trunk: CHANGELOG src/main/java/org/apache/giraph/comm/NettyClient.java

Author: jghoman
Date: Tue Aug  7 21:05:23 2012
New Revision: 1370522

URL: http://svn.apache.org/viewvc?rev=1370522&view=rev
Log:
GIRAPH-287. Add option to limit the number of open requests. Contributed by Maja Kabiljo.

Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1370522&r1=1370521&r2=1370522&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Aug  7 21:05:23 2012
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-287: Add option to limit the number of open requests.
+  (Maja Kabiljo via jghoman)
+
   GIRAPH-262: Netty optimization to handle requests locally whenever
   possible. (aching)
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java?rev=1370522&r1=1370521&r2=1370522&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java Tue Aug  7 21:05:23 2012
@@ -57,6 +57,17 @@ public class NettyClient<I extends Writa
     M extends Writable> {
   /** Msecs to wait between waiting for all requests to finish */
   public static final int WAITING_REQUEST_MSECS = 15000;
+  /** Config key: whether to limit the number of open requests */
+  public static final String LIMIT_NUMBER_OF_OPEN_REQUESTS =
+      "giraph.waitForRequestsConfirmation";
+  /** Default choice about having a limit on number of open requests */
+  public static final boolean LIMIT_NUMBER_OF_OPEN_REQUESTS_DEFAULT = false;
+  /** Config key: maximum number of unconfirmed (open) requests allowed */
+  public static final String MAX_NUMBER_OF_OPEN_REQUESTS =
+      "giraph.maxNumberOfOpenRequests";
+  /** Default maximum number of requests without confirmation */
+  public static final int MAX_NUMBER_OF_OPEN_REQUESTS_DEFAULT = 10000;
+
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(NettyClient.class);
   /** Context used to report progress */
@@ -80,6 +91,11 @@ public class NettyClient<I extends Writa
   /** Receive buffer size */
   private final int receiveBufferSize;
 
+  /** Do we have a limit on number of open requests */
+  private final boolean limitNumberOfOpenRequests;
+  /** Maximum number of requests without confirmation we can have */
+  private final int maxNumberOfOpenRequests;
+
   /**
    * Only constructor
    *
@@ -96,6 +112,21 @@ public class NettyClient<I extends Writa
     receiveBufferSize = conf.getInt(GiraphJob.CLIENT_RECEIVE_BUFFER_SIZE,
         GiraphJob.DEFAULT_CLIENT_RECEIVE_BUFFER_SIZE);
 
+    limitNumberOfOpenRequests = conf.getBoolean(
+        LIMIT_NUMBER_OF_OPEN_REQUESTS,
+        LIMIT_NUMBER_OF_OPEN_REQUESTS_DEFAULT);
+    if (limitNumberOfOpenRequests) {
+      maxNumberOfOpenRequests = conf.getInt(
+          MAX_NUMBER_OF_OPEN_REQUESTS,
+          MAX_NUMBER_OF_OPEN_REQUESTS_DEFAULT);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("NettyClient: Limit number of open requests to " +
+            maxNumberOfOpenRequests);
+      }
+    } else {
+      maxNumberOfOpenRequests = -1;
+    }
+
     // Configure the client.
     bootstrap = new ClientBootstrap(
         new NioClientSocketChannelFactory(
@@ -222,6 +253,10 @@ public class NettyClient<I extends Writa
           "sendWritableRequest: No channel exists for " + remoteServer);
     }
     channel.write(request);
+    if (limitNumberOfOpenRequests &&
+        waitingRequestCount.get() > maxNumberOfOpenRequests) {
+      waitSomeRequests(maxNumberOfOpenRequests);
+    }
   }
 
   /**
@@ -230,12 +265,27 @@ public class NettyClient<I extends Writa
    * @throws InterruptedException
    */
   public void waitAllRequests() {
+    waitSomeRequests(0);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("waitAllRequests: Finished all requests. " +
+          byteCounter.getMetrics());
+    }
+  }
+
+  /**
+   * Block until at most maxOpenRequests requests remain incomplete
+   *
+   * @param maxOpenRequests Maximum number of requests which may still be
+   *                        incomplete when this method returns
+   */
+  private void waitSomeRequests(int maxOpenRequests) {
     synchronized (waitingRequestCount) {
-      while (waitingRequestCount.get() != 0) {
+      while (waitingRequestCount.get() > maxOpenRequests) {
         if (LOG.isInfoEnabled()) {
-          LOG.info("waitAllRequests: Waiting interval of " +
-              WAITING_REQUEST_MSECS + " msecs and still waiting on " +
-              waitingRequestCount + " requests, " + byteCounter.getMetrics());
+          LOG.info("waitSomeRequests: Waiting interval of " +
+              WAITING_REQUEST_MSECS + " msecs, " + waitingRequestCount +
+              " open requests, waiting for it to be <= " + maxOpenRequests +
+              ", " + byteCounter.getMetrics());
         }
         try {
           waitingRequestCount.wait(WAITING_REQUEST_MSECS);
@@ -246,10 +296,6 @@ public class NettyClient<I extends Writa
         context.progress();
       }
     }
-    if (LOG.isInfoEnabled()) {
-      LOG.info("waitAllRequests: Finished all requests. " +
-          byteCounter.getMetrics());
-    }
   }
 
   /**