You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2015/02/16 18:04:47 UTC
hbase git commit: HBASE-13011 TestLoadIncrementalHFiles is flakey
when using AsyncRpcClient as client implementation Added comment to
AsyncRpcChannel data members
Repository: hbase
Updated Branches:
refs/heads/master 6d72a993e -> e99091e97
HBASE-13011 TestLoadIncrementalHFiles is flakey when using AsyncRpcClient as client implementation
Added comment to AsyncRpcChannel data members
Signed-off-by: stack <st...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e99091e9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e99091e9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e99091e9
Branch: refs/heads/master
Commit: e99091e97c374a026ba690cf9d6a4592f3c3fbdd
Parents: 6d72a99
Author: zhangduo <zh...@wandoujia.com>
Authored: Sat Feb 14 08:36:38 2015 +0800
Committer: stack <st...@apache.org>
Committed: Mon Feb 16 09:03:31 2015 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/ipc/AsyncCall.java | 9 +-
.../hadoop/hbase/ipc/AsyncRpcChannel.java | 300 ++++++++-----------
.../apache/hadoop/hbase/ipc/AsyncRpcClient.java | 59 ++--
.../hbase/ipc/AsyncServerResponseHandler.java | 16 +-
4 files changed, 180 insertions(+), 204 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/e99091e9/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
index c35238c..68a494d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
@@ -72,7 +72,7 @@ public class AsyncCall extends DefaultPromise<Message> {
this.responseDefaultType = responseDefaultType;
this.startTime = EnvironmentEdgeManager.currentTime();
- this.rpcTimeout = controller.getCallTimeout();
+ this.rpcTimeout = controller.hasCallTimeout() ? controller.getCallTimeout() : 0;
}
/**
@@ -84,9 +84,10 @@ public class AsyncCall extends DefaultPromise<Message> {
return this.startTime;
}
- @Override public String toString() {
- return "callId: " + this.id + " methodName: " + this.method.getName() + " param {" +
- (this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : "") + "}";
+ @Override
+ public String toString() {
+ return "callId: " + this.id + " methodName: " + this.method.getName() + " param {"
+ + (this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : "") + "}";
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/e99091e9/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
index 054c9b5..ffb2dcf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
@@ -17,9 +17,6 @@
*/
package org.apache.hadoop.hbase.ipc;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
@@ -31,6 +28,23 @@ import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import javax.security.sasl.SaslException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
@@ -56,18 +70,9 @@ import org.apache.hadoop.security.token.TokenSelector;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
-import javax.security.sasl.SaslException;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.SocketException;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.TimeUnit;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
/**
* Netty RPC channel
@@ -97,8 +102,6 @@ public class AsyncRpcChannel {
final String serviceName;
final InetSocketAddress address;
- ConcurrentSkipListMap<Integer, AsyncCall> calls = new ConcurrentSkipListMap<>();
-
private int ioFailureCounter = 0;
private int connectFailureCounter = 0;
@@ -108,15 +111,18 @@ public class AsyncRpcChannel {
private Token<? extends TokenIdentifier> token;
private String serverPrincipal;
- volatile boolean shouldCloseConnection = false;
- private IOException closeException;
+
+ // NOTE: the closed and connected flags below are only changed while holding the lock on pendingCalls
+ private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, AsyncCall>();
+ private boolean connected = false;
+ private boolean closed = false;
private Timeout cleanupTimer;
private final TimerTask timeoutTask = new TimerTask() {
- @Override public void run(Timeout timeout) throws Exception {
- cleanupTimer = null;
- cleanupCalls(false);
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ cleanupCalls();
}
};
@@ -213,15 +219,20 @@ public class AsyncRpcChannel {
ch.pipeline()
.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
ch.pipeline().addLast(new AsyncServerResponseHandler(this));
-
try {
writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() {
- @Override public void operationComplete(ChannelFuture future) throws Exception {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
close(future.cause());
return;
}
- for (AsyncCall call : calls.values()) {
+ List<AsyncCall> callsToWrite;
+ synchronized (pendingCalls) {
+ connected = true;
+ callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
+ }
+ for (AsyncCall call : callsToWrite) {
writeRequest(call);
}
}
@@ -240,17 +251,18 @@ public class AsyncRpcChannel {
*/
private SaslClientHandler getSaslHandler(final Bootstrap bootstrap) throws IOException {
return new SaslClientHandler(authMethod, token, serverPrincipal, client.fallbackAllowed,
- client.conf.get("hbase.rpc.protection",
- SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()),
- new SaslClientHandler.SaslExceptionHandler() {
- @Override public void handle(int retryCount, Random random, Throwable cause) {
+ client.conf.get("hbase.rpc.protection", SaslUtil.QualityOfProtection.AUTHENTICATION.name()
+ .toLowerCase()), new SaslClientHandler.SaslExceptionHandler() {
+ @Override
+ public void handle(int retryCount, Random random, Throwable cause) {
try {
// Handle Sasl failure. Try to potentially get new credentials
handleSaslConnectionFailure(retryCount, cause, ticket.getUGI());
// Try to reconnect
AsyncRpcClient.WHEEL_TIMER.newTimeout(new TimerTask() {
- @Override public void run(Timeout timeout) throws Exception {
+ @Override
+ public void run(Timeout timeout) throws Exception {
connect(bootstrap);
}
}, random.nextInt(reloginMaxBackoff) + 1, TimeUnit.MILLISECONDS);
@@ -259,10 +271,11 @@ public class AsyncRpcChannel {
}
}
}, new SaslClientHandler.SaslSuccessfulConnectHandler() {
- @Override public void onSuccess(Channel channel) {
- startHBaseConnection(channel);
- }
- });
+ @Override
+ public void onSuccess(Channel channel) {
+ startHBaseConnection(channel);
+ }
+ });
}
/**
@@ -295,66 +308,50 @@ public class AsyncRpcChannel {
public Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
final PayloadCarryingRpcController controller, final Message request,
final Message responsePrototype) {
- if (shouldCloseConnection) {
- Promise<Message> promise = channel.eventLoop().newPromise();
- promise.setFailure(new ConnectException());
- return promise;
- }
-
- final AsyncCall call = new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(),
- method, request, controller, responsePrototype);
-
+ final AsyncCall call =
+ new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(), method, request,
+ controller, responsePrototype);
controller.notifyOnCancel(new RpcCallback<Object>() {
@Override
public void run(Object parameter) {
- calls.remove(call.id);
+ // TODO: do we not need to call AsyncCall.setFailed here?
+ synchronized (pendingCalls) {
+ pendingCalls.remove(call.id);
+ }
}
});
+ // TODO: this should be handled by PayloadCarryingRpcController.
if (controller.isCanceled()) {
// To finish if the call was cancelled before we set the notification (race condition)
call.cancel(true);
return call;
}
- calls.put(call.id, call);
-
- // check again, see https://issues.apache.org/jira/browse/HBASE-12951
- if (shouldCloseConnection) {
- Promise<Message> promise = channel.eventLoop().newPromise();
- promise.setFailure(new ConnectException());
- return promise;
- }
-
- // Add timeout for cleanup if none is present
- if (cleanupTimer == null) {
- cleanupTimer = AsyncRpcClient.WHEEL_TIMER.newTimeout(timeoutTask, call.getRpcTimeout(),
- TimeUnit.MILLISECONDS);
- }
-
- if(channel.isActive()) {
- writeRequest(call);
+ synchronized (pendingCalls) {
+ if (closed) {
+ Promise<Message> promise = channel.eventLoop().newPromise();
+ promise.setFailure(new ConnectException());
+ return promise;
+ }
+ pendingCalls.put(call.id, call);
+ // Add timeout for cleanup if none is present
+ if (cleanupTimer == null && call.getRpcTimeout() > 0) {
+ cleanupTimer =
+ AsyncRpcClient.WHEEL_TIMER.newTimeout(timeoutTask, call.getRpcTimeout(),
+ TimeUnit.MILLISECONDS);
+ }
+ if (!connected) {
+ return call;
+ }
}
-
+ writeRequest(call);
return call;
}
- /**
- * Calls method and returns a promise
- * @param method to call
- * @param controller to run call with
- * @param request to send
- * @param responsePrototype for response message
- * @return Promise to listen to result
- * @throws java.net.ConnectException on connection failures
- */
- public Promise<Message> callMethodWithPromise(
- final Descriptors.MethodDescriptor method, final PayloadCarryingRpcController controller,
- final Message request, final Message responsePrototype) throws ConnectException {
- if (shouldCloseConnection || !channel.isOpen()) {
- throw new ConnectException();
+ AsyncCall removePendingCall(int id) {
+ synchronized (pendingCalls) {
+ return pendingCalls.remove(id);
}
-
- return this.callMethod(method, controller, request, responsePrototype);
}
/**
@@ -400,10 +397,6 @@ public class AsyncRpcChannel {
*/
private void writeRequest(final AsyncCall call) {
try {
- if (shouldCloseConnection) {
- return;
- }
-
final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader
.newBuilder();
requestHeaderBuilder.setCallId(call.id)
@@ -439,26 +432,13 @@ public class AsyncRpcChannel {
IPCUtil.write(out, rh, call.param, cellBlock);
}
- channel.writeAndFlush(b).addListener(new CallWriteListener(this,call));
+ channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id));
} catch (IOException e) {
- if (!shouldCloseConnection) {
- close(e);
- }
+ close(e);
}
}
/**
- * Fail a call
- *
- * @param call to fail
- * @param cause of fail
- */
- void failCall(AsyncCall call, IOException cause) {
- calls.remove(call.id);
- call.setFailed(cause);
- }
-
- /**
* Set up server authorization
*
* @throws java.io.IOException if auth setup failed
@@ -550,18 +530,22 @@ public class AsyncRpcChannel {
* @param e exception on close
*/
public void close(final Throwable e) {
- client.removeConnection(ConnectionId.hashCode(ticket,serviceName,address));
+ client.removeConnection(this);
// Move closing from the requesting thread to the channel thread
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
- if (shouldCloseConnection) {
- return;
+ List<AsyncCall> toCleanup;
+ synchronized (pendingCalls) {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ toCleanup = new ArrayList<AsyncCall>(pendingCalls.values());
+ pendingCalls.clear();
}
-
- shouldCloseConnection = true;
-
+ IOException closeException = null;
if (e != null) {
if (e instanceof IOException) {
closeException = (IOException) e;
@@ -569,16 +553,19 @@ public class AsyncRpcChannel {
closeException = new IOException(e);
}
}
-
// log the info
if (LOG.isDebugEnabled() && closeException != null) {
- LOG.debug(name + ": closing ipc connection to " + address + ": " +
- closeException.getMessage());
+ LOG.debug(name + ": closing ipc connection to " + address, closeException);
+ }
+ if (cleanupTimer != null) {
+ cleanupTimer.cancel();
+ cleanupTimer = null;
+ }
+ for (AsyncCall call : toCleanup) {
+ call.setFailed(closeException != null ? closeException : new ConnectionClosingException(
+ "Call id=" + call.id + " on server " + address + " aborted: connection is closing"));
}
-
- cleanupCalls(true);
channel.disconnect().addListener(ChannelFutureListener.CLOSE);
-
if (LOG.isDebugEnabled()) {
LOG.debug(name + ": closed");
}
@@ -591,64 +578,37 @@ public class AsyncRpcChannel {
*
* @param cleanAll true if all calls should be cleaned, false for only the timed out calls
*/
- public void cleanupCalls(boolean cleanAll) {
- // Cancel outstanding timers
- if (cleanupTimer != null) {
- cleanupTimer.cancel();
- cleanupTimer = null;
- }
-
- if (cleanAll) {
- for (AsyncCall call : calls.values()) {
- synchronized (call) {
- // Calls can be done on another thread so check before failing them
- if(!call.isDone()) {
- if (closeException == null) {
- failCall(call, new ConnectionClosingException("Call id=" + call.id +
- " on server " + address + " aborted: connection is closing"));
- } else {
- failCall(call, closeException);
- }
- }
- }
- }
- } else {
- for (AsyncCall call : calls.values()) {
- long waitTime = EnvironmentEdgeManager.currentTime() - call.getStartTime();
+ private void cleanupCalls() {
+ List<AsyncCall> toCleanup = new ArrayList<AsyncCall>();
+ long currentTime = EnvironmentEdgeManager.currentTime();
+ long nextCleanupTaskDelay = -1L;
+ synchronized (pendingCalls) {
+ for (Iterator<AsyncCall> iter = pendingCalls.values().iterator(); iter.hasNext();) {
+ AsyncCall call = iter.next();
long timeout = call.getRpcTimeout();
- if (timeout > 0 && waitTime >= timeout) {
- synchronized (call) {
- // Calls can be done on another thread so check before failing them
- if (!call.isDone()) {
- closeException = new CallTimeoutException("Call id=" + call.id +
- ", waitTime=" + waitTime + ", rpcTimeout=" + timeout);
- failCall(call, closeException);
+ if (timeout > 0) {
+ if (currentTime - call.getStartTime() >= timeout) {
+ iter.remove();
+ toCleanup.add(call);
+ } else {
+ if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) {
+ nextCleanupTaskDelay = timeout;
}
}
- } else {
- // We expect the call to be ordered by timeout. It may not be the case, but stopping
- // at the first valid call allows to be sure that we still have something to do without
- // spending too much time by reading the full list.
- break;
}
}
-
- if (!calls.isEmpty()) {
- AsyncCall firstCall = calls.firstEntry().getValue();
-
- final long newTimeout;
- long maxWaitTime = EnvironmentEdgeManager.currentTime() - firstCall.getStartTime();
- if (maxWaitTime < firstCall.getRpcTimeout()) {
- newTimeout = firstCall.getRpcTimeout() - maxWaitTime;
- } else {
- newTimeout = 0;
- }
-
- closeException = null;
- cleanupTimer = AsyncRpcClient.WHEEL_TIMER.newTimeout(timeoutTask,
- newTimeout, TimeUnit.MILLISECONDS);
+ if (nextCleanupTaskDelay > 0) {
+ cleanupTimer =
+ AsyncRpcClient.WHEEL_TIMER.newTimeout(timeoutTask, nextCleanupTaskDelay,
+ TimeUnit.MILLISECONDS);
+ } else {
+ cleanupTimer = null;
}
}
+ for (AsyncCall call : toCleanup) {
+ call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime="
+ + (currentTime - call.getRpcTimeout()) + ", rpcTimeout=" + call.getRpcTimeout()));
+ }
}
/**
@@ -745,6 +705,10 @@ public class AsyncRpcChannel {
});
}
+ public int getConnectionHashCode() {
+ return ConnectionId.hashCode(ticket, serviceName, address);
+ }
+
@Override
public String toString() {
return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
@@ -755,20 +719,22 @@ public class AsyncRpcChannel {
*/
private static final class CallWriteListener implements ChannelFutureListener {
private final AsyncRpcChannel rpcChannel;
- private final AsyncCall call;
+ private final int id;
- public CallWriteListener(AsyncRpcChannel asyncRpcChannel, AsyncCall call) {
+ public CallWriteListener(AsyncRpcChannel asyncRpcChannel, int id) {
this.rpcChannel = asyncRpcChannel;
- this.call = call;
+ this.id = id;
}
- @Override public void operationComplete(ChannelFuture future) throws Exception {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
- if(!this.call.isDone()) {
+ AsyncCall call = rpcChannel.removePendingCall(id);
+ if (call != null) {
if (future.cause() instanceof IOException) {
- rpcChannel.failCall(call, (IOException) future.cause());
+ call.setFailed((IOException) future.cause());
} else {
- rpcChannel.failCall(call, new IOException(future.cause()));
+ call.setFailed(new IOException(future.cause()));
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e99091e9/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
index 30b622a..192e583 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
@@ -17,12 +17,6 @@
*/
package org.apache.hadoop.hbase.ipc;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcChannel;
-import com.google.protobuf.RpcController;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
@@ -38,6 +32,16 @@ import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
@@ -49,13 +53,12 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.Threads;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.RpcController;
/**
* Netty client for the requests and responses
@@ -169,16 +172,16 @@ public class AsyncRpcClient extends AbstractRpcClient {
* @throws InterruptedException if call is interrupted
* @throws java.io.IOException if a connection failure is encountered
*/
- @Override protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
+ @Override
+ protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
InetSocketAddress addr) throws IOException, InterruptedException {
-
final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket);
- Promise<Message> promise = connection.callMethodWithPromise(md, pcrc, param, returnType);
-
+ Promise<Message> promise = connection.callMethod(md, pcrc, param, returnType);
+ long timeout = pcrc.hasCallTimeout() ? pcrc.getCallTimeout() : 0;
try {
- Message response = promise.get();
+ Message response = timeout > 0 ? promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get();
return new Pair<>(response, pcrc.cellScanner());
} catch (ExecutionException e) {
if (e.getCause() instanceof IOException) {
@@ -186,6 +189,8 @@ public class AsyncRpcClient extends AbstractRpcClient {
} else {
throw new IOException(e.getCause());
}
+ } catch (TimeoutException e) {
+ throw new CallTimeoutException(promise.toString());
}
}
@@ -337,12 +342,20 @@ public class AsyncRpcClient extends AbstractRpcClient {
/**
* Remove connection from pool
- *
- * @param connectionHashCode of connection
*/
- public void removeConnection(int connectionHashCode) {
+ public void removeConnection(AsyncRpcChannel connection) {
+ int connectionHashCode = connection.getConnectionHashCode();
synchronized (connections) {
- this.connections.remove(connectionHashCode);
+ // connections are cached by connection hash code rather than by instance, so check
+ // identity here to prevent removing a different connection stored under the same key
+ AsyncRpcChannel connectionInPool = this.connections.get(connectionHashCode);
+ if (connectionInPool == connection) {
+ this.connections.remove(connectionHashCode);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("%s already removed, expected instance %08x, actual %08x",
+ connection.toString(), System.identityHashCode(connection),
+ System.identityHashCode(connectionInPool)));
+ }
}
}
@@ -399,4 +412,4 @@ public class AsyncRpcClient extends AbstractRpcClient {
this.rpcClient.callMethod(md, pcrc, param, returnType, this.ticket, this.isa, done);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e99091e9/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
index d71bf5e..a900140 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
@@ -17,11 +17,13 @@
*/
package org.apache.hadoop.hbase.ipc;
-import com.google.protobuf.Message;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
+
+import java.io.IOException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CellScanner;
@@ -29,7 +31,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.ipc.RemoteException;
-import java.io.IOException;
+import com.google.protobuf.Message;
/**
* Handles Hbase responses
@@ -52,16 +54,12 @@ public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf inBuffer = (ByteBuf) msg;
ByteBufInputStream in = new ByteBufInputStream(inBuffer);
-
- if (channel.shouldCloseConnection) {
- return;
- }
int totalSize = inBuffer.readableBytes();
try {
// Read the header
RPCProtos.ResponseHeader responseHeader = RPCProtos.ResponseHeader.parseDelimitedFrom(in);
int id = responseHeader.getCallId();
- AsyncCall call = channel.calls.get(id);
+ AsyncCall call = channel.removePendingCall(id);
if (call == null) {
// So we got a response for which we have no corresponding 'call' here on the client-side.
// We probably timed out waiting, cleaned up all references, and now the server decides
@@ -85,7 +83,7 @@ public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
equals(FatalConnectionException.class.getName())) {
channel.close(re);
} else {
- channel.failCall(call, re);
+ call.setFailed(re);
}
} else {
Message value = null;
@@ -104,13 +102,11 @@ public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
}
call.setSuccess(value, cellBlockScanner);
}
- channel.calls.remove(id);
} catch (IOException e) {
// Treat this as a fatal condition and close this connection
channel.close(e);
} finally {
inBuffer.release();
- channel.cleanupCalls(false);
}
}