You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by er...@apache.org on 2013/02/05 19:13:15 UTC

git commit: GIRAPH-497: Limiting number of open requests doesn't work with multithreading (majakabiljo via ereisman)

Updated Branches:
  refs/heads/trunk 62c12fa0b -> b022dce9f


GIRAPH-497: Limiting number of open requests doesn't work with multithreading (majakabiljo via ereisman)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/b022dce9
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/b022dce9
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/b022dce9

Branch: refs/heads/trunk
Commit: b022dce9f32f2f48015a2b1489e4d72f08916bab
Parents: 62c12fa
Author: Eli Reisman <er...@apache.org>
Authored: Tue Feb 5 10:08:15 2013 -0800
Committer: Eli Reisman <er...@apache.org>
Committed: Tue Feb 5 10:08:15 2013 -0800

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../org/apache/giraph/comm/netty/NettyClient.java  |  115 +++++++++------
 2 files changed, 70 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/b022dce9/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 2d25746..d50e1e5 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-497: Limiting number of open requests doesn't work with multithreading (majakabiljo via ereisman)
+
   GIRAPH-461: Convert static assignment of in-memory partitions with LRU cache (claudio)
 
   GIRAPH-494: Make Edge an interface (nitay)

http://git-wip-us.apache.org/repos/asf/giraph/blob/b022dce9/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index ed92d82..76d38e2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -65,6 +65,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.jboss.netty.channel.Channels.pipeline;
 
@@ -150,6 +151,9 @@ public class NettyClient {
   private final ExecutionHandler executionHandler;
   /** Name of the handler before the execution handler (if used) */
   private final String handlerBeforeExecutionHandler;
+  /** When was the last time we checked if we should resend some requests */
+  private final AtomicLong lastTimeCheckedRequestsForProblems =
+      new AtomicLong(0);
 
   /**
    * Only constructor
@@ -640,6 +644,8 @@ public class NettyClient {
         addressRequestIdGenerator.getNextRequestId(remoteServer));
       ClientRequestId clientRequestId =
         new ClientRequestId(destTaskId, request.getRequestId());
+      ChannelFuture writeFuture = channel.write(request);
+      newRequestInfo.setWriteFuture(writeFuture);
       RequestInfo oldRequestInfo = clientRequestIdRequestInfoMap.putIfAbsent(
         clientRequestId, newRequestInfo);
       if (oldRequestInfo != null) {
@@ -648,8 +654,6 @@ public class NettyClient {
           "request info of " + oldRequestInfo);
       }
     }
-    ChannelFuture writeFuture = channel.write(request);
-    newRequestInfo.setWriteFuture(writeFuture);
 
     if (limitNumberOfOpenRequests &&
         clientRequestIdRequestInfoMap.size() > maxNumberOfOpenRequests) {
@@ -679,9 +683,6 @@ public class NettyClient {
    *                        complete
    */
   private void waitSomeRequests(int maxOpenRequests) {
-    List<ClientRequestId> addedRequestIds = Lists.newArrayList();
-    List<RequestInfo> addedRequestInfos = Lists.newArrayList();
-
     while (clientRequestIdRequestInfoMap.size() > maxOpenRequests) {
       // Wait for requests to complete for some time
       if (LOG.isInfoEnabled() && requestLogger.isPrintable()) {
@@ -712,53 +713,73 @@ public class NettyClient {
       // Make sure that waiting doesn't kill the job
       context.progress();
 
-      // Check all the requests for problems
-      for (Map.Entry<ClientRequestId, RequestInfo> entry :
-          clientRequestIdRequestInfoMap.entrySet()) {
-        RequestInfo requestInfo = entry.getValue();
-        ChannelFuture writeFuture = requestInfo.getWriteFuture();
-        // If not connected anymore, request failed, or the request is taking
-        // too long, re-establish and resend
-        if (!writeFuture.getChannel().isConnected() ||
-            (writeFuture.isDone() && !writeFuture.isSuccess()) ||
-            (requestInfo.getElapsedMsecs() > maxRequestMilliseconds)) {
-          LOG.warn("waitSomeRequests: Problem with request id " +
-              entry.getKey() + " connected = " +
-              writeFuture.getChannel().isConnected() +
-              ", future done = " + writeFuture.isDone() + ", " +
-              "success = " + writeFuture.isSuccess() + ", " +
-              "cause = " + writeFuture.getCause() + ", " +
-              "elapsed time = " + requestInfo.getElapsedMsecs() + ", " +
-              "destination = " + writeFuture.getChannel().getRemoteAddress() +
-              " " + requestInfo);
-          addedRequestIds.add(entry.getKey());
-          addedRequestInfos.add(new RequestInfo(
-              requestInfo.getDestinationAddress(), requestInfo.getRequest()));
-        }
+      checkRequestsForProblems();
+    }
+  }
+
+  /**
+   * Check if there are some open requests which have been sent a long time
+   * ago, and if so resend them.
+   */
+  private void checkRequestsForProblems() {
+    long lastTimeChecked = lastTimeCheckedRequestsForProblems.get();
+    // If not enough time passed from the previous check, return
+    if (System.currentTimeMillis() < lastTimeChecked + waitingRequestMsecs) {
+      return;
+    }
+    // If another thread did the check already, return
+    if (!lastTimeCheckedRequestsForProblems.compareAndSet(lastTimeChecked,
+        System.currentTimeMillis())) {
+      return;
+    }
+    List<ClientRequestId> addedRequestIds = Lists.newArrayList();
+    List<RequestInfo> addedRequestInfos = Lists.newArrayList();
+    // Check all the requests for problems
+    for (Map.Entry<ClientRequestId, RequestInfo> entry :
+        clientRequestIdRequestInfoMap.entrySet()) {
+      RequestInfo requestInfo = entry.getValue();
+      ChannelFuture writeFuture = requestInfo.getWriteFuture();
+      // If not connected anymore, request failed, or the request is taking
+      // too long, re-establish and resend
+      if (!writeFuture.getChannel().isConnected() ||
+          (writeFuture.isDone() && !writeFuture.isSuccess()) ||
+          (requestInfo.getElapsedMsecs() > maxRequestMilliseconds)) {
+        LOG.warn("checkRequestsForProblems: Problem with request id " +
+            entry.getKey() + " connected = " +
+            writeFuture.getChannel().isConnected() +
+            ", future done = " + writeFuture.isDone() + ", " +
+            "success = " + writeFuture.isSuccess() + ", " +
+            "cause = " + writeFuture.getCause() + ", " +
+            "elapsed time = " + requestInfo.getElapsedMsecs() + ", " +
+            "destination = " + writeFuture.getChannel().getRemoteAddress() +
+            " " + requestInfo);
+        addedRequestIds.add(entry.getKey());
+        addedRequestInfos.add(new RequestInfo(
+            requestInfo.getDestinationAddress(), requestInfo.getRequest()));
       }
+    }
 
-      // Add any new requests to the system, connect if necessary, and re-send
-      for (int i = 0; i < addedRequestIds.size(); ++i) {
-        ClientRequestId requestId = addedRequestIds.get(i);
-        RequestInfo requestInfo = addedRequestInfos.get(i);
+    // Add any new requests to the system, connect if necessary, and re-send
+    for (int i = 0; i < addedRequestIds.size(); ++i) {
+      ClientRequestId requestId = addedRequestIds.get(i);
+      RequestInfo requestInfo = addedRequestInfos.get(i);
 
-        if (clientRequestIdRequestInfoMap.put(requestId, requestInfo) ==
-            null) {
-          LOG.warn("waitSomeRequests: Request " + requestId +
-              " completed prior to sending the next request");
-          clientRequestIdRequestInfoMap.remove(requestId);
-        }
-        InetSocketAddress remoteServer = requestInfo.getDestinationAddress();
-        Channel channel = getNextChannel(remoteServer);
-        if (LOG.isInfoEnabled()) {
-          LOG.info("waitSomeRequests: Re-issuing request " + requestInfo);
-        }
-        ChannelFuture writeFuture = channel.write(requestInfo.getRequest());
-        requestInfo.setWriteFuture(writeFuture);
+      if (clientRequestIdRequestInfoMap.put(requestId, requestInfo) ==
+          null) {
+        LOG.warn("checkRequestsForProblems: Request " + requestId +
+            " completed prior to sending the next request");
+        clientRequestIdRequestInfoMap.remove(requestId);
+      }
+      InetSocketAddress remoteServer = requestInfo.getDestinationAddress();
+      Channel channel = getNextChannel(remoteServer);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("checkRequestsForProblems: Re-issuing request " + requestInfo);
       }
-      addedRequestIds.clear();
-      addedRequestInfos.clear();
+      ChannelFuture writeFuture = channel.write(requestInfo.getRequest());
+      requestInfo.setWriteFuture(writeFuture);
     }
+    addedRequestIds.clear();
+    addedRequestInfos.clear();
   }
 
   /**