You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2013/04/23 04:49:49 UTC

svn commit: r1470791 - in /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase: HConstants.java ipc/HBaseRPC.java ipc/HBaseRpcMetrics.java ipc/HBaseServer.java regionserver/HRegionServer.java

Author: liyin
Date: Tue Apr 23 02:49:48 2013
New Revision: 1470791

URL: http://svn.apache.org/r1470791
Log:
[HBASE-8114][89-fb] Interrupt the RS RPC call when the client connection has been closed

Author: adela

Summary: when the socket connection is closed we are interrupting the current call. Also, we don't want to interrupt the prefatch call just the actual get/next.

Test Plan: to be defined

Reviewers: liyintang, aaiyer, manukranthk

Reviewed By: liyintang

CC: hbase-eng@, rshroff

Differential Revision: https://phabricator.fb.com/D722166

Task ID: 2044405

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1470791&r1=1470790&r2=1470791&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Tue Apr 23 02:49:48 2013
@@ -678,6 +678,9 @@ public final class HConstants {
   public static final String HISTOGRAM_BASED_METRICS_WINDOW =
       "hbase.histogrambasedmetric.window";
 
+  public static final String CLIENT_SOCKED_CLOSED_EXC_MSG = "Interrupting the read request";
+  public static final String SERVER_INTERRUPTED_CALLS_KEY = "serverInterruptedCalls";
+
   private HConstants() {
     // Can't be instantiated with this constructor.
   }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=1470791&r1=1470790&r2=1470791&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Tue Apr 23 02:49:48 2013
@@ -24,10 +24,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.client.Operation;
 import org.apache.hadoop.hbase.io.HbaseObjectWritable;
-import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -37,7 +35,6 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.util.ParamFormatHelper;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.VersionedProtocol;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import org.codehaus.jackson.map.ObjectMapper;
@@ -47,15 +44,12 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import java.lang.reflect.Array;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
 import java.lang.reflect.Proxy;
-import java.net.ConnectException;
 import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -496,7 +490,6 @@ public class HBaseRPC {
     private final int warnResponseTime;
     private final int warnResponseSize;
 
-
     /**
      * Construct an RPC server.
      * @param instance the instance whose methods will be called
@@ -581,7 +574,6 @@ public class HBaseRPC {
         status.setRPC(call.getMethodName(), call.getParameters(), receivedTime, method);
         status.setRPCPacket(param);
         status.resume("Servicing call");
-
         long startTime = System.currentTimeMillis();
         Object value = method.invoke(instance, call.getParameters());
         int processingTime = (int) (System.currentTimeMillis() - startTime);
@@ -640,7 +632,7 @@ public class HBaseRPC {
       } catch (InvocationTargetException e) {
         Throwable target = e.getTargetException();
         if (target instanceof IOException) {
-          throw (IOException)target;
+          throw (IOException) target;
         }
         IOException ioe = new IOException(target.toString());
         ioe.setStackTrace(target.getStackTrace());

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java?rev=1470791&r1=1470790&r2=1470791&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java Tue Apr 23 02:49:48 2013
@@ -103,7 +103,7 @@ public class HBaseRpcMetrics implements 
     
     rpcProcessingTime.pushMetric(metricsRecord);
     rpcProcessingTime.resetMinMax();
-    
+
     synchronized (registry) {
       // Iterate through the registry to propagate the different rpc metrics.
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1470791&r1=1470790&r2=1470791&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Tue Apr 23 02:49:48 2013
@@ -170,7 +170,6 @@ public abstract class HBaseServer {
 
   protected Configuration conf;
 
-  @SuppressWarnings({"FieldCanBeLocal"})
   protected int socketSendBufferSize;
   protected final boolean tcpNoDelay;   // if T then disable Nagle's Algorithm
   protected final boolean tcpKeepAlive; // if T then use keepalives
@@ -285,6 +284,10 @@ public abstract class HBaseServer {
       this.partialResponseSize = partialResponseSize;
     }
 
+    public Connection getConnection() {
+      return this.connection;
+    }
+
     @Override
     public String toString() {
       return param.toString() + " from " + connection.toString();
@@ -813,7 +816,6 @@ public abstract class HBaseServer {
     // Processes one response. Returns true if there are no more pending
     // data for this channel.
     //
-    @SuppressWarnings({"ConstantConditions"})
     private boolean processResponse(final LinkedList<Call> responseQueue,
                                     boolean inHandler) throws IOException {
       boolean error = true;
@@ -1002,7 +1004,7 @@ public abstract class HBaseServer {
 
 
   /** Reads calls from a connection and queues them for handling. */
-  private class Connection {
+  public class Connection {
     private boolean versionRead = false; //if initial signature and
                                          //version are read
     private int version = -1;
@@ -1048,6 +1050,10 @@ public abstract class HBaseServer {
       }
     }
 
+    public Socket getSocket() {
+      return socket;
+    }
+
     @Override
     public String toString() {
       return getHostAddress() + ":" + remotePort;
@@ -1065,10 +1071,6 @@ public abstract class HBaseServer {
       this.lastContact = lastContact;
     }
 
-    public long getLastContact() {
-      return lastContact;
-    }
-
     /* Return true if the connection has no outstanding rpc */
     private boolean isIdle() {
       return rpcCount == 0;
@@ -1586,26 +1588,6 @@ public abstract class HBaseServer {
     return (nBytes > 0) ? nBytes : ret;
   }
 
-  /**
-   * Set the OP_READ interest for the selection key
-   * @param selectionKey
-   */
-  private static void setReadInterest(final SelectionKey selectionKey) {
-    synchronized (selectionKey) {
-      selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_READ);
-    }
-  }
-
-  /**
-   * Unset the OP_READ interest for the selection key
-   * @param selectionKey
-   */
-  private static void unsetReadInterest(final SelectionKey selectionKey) {
-    synchronized (selectionKey) {
-      selectionKey.interestOps(selectionKey.interestOps() & (~SelectionKey.OP_READ));
-    }
-  }
-
   public long getResponseQueueSize(){
     return responseQueuesSizeThrottler.getCurrentValue();
   }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1470791&r1=1470790&r2=1470791&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Apr 23 02:49:48 2013
@@ -623,6 +623,14 @@ public class HRegionServer implements HR
   }
 
   /**
+   * if the current call from a client has closed his connection
+   * @return
+   */
+  public static boolean isCurrentConnectionClosed() {
+    return callContext.get().getConnection().getSocket().isClosed();
+  }
+
+  /**
    * The HRegionServer sticks in this loop until closed. It repeatedly checks
    * in with the HMaster, sending heartbeats & reports, and receiving HRegion
    * load/unload instructions.
@@ -1124,7 +1132,7 @@ public class HRegionServer implements HR
     return createRegionLoad(this.onlineRegions.get(Bytes.mapKey(regionName)));
   }
 
-  /*
+  /**
    * Cleanup after Throwable caught invoking method.  Converts <code>t</code>
    * to IOE if it isn't already.
    * @param t Throwable
@@ -1134,7 +1142,7 @@ public class HRegionServer implements HR
     return cleanup(t, null);
   }
 
-  /*
+  /**
    * Cleanup after Throwable caught invoking method.  Converts <code>t</code>
    * to IOE if it isn't already.
    * @param t Throwable
@@ -1160,7 +1168,7 @@ public class HRegionServer implements HR
     return t;
   }
 
-  /*
+  /**
    * @param t
    * @return Make <code>t</code> an IOE if it isn't already.
    */
@@ -1168,7 +1176,7 @@ public class HRegionServer implements HR
     return convertThrowableToIOE(t, null);
   }
 
-  /*
+  /**
    * @param t
    * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
    * @return Make <code>t</code> an IOE if it isn't already.
@@ -1179,7 +1187,8 @@ public class HRegionServer implements HR
       msg == null || msg.length() == 0?
         new IOException(t): new IOException(msg, t));
   }
-  /*
+
+  /**
    * Check if an OOME and if so, call abort immediately and avoid creating more
    * objects.
    * @param e