You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2013/04/23 04:49:49 UTC
svn commit: r1470791 - in
/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase:
HConstants.java ipc/HBaseRPC.java ipc/HBaseRpcMetrics.java
ipc/HBaseServer.java regionserver/HRegionServer.java
Author: liyin
Date: Tue Apr 23 02:49:48 2013
New Revision: 1470791
URL: http://svn.apache.org/r1470791
Log:
[HBASE-8114][89-fb] Interrupt the RS RPC call when the client connection has been closed
Author: adela
Summary: when the socket connection is closed we are interrupting the current call. Also, we don't want to interrupt the prefatch call just the actual get/next.
Test Plan: to be defined
Reviewers: liyintang, aaiyer, manukranthk
Reviewed By: liyintang
CC: hbase-eng@, rshroff
Differential Revision: https://phabricator.fb.com/D722166
Task ID: 2044405
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1470791&r1=1470790&r2=1470791&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Tue Apr 23 02:49:48 2013
@@ -678,6 +678,9 @@ public final class HConstants {
public static final String HISTOGRAM_BASED_METRICS_WINDOW =
"hbase.histogrambasedmetric.window";
+ public static final String CLIENT_SOCKED_CLOSED_EXC_MSG = "Interrupting the read request";
+ public static final String SERVER_INTERRUPTED_CALLS_KEY = "serverInterruptedCalls";
+
private HConstants() {
// Can't be instantiated with this constructor.
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=1470791&r1=1470790&r2=1470791&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Tue Apr 23 02:49:48 2013
@@ -24,10 +24,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
-import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
@@ -37,7 +35,6 @@ import org.apache.hadoop.hbase.HRegionIn
import org.apache.hadoop.hbase.util.ParamFormatHelper;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.VersionedProtocol;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jackson.map.ObjectMapper;
@@ -47,15 +44,12 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.lang.reflect.Array;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
-import java.net.ConnectException;
import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -496,7 +490,6 @@ public class HBaseRPC {
private final int warnResponseTime;
private final int warnResponseSize;
-
/**
* Construct an RPC server.
* @param instance the instance whose methods will be called
@@ -581,7 +574,6 @@ public class HBaseRPC {
status.setRPC(call.getMethodName(), call.getParameters(), receivedTime, method);
status.setRPCPacket(param);
status.resume("Servicing call");
-
long startTime = System.currentTimeMillis();
Object value = method.invoke(instance, call.getParameters());
int processingTime = (int) (System.currentTimeMillis() - startTime);
@@ -640,7 +632,7 @@ public class HBaseRPC {
} catch (InvocationTargetException e) {
Throwable target = e.getTargetException();
if (target instanceof IOException) {
- throw (IOException)target;
+ throw (IOException) target;
}
IOException ioe = new IOException(target.toString());
ioe.setStackTrace(target.getStackTrace());
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java?rev=1470791&r1=1470790&r2=1470791&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java Tue Apr 23 02:49:48 2013
@@ -103,7 +103,7 @@ public class HBaseRpcMetrics implements
rpcProcessingTime.pushMetric(metricsRecord);
rpcProcessingTime.resetMinMax();
-
+
synchronized (registry) {
// Iterate through the registry to propagate the different rpc metrics.
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1470791&r1=1470790&r2=1470791&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Tue Apr 23 02:49:48 2013
@@ -170,7 +170,6 @@ public abstract class HBaseServer {
protected Configuration conf;
- @SuppressWarnings({"FieldCanBeLocal"})
protected int socketSendBufferSize;
protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
protected final boolean tcpKeepAlive; // if T then use keepalives
@@ -285,6 +284,10 @@ public abstract class HBaseServer {
this.partialResponseSize = partialResponseSize;
}
+ public Connection getConnection() {
+ return this.connection;
+ }
+
@Override
public String toString() {
return param.toString() + " from " + connection.toString();
@@ -813,7 +816,6 @@ public abstract class HBaseServer {
// Processes one response. Returns true if there are no more pending
// data for this channel.
//
- @SuppressWarnings({"ConstantConditions"})
private boolean processResponse(final LinkedList<Call> responseQueue,
boolean inHandler) throws IOException {
boolean error = true;
@@ -1002,7 +1004,7 @@ public abstract class HBaseServer {
/** Reads calls from a connection and queues them for handling. */
- private class Connection {
+ public class Connection {
private boolean versionRead = false; //if initial signature and
//version are read
private int version = -1;
@@ -1048,6 +1050,10 @@ public abstract class HBaseServer {
}
}
+ public Socket getSocket() {
+ return socket;
+ }
+
@Override
public String toString() {
return getHostAddress() + ":" + remotePort;
@@ -1065,10 +1071,6 @@ public abstract class HBaseServer {
this.lastContact = lastContact;
}
- public long getLastContact() {
- return lastContact;
- }
-
/* Return true if the connection has no outstanding rpc */
private boolean isIdle() {
return rpcCount == 0;
@@ -1586,26 +1588,6 @@ public abstract class HBaseServer {
return (nBytes > 0) ? nBytes : ret;
}
- /**
- * Set the OP_READ interest for the selection key
- * @param selectionKey
- */
- private static void setReadInterest(final SelectionKey selectionKey) {
- synchronized (selectionKey) {
- selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_READ);
- }
- }
-
- /**
- * Unset the OP_READ interest for the selection key
- * @param selectionKey
- */
- private static void unsetReadInterest(final SelectionKey selectionKey) {
- synchronized (selectionKey) {
- selectionKey.interestOps(selectionKey.interestOps() & (~SelectionKey.OP_READ));
- }
- }
-
public long getResponseQueueSize(){
return responseQueuesSizeThrottler.getCurrentValue();
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1470791&r1=1470790&r2=1470791&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Apr 23 02:49:48 2013
@@ -623,6 +623,14 @@ public class HRegionServer implements HR
}
/**
+ * if the current call from a client has closed his connection
+ * @return
+ */
+ public static boolean isCurrentConnectionClosed() {
+ return callContext.get().getConnection().getSocket().isClosed();
+ }
+
+ /**
* The HRegionServer sticks in this loop until closed. It repeatedly checks
* in with the HMaster, sending heartbeats & reports, and receiving HRegion
* load/unload instructions.
@@ -1124,7 +1132,7 @@ public class HRegionServer implements HR
return createRegionLoad(this.onlineRegions.get(Bytes.mapKey(regionName)));
}
- /*
+ /**
* Cleanup after Throwable caught invoking method. Converts <code>t</code>
* to IOE if it isn't already.
* @param t Throwable
@@ -1134,7 +1142,7 @@ public class HRegionServer implements HR
return cleanup(t, null);
}
- /*
+ /**
* Cleanup after Throwable caught invoking method. Converts <code>t</code>
* to IOE if it isn't already.
* @param t Throwable
@@ -1160,7 +1168,7 @@ public class HRegionServer implements HR
return t;
}
- /*
+ /**
* @param t
* @return Make <code>t</code> an IOE if it isn't already.
*/
@@ -1168,7 +1176,7 @@ public class HRegionServer implements HR
return convertThrowableToIOE(t, null);
}
- /*
+ /**
* @param t
* @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
* @return Make <code>t</code> an IOE if it isn't already.
@@ -1179,7 +1187,8 @@ public class HRegionServer implements HR
msg == null || msg.length() == 0?
new IOException(t): new IOException(msg, t));
}
- /*
+
+ /**
* Check if an OOME and if so, call abort immediately and avoid creating more
* objects.
* @param e