You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink and is not preserved in this plain-text copy.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/02/25 20:23:02 UTC
git commit: GIRAPH-539: When having open requests log which workers are they sent to (majakabiljo)
Updated Branches:
refs/heads/trunk 2347627d5 -> 71eab655e
GIRAPH-539: When having open requests log which workers are they sent to (majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/71eab655
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/71eab655
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/71eab655
Branch: refs/heads/trunk
Commit: 71eab655eea02b946fb88907650168d7c2d18bef
Parents: 2347627
Author: Maja Kabiljo <ma...@maja-mbp.local>
Authored: Mon Feb 25 11:21:00 2013 -0800
Committer: Maja Kabiljo <ma...@maja-mbp.local>
Committed: Mon Feb 25 11:22:00 2013 -0800
----------------------------------------------------------------------
CHANGELOG | 2 +
.../org/apache/giraph/comm/netty/NettyClient.java | 83 ++++++++++++---
2 files changed, 69 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/71eab655/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index ec34ba0..3323438 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-539: When having open requests log which workers are they sent to (majakabiljo)
+
GIRAPH-532: Give an explanation when trying to use unregistered aggregators (majakabiljo)
GIRAPH-453: Pure Hive I/O (nitay)
http://git-wip-us.apache.org/repos/asf/giraph/blob/71eab655/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index feae3e2..af76410 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -53,11 +53,14 @@ import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor;
import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
+import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
@@ -85,6 +88,11 @@ public class NettyClient {
public static final int MAX_NUMBER_OF_OPEN_REQUESTS_DEFAULT = 10000;
/** Maximum number of requests to list (for debugging) */
public static final int MAX_REQUESTS_TO_LIST = 10;
+ /**
+ * Maximum number of destination task ids with open requests to list
+ * (for debugging)
+ */
+ public static final int MAX_DESTINATION_TASK_IDS_TO_LIST = 10;
/** 30 seconds to connect by default */
public static final int MAX_CONNECTION_MILLISECONDS_DEFAULT = 30 * 1000;
/*if_not[HADOOP_NON_SECURE]*/
@@ -685,21 +693,7 @@ public class NettyClient {
private void waitSomeRequests(int maxOpenRequests) {
while (clientRequestIdRequestInfoMap.size() > maxOpenRequests) {
// Wait for requests to complete for some time
- if (LOG.isInfoEnabled() && requestLogger.isPrintable()) {
- LOG.info("waitSomeRequests: Waiting interval of " +
- waitingRequestMsecs + " msecs, " +
- clientRequestIdRequestInfoMap.size() +
- " open requests, waiting for it to be <= " + maxOpenRequests +
- ", " + byteCounter.getMetrics());
-
- if (clientRequestIdRequestInfoMap.size() < MAX_REQUESTS_TO_LIST) {
- for (Map.Entry<ClientRequestId, RequestInfo> entry :
- clientRequestIdRequestInfoMap.entrySet()) {
- LOG.info("waitSomeRequests: Waiting for request " +
- entry.getKey() + " - " + entry.getValue());
- }
- }
- }
+ logInfoAboutOpenRequests(maxOpenRequests);
synchronized (clientRequestIdRequestInfoMap) {
if (clientRequestIdRequestInfoMap.size() <= maxOpenRequests) {
break;
@@ -707,7 +701,7 @@ public class NettyClient {
try {
clientRequestIdRequestInfoMap.wait(waitingRequestMsecs);
} catch (InterruptedException e) {
- LOG.error("waitFutures: Got unexpected InterruptedException", e);
+ LOG.error("waitSomeRequests: Got unexpected InterruptedException", e);
}
}
// Make sure that waiting doesn't kill the job
@@ -718,6 +712,63 @@ public class NettyClient {
}
/**
+ * Log the status of open requests.
+ *
+ * @param maxOpenRequests Maximum number of requests which can be not complete
+ */
+ private void logInfoAboutOpenRequests(int maxOpenRequests) {
+ if (LOG.isInfoEnabled() && requestLogger.isPrintable()) {
+ LOG.info("logInfoAboutOpenRequests: Waiting interval of " +
+ waitingRequestMsecs + " msecs, " +
+ clientRequestIdRequestInfoMap.size() +
+ " open requests, waiting for it to be <= " + maxOpenRequests +
+ ", " + byteCounter.getMetrics());
+
+ if (clientRequestIdRequestInfoMap.size() < MAX_REQUESTS_TO_LIST) {
+ for (Map.Entry<ClientRequestId, RequestInfo> entry :
+ clientRequestIdRequestInfoMap.entrySet()) {
+ LOG.info("logInfoAboutOpenRequests: Waiting for request " +
+ entry.getKey() + " - " + entry.getValue());
+ }
+ }
+
+ // Count how many open requests each task has
+ Map<Integer, Integer> openRequestCounts = Maps.newHashMap();
+ for (ClientRequestId clientRequestId :
+ clientRequestIdRequestInfoMap.keySet()) {
+ int taskId = clientRequestId.getDestinationTaskId();
+ Integer currentCount = openRequestCounts.get(taskId);
+ openRequestCounts.put(taskId,
+ (currentCount == null ? 0 : currentCount) + 1);
+ }
+ // Sort it in decreasing order of number of open requests
+ List<Map.Entry<Integer, Integer>> sorted =
+ Lists.newArrayList(openRequestCounts.entrySet());
+ Collections.sort(sorted, new Comparator<Map.Entry<Integer, Integer>>() {
+ @Override
+ public int compare(Map.Entry<Integer, Integer> entry1,
+ Map.Entry<Integer, Integer> entry2) {
+ int value1 = entry1.getValue();
+ int value2 = entry2.getValue();
+ return (value1 < value2) ? 1 : ((value1 == value2) ? 0 : -1);
+ }
+ });
+ // Print task ids which have the most open requests
+ StringBuilder message = new StringBuilder();
+ message.append("logInfoAboutOpenRequests: ");
+ int itemsToPrint =
+ Math.min(MAX_DESTINATION_TASK_IDS_TO_LIST, sorted.size());
+ for (int i = 0; i < itemsToPrint; i++) {
+ message.append(sorted.get(i).getValue())
+ .append(" requests for taskId=")
+ .append(sorted.get(i).getKey())
+ .append(", ");
+ }
+ LOG.info(message);
+ }
+ }
+
+ /**
* Check if there are some open requests which have been sent a long time
* ago, and if so resend them.
*/