You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by er...@apache.org on 2013/02/05 19:13:15 UTC
git commit: GIRAPH-497: Limiting number of open requests doesn't work with multithreading (majakabiljo via ereisman)
Updated Branches:
refs/heads/trunk 62c12fa0b -> b022dce9f
GIRAPH-497: Limiting number of open requests doesn't work with multithreading (majakabiljo via ereisman)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/b022dce9
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/b022dce9
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/b022dce9
Branch: refs/heads/trunk
Commit: b022dce9f32f2f48015a2b1489e4d72f08916bab
Parents: 62c12fa
Author: Eli Reisman <er...@apache.org>
Authored: Tue Feb 5 10:08:15 2013 -0800
Committer: Eli Reisman <er...@apache.org>
Committed: Tue Feb 5 10:08:15 2013 -0800
----------------------------------------------------------------------
CHANGELOG | 2 +
.../org/apache/giraph/comm/netty/NettyClient.java | 115 +++++++++------
2 files changed, 70 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/b022dce9/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 2d25746..d50e1e5 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-497: Limiting number of open requests doesn't work with multithreading (majakabiljo via ereisman)
+
GIRAPH-461: Convert static assignment of in-memory partitions with LRU cache (claudio)
GIRAPH-494: Make Edge an interface (nitay)
http://git-wip-us.apache.org/repos/asf/giraph/blob/b022dce9/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index ed92d82..76d38e2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -65,6 +65,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import static org.jboss.netty.channel.Channels.pipeline;
@@ -150,6 +151,9 @@ public class NettyClient {
private final ExecutionHandler executionHandler;
/** Name of the handler before the execution handler (if used) */
private final String handlerBeforeExecutionHandler;
+ /** When was the last time we checked if we should resend some requests */
+ private final AtomicLong lastTimeCheckedRequestsForProblems =
+ new AtomicLong(0);
/**
* Only constructor
@@ -640,6 +644,8 @@ public class NettyClient {
addressRequestIdGenerator.getNextRequestId(remoteServer));
ClientRequestId clientRequestId =
new ClientRequestId(destTaskId, request.getRequestId());
+ ChannelFuture writeFuture = channel.write(request);
+ newRequestInfo.setWriteFuture(writeFuture);
RequestInfo oldRequestInfo = clientRequestIdRequestInfoMap.putIfAbsent(
clientRequestId, newRequestInfo);
if (oldRequestInfo != null) {
@@ -648,8 +654,6 @@ public class NettyClient {
"request info of " + oldRequestInfo);
}
}
- ChannelFuture writeFuture = channel.write(request);
- newRequestInfo.setWriteFuture(writeFuture);
if (limitNumberOfOpenRequests &&
clientRequestIdRequestInfoMap.size() > maxNumberOfOpenRequests) {
@@ -679,9 +683,6 @@ public class NettyClient {
* complete
*/
private void waitSomeRequests(int maxOpenRequests) {
- List<ClientRequestId> addedRequestIds = Lists.newArrayList();
- List<RequestInfo> addedRequestInfos = Lists.newArrayList();
-
while (clientRequestIdRequestInfoMap.size() > maxOpenRequests) {
// Wait for requests to complete for some time
if (LOG.isInfoEnabled() && requestLogger.isPrintable()) {
@@ -712,53 +713,73 @@ public class NettyClient {
// Make sure that waiting doesn't kill the job
context.progress();
- // Check all the requests for problems
- for (Map.Entry<ClientRequestId, RequestInfo> entry :
- clientRequestIdRequestInfoMap.entrySet()) {
- RequestInfo requestInfo = entry.getValue();
- ChannelFuture writeFuture = requestInfo.getWriteFuture();
- // If not connected anymore, request failed, or the request is taking
- // too long, re-establish and resend
- if (!writeFuture.getChannel().isConnected() ||
- (writeFuture.isDone() && !writeFuture.isSuccess()) ||
- (requestInfo.getElapsedMsecs() > maxRequestMilliseconds)) {
- LOG.warn("waitSomeRequests: Problem with request id " +
- entry.getKey() + " connected = " +
- writeFuture.getChannel().isConnected() +
- ", future done = " + writeFuture.isDone() + ", " +
- "success = " + writeFuture.isSuccess() + ", " +
- "cause = " + writeFuture.getCause() + ", " +
- "elapsed time = " + requestInfo.getElapsedMsecs() + ", " +
- "destination = " + writeFuture.getChannel().getRemoteAddress() +
- " " + requestInfo);
- addedRequestIds.add(entry.getKey());
- addedRequestInfos.add(new RequestInfo(
- requestInfo.getDestinationAddress(), requestInfo.getRequest()));
- }
+ checkRequestsForProblems();
+ }
+ }
+
+ /**
+ * Check if there are some open requests which have been sent a long time
+ * ago, and if so resend them.
+ */
+ private void checkRequestsForProblems() {
+ long lastTimeChecked = lastTimeCheckedRequestsForProblems.get();
+ // If not enough time passed from the previous check, return
+ if (System.currentTimeMillis() < lastTimeChecked + waitingRequestMsecs) {
+ return;
+ }
+ // If another thread did the check already, return
+ if (!lastTimeCheckedRequestsForProblems.compareAndSet(lastTimeChecked,
+ System.currentTimeMillis())) {
+ return;
+ }
+ List<ClientRequestId> addedRequestIds = Lists.newArrayList();
+ List<RequestInfo> addedRequestInfos = Lists.newArrayList();
+ // Check all the requests for problems
+ for (Map.Entry<ClientRequestId, RequestInfo> entry :
+ clientRequestIdRequestInfoMap.entrySet()) {
+ RequestInfo requestInfo = entry.getValue();
+ ChannelFuture writeFuture = requestInfo.getWriteFuture();
+ // If not connected anymore, request failed, or the request is taking
+ // too long, re-establish and resend
+ if (!writeFuture.getChannel().isConnected() ||
+ (writeFuture.isDone() && !writeFuture.isSuccess()) ||
+ (requestInfo.getElapsedMsecs() > maxRequestMilliseconds)) {
+ LOG.warn("checkRequestsForProblems: Problem with request id " +
+ entry.getKey() + " connected = " +
+ writeFuture.getChannel().isConnected() +
+ ", future done = " + writeFuture.isDone() + ", " +
+ "success = " + writeFuture.isSuccess() + ", " +
+ "cause = " + writeFuture.getCause() + ", " +
+ "elapsed time = " + requestInfo.getElapsedMsecs() + ", " +
+ "destination = " + writeFuture.getChannel().getRemoteAddress() +
+ " " + requestInfo);
+ addedRequestIds.add(entry.getKey());
+ addedRequestInfos.add(new RequestInfo(
+ requestInfo.getDestinationAddress(), requestInfo.getRequest()));
}
+ }
- // Add any new requests to the system, connect if necessary, and re-send
- for (int i = 0; i < addedRequestIds.size(); ++i) {
- ClientRequestId requestId = addedRequestIds.get(i);
- RequestInfo requestInfo = addedRequestInfos.get(i);
+ // Add any new requests to the system, connect if necessary, and re-send
+ for (int i = 0; i < addedRequestIds.size(); ++i) {
+ ClientRequestId requestId = addedRequestIds.get(i);
+ RequestInfo requestInfo = addedRequestInfos.get(i);
- if (clientRequestIdRequestInfoMap.put(requestId, requestInfo) ==
- null) {
- LOG.warn("waitSomeRequests: Request " + requestId +
- " completed prior to sending the next request");
- clientRequestIdRequestInfoMap.remove(requestId);
- }
- InetSocketAddress remoteServer = requestInfo.getDestinationAddress();
- Channel channel = getNextChannel(remoteServer);
- if (LOG.isInfoEnabled()) {
- LOG.info("waitSomeRequests: Re-issuing request " + requestInfo);
- }
- ChannelFuture writeFuture = channel.write(requestInfo.getRequest());
- requestInfo.setWriteFuture(writeFuture);
+ if (clientRequestIdRequestInfoMap.put(requestId, requestInfo) ==
+ null) {
+ LOG.warn("checkRequestsForProblems: Request " + requestId +
+ " completed prior to sending the next request");
+ clientRequestIdRequestInfoMap.remove(requestId);
+ }
+ InetSocketAddress remoteServer = requestInfo.getDestinationAddress();
+ Channel channel = getNextChannel(remoteServer);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("checkRequestsForProblems: Re-issuing request " + requestInfo);
}
- addedRequestIds.clear();
- addedRequestInfos.clear();
+ ChannelFuture writeFuture = channel.write(requestInfo.getRequest());
+ requestInfo.setWriteFuture(writeFuture);
}
+ addedRequestIds.clear();
+ addedRequestInfos.clear();
}
/**