You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/02/25 20:23:02 UTC

git commit: GIRAPH-539: When having open requests log which workers are they sent to (majakabiljo)

Updated Branches:
  refs/heads/trunk 2347627d5 -> 71eab655e


GIRAPH-539: When having open requests log which workers are they sent to (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/71eab655
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/71eab655
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/71eab655

Branch: refs/heads/trunk
Commit: 71eab655eea02b946fb88907650168d7c2d18bef
Parents: 2347627
Author: Maja Kabiljo <ma...@maja-mbp.local>
Authored: Mon Feb 25 11:21:00 2013 -0800
Committer: Maja Kabiljo <ma...@maja-mbp.local>
Committed: Mon Feb 25 11:22:00 2013 -0800

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../org/apache/giraph/comm/netty/NettyClient.java  |   83 ++++++++++++---
 2 files changed, 69 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/71eab655/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index ec34ba0..3323438 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-539: When having open requests log which workers are they sent to (majakabiljo)
+
   GIRAPH-532: Give an explanation when trying to use unregistered aggregators (majakabiljo)
 
   GIRAPH-453: Pure Hive I/O (nitay)

http://git-wip-us.apache.org/repos/asf/giraph/blob/71eab655/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index feae3e2..af76410 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -53,11 +53,14 @@ import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.MapMaker;
+import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
@@ -85,6 +88,11 @@ public class NettyClient {
   public static final int MAX_NUMBER_OF_OPEN_REQUESTS_DEFAULT = 10000;
   /** Maximum number of requests to list (for debugging) */
   public static final int MAX_REQUESTS_TO_LIST = 10;
+  /**
+   * Maximum number of destination task ids with open requests to list
+   * (for debugging)
+   */
+  public static final int MAX_DESTINATION_TASK_IDS_TO_LIST = 10;
   /** 30 seconds to connect by default */
   public static final int MAX_CONNECTION_MILLISECONDS_DEFAULT = 30 * 1000;
 /*if_not[HADOOP_NON_SECURE]*/
@@ -685,21 +693,7 @@ public class NettyClient {
   private void waitSomeRequests(int maxOpenRequests) {
     while (clientRequestIdRequestInfoMap.size() > maxOpenRequests) {
       // Wait for requests to complete for some time
-      if (LOG.isInfoEnabled() && requestLogger.isPrintable()) {
-        LOG.info("waitSomeRequests: Waiting interval of " +
-            waitingRequestMsecs + " msecs, " +
-            clientRequestIdRequestInfoMap.size() +
-            " open requests, waiting for it to be <= " + maxOpenRequests +
-            ", " + byteCounter.getMetrics());
-
-        if (clientRequestIdRequestInfoMap.size() < MAX_REQUESTS_TO_LIST) {
-          for (Map.Entry<ClientRequestId, RequestInfo> entry :
-              clientRequestIdRequestInfoMap.entrySet()) {
-            LOG.info("waitSomeRequests: Waiting for request " +
-                entry.getKey() + " - " + entry.getValue());
-          }
-        }
-      }
+      logInfoAboutOpenRequests(maxOpenRequests);
       synchronized (clientRequestIdRequestInfoMap) {
         if (clientRequestIdRequestInfoMap.size() <= maxOpenRequests) {
           break;
@@ -707,7 +701,7 @@ public class NettyClient {
         try {
           clientRequestIdRequestInfoMap.wait(waitingRequestMsecs);
         } catch (InterruptedException e) {
-          LOG.error("waitFutures: Got unexpected InterruptedException", e);
+          LOG.error("waitSomeRequests: Got unexpected InterruptedException", e);
         }
       }
       // Make sure that waiting doesn't kill the job
@@ -718,6 +712,63 @@ public class NettyClient {
   }
 
   /**
+   * Log the status of open requests.
+   *
+   * @param maxOpenRequests Maximum number of requests which can be not complete
+   */
+  private void logInfoAboutOpenRequests(int maxOpenRequests) {
+    if (LOG.isInfoEnabled() && requestLogger.isPrintable()) {
+      LOG.info("logInfoAboutOpenRequests: Waiting interval of " +
+          waitingRequestMsecs + " msecs, " +
+          clientRequestIdRequestInfoMap.size() +
+          " open requests, waiting for it to be <= " + maxOpenRequests +
+          ", " + byteCounter.getMetrics());
+
+      if (clientRequestIdRequestInfoMap.size() < MAX_REQUESTS_TO_LIST) {
+        for (Map.Entry<ClientRequestId, RequestInfo> entry :
+            clientRequestIdRequestInfoMap.entrySet()) {
+          LOG.info("logInfoAboutOpenRequests: Waiting for request " +
+              entry.getKey() + " - " + entry.getValue());
+        }
+      }
+
+      // Count how many open requests each task has
+      Map<Integer, Integer> openRequestCounts = Maps.newHashMap();
+      for (ClientRequestId clientRequestId :
+          clientRequestIdRequestInfoMap.keySet()) {
+        int taskId = clientRequestId.getDestinationTaskId();
+        Integer currentCount = openRequestCounts.get(taskId);
+        openRequestCounts.put(taskId,
+            (currentCount == null ? 0 : currentCount) + 1);
+      }
+      // Sort it in decreasing order of number of open requests
+      List<Map.Entry<Integer, Integer>> sorted =
+          Lists.newArrayList(openRequestCounts.entrySet());
+      Collections.sort(sorted, new Comparator<Map.Entry<Integer, Integer>>() {
+        @Override
+        public int compare(Map.Entry<Integer, Integer> entry1,
+            Map.Entry<Integer, Integer> entry2) {
+          int value1 = entry1.getValue();
+          int value2 = entry2.getValue();
+          return (value1 < value2) ? 1 : ((value1 == value2) ? 0 : -1);
+        }
+      });
+      // Print task ids which have the most open requests
+      StringBuilder message = new StringBuilder();
+      message.append("logInfoAboutOpenRequests: ");
+      int itemsToPrint =
+          Math.min(MAX_DESTINATION_TASK_IDS_TO_LIST, sorted.size());
+      for (int i = 0; i < itemsToPrint; i++) {
+        message.append(sorted.get(i).getValue())
+            .append(" requests for taskId=")
+            .append(sorted.get(i).getKey())
+            .append(", ");
+      }
+      LOG.info(message);
+    }
+  }
+
+  /**
    * Check if there are some open requests which have been sent a long time
    * ago, and if so resend them.
    */