You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/08/31 02:05:37 UTC

svn commit: r1379235 - in /hbase/trunk: hbase-common/src/main/java/org/apache/hadoop/hbase/ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ hbase-server/src/test/java/org/apache/h...

Author: larsh
Date: Fri Aug 31 00:05:36 2012
New Revision: 1379235

URL: http://svn.apache.org/viewvc?rev=1379235&view=rev
Log:
HBASE-6165 Replication can overrun .META. scans on cluster re-start (Himanshu Vashishtha)

Modified:
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1379235&r1=1379234&r2=1379235&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java Fri Aug 31 00:05:36 2012
@@ -701,6 +701,16 @@ public final class HConstants {
   /** Configuration key for the directory to backup HFiles for a table */
   public static final String HFILE_ARCHIVE_DIRECTORY = "hbase.table.archive.directory";
 
+  /**
+   * QOS attributes: these attributes are used to demarcate RPC call processing
+   * by different sets of handlers. For example, HIGH_QOS-tagged methods are
+   * handled by high priority handlers.
+   */
+  public static final int NORMAL_QOS = 0;
+  public static final int QOS_THRESHOLD = 10;
+  public static final int HIGH_QOS = 100;
+  public static final int REPLICATION_QOS = 5; // NORMAL_QOS < REPLICATION_QOS < QOS_THRESHOLD < HIGH_QOS
+  
   private HConstants() {
     // Can't be instantiated with this ctor.
   }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java?rev=1379235&r1=1379234&r2=1379235&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java Fri Aug 31 00:05:36 2012
@@ -109,6 +109,8 @@ public class HBaseRpcMetrics implements 
          new MetricsTimeVaryingInt("rpcAuthorizationSuccesses", registry);
   public MetricsTimeVaryingRate rpcSlowResponseTime =
       new MetricsTimeVaryingRate("RpcSlowResponse", registry);
+  public final MetricsIntValue replicationCallQueueLen =
+    new MetricsIntValue("replicationCallQueueLen", registry);
 
   private void initMethods(Class<? extends VersionedProtocol> protocol) {
     for (Method m : protocol.getDeclaredMethods()) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1379235&r1=1379234&r2=1379235&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Fri Aug 31 00:05:36 2012
@@ -275,6 +275,11 @@ public abstract class HBaseServer implem
   protected int numConnections = 0;
   private Handler[] handlers = null;
   private Handler[] priorityHandlers = null;
+  /** Queue for replication-related calls. */
+  private BlockingQueue<Call> replicationQueue;
+  private int numOfReplicationHandlers = 0;
+  private Handler[] replicationHandlers = null;
+  
   protected HBaseRPCErrorHandler errorHandler = null;
 
   /**
@@ -1650,6 +1655,10 @@ public abstract class HBaseServer implem
       if (priorityCallQueue != null && getQosLevel(rpcRequestBody) > highPriorityLevel) {
         priorityCallQueue.put(call);
         updateCallQueueLenMetrics(priorityCallQueue);
+      } else if (replicationQueue != null
+          && getQosLevel(rpcRequestBody) == HConstants.REPLICATION_QOS) {
+        replicationQueue.put(call);
+        updateCallQueueLenMetrics(replicationQueue);
       } else {
         callQueue.put(call);              // queue the call; maybe blocked here
         updateCallQueueLenMetrics(callQueue);
@@ -1732,6 +1741,8 @@ public abstract class HBaseServer implem
       rpcMetrics.callQueueLen.set(callQueue.size());
     } else if (queue == priorityCallQueue) {
       rpcMetrics.priorityCallQueueLen.set(priorityCallQueue.size());
+    } else if (queue == replicationQueue) {
+      rpcMetrics.replicationCallQueueLen.set(replicationQueue.size());
     } else {
       LOG.warn("Unknown call queue");
     }
@@ -1751,6 +1762,8 @@ public abstract class HBaseServer implem
       if (cq == priorityCallQueue) {
         // this is just an amazing hack, but it works.
         threadName = "PRI " + threadName;
+      } else if (cq == replicationQueue) {
+        threadName = "REPL " + threadName;
       }
       this.setName(threadName);
       this.status = TaskMonitor.get().createRPCStatus(threadName);
@@ -1917,7 +1930,10 @@ public abstract class HBaseServer implem
     this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
     this.purgeTimeout = conf.getLong("ipc.client.call.purge.timeout",
                                      2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
-
+    this.numOfReplicationHandlers = conf.getInt("hbase.regionserver.replication.handler.count", 3);
+    if (numOfReplicationHandlers > 0) {
+      this.replicationQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
+    }
     // Start the listener here and let it bind to the port
     listener = new Listener();
     this.port = listener.getAddress().getPort();
@@ -2011,22 +2027,23 @@ public abstract class HBaseServer implem
   public synchronized void startThreads() {
     responder.start();
     listener.start();
-    handlers = new Handler[handlerCount];
+    handlers = startHandlers(callQueue, handlerCount);
+    priorityHandlers = startHandlers(priorityCallQueue, priorityHandlerCount);
+    replicationHandlers = startHandlers(replicationQueue, numOfReplicationHandlers);
+  }
 
-    for (int i = 0; i < handlerCount; i++) {
-      handlers[i] = new Handler(callQueue, i);
+  private Handler[] startHandlers(BlockingQueue<Call> queue, int numOfHandlers) {
+    if (numOfHandlers <= 0) {
+      return null;
+    }
+    Handler[] handlers = new Handler[numOfHandlers];
+    for (int i = 0; i < numOfHandlers; i++) {
+      handlers[i] = new Handler(queue, i);
       handlers[i].start();
     }
-
-    if (priorityHandlerCount > 0) {
-      priorityHandlers = new Handler[priorityHandlerCount];
-      for (int i = 0 ; i < priorityHandlerCount; i++) {
-        priorityHandlers[i] = new Handler(priorityCallQueue, i);
-        priorityHandlers[i].start();
-      }
-    }
+    return handlers;
   }
-
+  
   public SecretManager<? extends TokenIdentifier> getSecretManager() {
     return this.secretManager;
   }
@@ -2040,20 +2057,9 @@ public abstract class HBaseServer implem
   public synchronized void stop() {
     LOG.info("Stopping server on " + port);
     running = false;
-    if (handlers != null) {
-      for (Handler handler : handlers) {
-        if (handler != null) {
-          handler.interrupt();
-        }
-      }
-    }
-    if (priorityHandlers != null) {
-      for (Handler handler : priorityHandlers) {
-        if (handler != null) {
-          handler.interrupt();
-        }
-      }
-    }
+    stopHandlers(handlers);
+    stopHandlers(priorityHandlers);
+    stopHandlers(replicationHandlers);
     listener.interrupt();
     listener.doStop();
     responder.interrupt();
@@ -2063,6 +2069,16 @@ public abstract class HBaseServer implem
     }
   }
 
+  private void stopHandlers(Handler[] handlers) {
+    if (handlers != null) {
+      for (Handler handler : handlers) {
+        if (handler != null) {
+          handler.interrupt();
+        }
+      }
+    }
+  }
+  
   /** Wait for the server to be stopped.
    * Does not wait for all subthreads to finish.
    *  See {@link #stop()}.

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1379235&r1=1379234&r2=1379235&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Aug 31 00:05:36 2012
@@ -27,13 +27,10 @@ import java.lang.annotation.RetentionPol
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryUsage;
 import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.BindException;
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -43,13 +40,11 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.SortedSet;
 import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -96,7 +91,6 @@ import org.apache.hadoop.hbase.client.De
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.MultiAction;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -117,7 +111,6 @@ import org.apache.hadoop.hbase.ipc.Copro
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
 import org.apache.hadoop.hbase.ipc.HBaseRpcMetrics;
-import org.apache.hadoop.hbase.ipc.Invocation;
 import org.apache.hadoop.hbase.ipc.ProtocolSignature;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
@@ -209,7 +202,6 @@ import org.apache.hadoop.hbase.zookeeper
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.metrics.util.MBeanUtil;
 import org.apache.hadoop.net.DNS;
@@ -231,7 +223,6 @@ import org.apache.hadoop.hbase.RegionSer
 
 import com.google.common.base.Function;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.Message;
 import com.google.protobuf.RpcController;
 
@@ -305,10 +296,6 @@ public class  HRegionServer implements C
   protected volatile boolean fsOk;
   protected HFileSystem fs;
 
-  protected static final int NORMAL_QOS = 0;
-  protected static final int QOS_THRESHOLD = 10;  // the line between low and high qos
-  protected static final int HIGH_QOS = 100;
-
   // Set when a report to the master comes back with a message asking us to
   // shutdown. Also set by call to stop when debugging or running unit tests
   // of HRegionServer in isolation.
@@ -522,7 +509,7 @@ public class  HRegionServer implements C
         conf.getInt("hbase.regionserver.handler.count", 10),
         conf.getInt("hbase.regionserver.metahandler.count", 10),
         conf.getBoolean("hbase.rpc.verbose", false),
-        conf, QOS_THRESHOLD);
+        conf, HConstants.QOS_THRESHOLD);
     // Set our address.
     this.isa = this.rpcServer.getListenerAddress();
 
@@ -674,7 +661,7 @@ public class  HRegionServer implements C
       }
 
       if (rpcArgClass == null || from.getRequest().isEmpty()) {
-        return NORMAL_QOS;
+        return HConstants.NORMAL_QOS;
       }
       Object deserializedRequestObj = null;
       //check whether the request has reference to Meta region
@@ -690,7 +677,7 @@ public class  HRegionServer implements C
           if (LOG.isDebugEnabled()) {
             LOG.debug("High priority: " + from.toString());
           }
-          return HIGH_QOS;
+          return HConstants.HIGH_QOS;
         }
       } catch (Exception ex) {
         throw new RuntimeException(ex);
@@ -699,20 +686,20 @@ public class  HRegionServer implements C
       if (methodName.equals("scan")) { // scanner methods...
         ScanRequest request = (ScanRequest)deserializedRequestObj;
         if (!request.hasScannerId()) {
-          return NORMAL_QOS;
+          return HConstants.NORMAL_QOS;
         }
         RegionScanner scanner = hRegionServer.getScanner(request.getScannerId());
         if (scanner != null && scanner.getRegionInfo().isMetaRegion()) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("High priority scanner request: " + request.getScannerId());
           }
-          return HIGH_QOS;
+          return HConstants.HIGH_QOS;
         }
       }
       if (LOG.isDebugEnabled()) {
         LOG.debug("Low priority: " + from.toString());
       }
-      return NORMAL_QOS;
+      return HConstants.NORMAL_QOS;
     }
   }
 
@@ -2182,7 +2169,7 @@ public class  HRegionServer implements C
   }
 
   @Override
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.HIGH_QOS)
   public ProtocolSignature getProtocolSignature(
       String protocol, long version, int clientMethodsHashCode)
   throws IOException {
@@ -2195,7 +2182,7 @@ public class  HRegionServer implements C
   }
 
   @Override
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.HIGH_QOS)
   public long getProtocolVersion(final String protocol, final long clientVersion)
   throws IOException {
     if (protocol.equals(ClientProtocol.class.getName())) {
@@ -3187,7 +3174,7 @@ public class  HRegionServer implements C
    * @throws ServiceException
    */
   @Override
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.HIGH_QOS)
   public UnlockRowResponse unlockRow(final RpcController controller,
       final UnlockRowRequest request) throws ServiceException {
     try {
@@ -3393,7 +3380,7 @@ public class  HRegionServer implements C
 // Start Admin methods
 
   @Override
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.HIGH_QOS)
   public GetRegionInfoResponse getRegionInfo(final RpcController controller,
       final GetRegionInfoRequest request) throws ServiceException {
     try {
@@ -3440,7 +3427,7 @@ public class  HRegionServer implements C
   }
 
   @Override
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.HIGH_QOS)
   public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
       final GetOnlineRegionRequest request) throws ServiceException {
     try {
@@ -3468,7 +3455,7 @@ public class  HRegionServer implements C
    * @throws ServiceException
    */
   @Override
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.HIGH_QOS)
   public OpenRegionResponse openRegion(final RpcController controller, final OpenRegionRequest request)
   throws ServiceException {
     int versionOfOfflineNode = -1;
@@ -3555,7 +3542,7 @@ public class  HRegionServer implements C
    * @throws ServiceException
    */
   @Override
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.HIGH_QOS)
   public CloseRegionResponse closeRegion(final RpcController controller,
       final CloseRegionRequest request) throws ServiceException {
     int versionOfClosingNode = -1;
@@ -3594,7 +3581,7 @@ public class  HRegionServer implements C
    * @throws ServiceException
    */
   @Override
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.HIGH_QOS)
   public FlushRegionResponse flushRegion(final RpcController controller,
       final FlushRegionRequest request) throws ServiceException {
     try {
@@ -3625,7 +3612,7 @@ public class  HRegionServer implements C
    * @throws ServiceException
    */
   @Override
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.HIGH_QOS)
   public SplitRegionResponse splitRegion(final RpcController controller,
       final SplitRegionRequest request) throws ServiceException {
     try {
@@ -3654,7 +3641,7 @@ public class  HRegionServer implements C
    * @throws ServiceException
    */
   @Override
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.HIGH_QOS)
   public CompactRegionResponse compactRegion(final RpcController controller,
       final CompactRegionRequest request) throws ServiceException {
     try {
@@ -3688,7 +3675,7 @@ public class  HRegionServer implements C
    * @throws ServiceException
    */
   @Override
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.REPLICATION_QOS)
   public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
       final ReplicateWALEntryRequest request) throws ServiceException {
     try {

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java?rev=1379235&r1=1379234&r2=1379235&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java Fri Aug 31 00:05:36 2012
@@ -25,6 +25,7 @@ import static org.junit.Assert.*;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
@@ -85,7 +86,7 @@ public class TestPriorityRpc {
     Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
     Mockito.when(mockRegionInfo.isMetaRegion()).thenReturn(true);
     qosFunction.setRegionServer(mockRS);
-    assertTrue (qosFunction.apply(rpcRequest) == HRegionServer.HIGH_QOS);
+    assertTrue (qosFunction.apply(rpcRequest) == HConstants.HIGH_QOS);
   }
 
   @Test
@@ -99,7 +100,7 @@ public class TestPriorityRpc {
     rpcRequestBuilder.setRequestClassName(GetOnlineRegionRequest.class.getCanonicalName());
     RpcRequestBody rpcRequest = rpcRequestBuilder.build();
     QosFunction qosFunc = regionServer.getQosFunction();
-    assertTrue (qosFunc.apply(rpcRequest) == HRegionServer.NORMAL_QOS);
+    assertTrue (qosFunc.apply(rpcRequest) == HConstants.NORMAL_QOS);
   }
 
   @Test
@@ -112,7 +113,7 @@ public class TestPriorityRpc {
     ByteString requestBody = scanBuilder.build().toByteString();
     rpcRequestBuilder.setRequest(requestBody);
     RpcRequestBody rpcRequest = rpcRequestBuilder.build();
-    assertTrue (qosFunction.apply(rpcRequest) == HRegionServer.NORMAL_QOS);
+    assertTrue (qosFunction.apply(rpcRequest) == HConstants.NORMAL_QOS);
 
     //build a scan request with scannerID
     scanBuilder = ScanRequest.newBuilder();
@@ -134,11 +135,11 @@ public class TestPriorityRpc {
 
     qosFunction.setRegionServer(mockRS);
 
-    assertTrue (qosFunction.apply(rpcRequest) == HRegionServer.HIGH_QOS);
+    assertTrue (qosFunction.apply(rpcRequest) == HConstants.HIGH_QOS);
 
     //the same as above but with non-meta region
     Mockito.when(mockRegionInfo.isMetaRegion()).thenReturn(false);
-    assertTrue (qosFunction.apply(rpcRequest) == HRegionServer.NORMAL_QOS);
+    assertTrue (qosFunction.apply(rpcRequest) == HConstants.NORMAL_QOS);
   }
 
   @org.junit.Rule