You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/08/31 02:08:00 UTC

svn commit: r1379236 - in /hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase: HConstants.java ipc/HBaseRpcMetrics.java ipc/HBaseServer.java regionserver/HRegionServer.java

Author: larsh
Date: Fri Aug 31 00:08:00 2012
New Revision: 1379236

URL: http://svn.apache.org/viewvc?rev=1379236&view=rev
Log:
HBASE-6165 Replication can overrun .META. scans on cluster re-start (Himanshu Vashishtha)

Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1379236&r1=1379235&r2=1379236&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/HConstants.java Fri Aug 31 00:08:00 2012
@@ -650,6 +650,17 @@ public final class HConstants {
   public static final String ENABLE_WAL_COMPRESSION =
     "hbase.regionserver.wal.enablecompression";
 
+  /**
+   * QOS attributes: these attributes are used to demarcate RPC call processing
+   * by different set of handlers. For example, HIGH_QOS tagged methods are
+   * handled by high priority handlers.
+   */
+  public static final int NORMAL_QOS = 0;
+  public static final int QOS_THRESHOLD = 10;
+  public static final int HIGH_QOS = 100;
+  public static final int REPLICATION_QOS = 5; // normal_QOS < replication_QOS < high_QOS
+
+  
   private HConstants() {
     // Can't be instantiated with this ctor.
   }

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java?rev=1379236&r1=1379235&r2=1379236&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java Fri Aug 31 00:08:00 2012
@@ -98,6 +98,8 @@ public class HBaseRpcMetrics implements 
          new MetricsTimeVaryingInt("rpcAuthorizationSuccesses", registry);
   public MetricsTimeVaryingRate rpcSlowResponseTime =
       new MetricsTimeVaryingRate("RpcSlowResponse", registry);
+  public final MetricsIntValue replicationCallQueueLen =
+    new MetricsIntValue("replicationCallQueueLen", registry);
 
   private void initMethods(Class<? extends VersionedProtocol> protocol) {
     for (Method m : protocol.getDeclaredMethods()) {

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1379236&r1=1379235&r2=1379236&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Fri Aug 31 00:08:00 2012
@@ -225,6 +225,10 @@ public abstract class HBaseServer implem
   protected int numConnections = 0;
   private Handler[] handlers = null;
   private Handler[] priorityHandlers = null;
+  /** replication related queue; */
+  private BlockingQueue<Call> replicationQueue;
+  private int numOfReplicationHandlers = 0;
+  private Handler[] replicationHandlers = null;
   protected HBaseRPCErrorHandler errorHandler = null;
 
   /**
@@ -1299,8 +1303,11 @@ public abstract class HBaseServer implem
       if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) {
         priorityCallQueue.put(call);
         updateCallQueueLenMetrics(priorityCallQueue);
+      } else if (replicationQueue != null && getQosLevel(param) == HConstants.REPLICATION_QOS) {
+        replicationQueue.put(call);
+        updateCallQueueLenMetrics(replicationQueue);
       } else {
-        callQueue.put(call);              // queue the call; maybe blocked here
+        callQueue.put(call); // queue the call; maybe blocked here
         updateCallQueueLenMetrics(callQueue);
       }
     }
@@ -1327,6 +1334,8 @@ public abstract class HBaseServer implem
       rpcMetrics.callQueueLen.set(callQueue.size());
     } else if (queue == priorityCallQueue) {
       rpcMetrics.priorityCallQueueLen.set(priorityCallQueue.size());
+    } else if (queue == replicationQueue) {
+      rpcMetrics.replicationCallQueueLen.set(replicationQueue.size());
     } else {
       LOG.warn("Unknown call queue");
     }
@@ -1345,6 +1354,8 @@ public abstract class HBaseServer implem
       if (cq == priorityCallQueue) {
         // this is just an amazing hack, but it works.
         threadName = "PRI " + threadName;
+      } else if (cq == replicationQueue) {
+        threadName = "REPL " + threadName;
       }
       this.setName(threadName);
       this.status = TaskMonitor.get().createRPCStatus(threadName);
@@ -1513,7 +1524,11 @@ public abstract class HBaseServer implem
     this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
     this.purgeTimeout = conf.getLong("ipc.client.call.purge.timeout",
                                      2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
-
+    this.numOfReplicationHandlers = 
+      conf.getInt("hbase.regionserver.replication.handler.count", 3);
+    if (numOfReplicationHandlers > 0) {
+      this.replicationQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
+    }
     // Start the listener here and let it bind to the port
     listener = new Listener();
     this.port = listener.getAddress().getPort();
@@ -1613,20 +1628,21 @@ public abstract class HBaseServer implem
   public synchronized void startThreads() {
     responder.start();
     listener.start();
-    handlers = new Handler[handlerCount];
-
-    for (int i = 0; i < handlerCount; i++) {
-      handlers[i] = new Handler(callQueue, i);
-      handlers[i].start();
+    handlers = startHandlers(callQueue, handlerCount);
+    priorityHandlers = startHandlers(priorityCallQueue, priorityHandlerCount);
+    replicationHandlers = startHandlers(replicationQueue, numOfReplicationHandlers);
     }
 
-    if (priorityHandlerCount > 0) {
-      priorityHandlers = new Handler[priorityHandlerCount];
-      for (int i = 0 ; i < priorityHandlerCount; i++) {
-        priorityHandlers[i] = new Handler(priorityCallQueue, i);
-        priorityHandlers[i].start();
-      }
+  private Handler[] startHandlers(BlockingQueue<Call> queue, int numOfHandlers) {
+    if (numOfHandlers <= 0) {
+      return null;
+    }
+    Handler[] handlers = new Handler[numOfHandlers];
+    for (int i = 0; i < numOfHandlers; i++) {
+      handlers[i] = new Handler(queue, i);
+      handlers[i].start();
     }
+    return handlers;
   }
 
   /** Stops the service.  No new calls will be handled after this is called. */
@@ -1634,20 +1650,9 @@ public abstract class HBaseServer implem
   public synchronized void stop() {
     LOG.info("Stopping server on " + port);
     running = false;
-    if (handlers != null) {
-      for (Handler handler : handlers) {
-        if (handler != null) {
-          handler.interrupt();
-        }
-      }
-    }
-    if (priorityHandlers != null) {
-      for (Handler handler : priorityHandlers) {
-        if (handler != null) {
-          handler.interrupt();
-        }
-      }
-    }
+    stopHandlers(handlers);
+    stopHandlers(priorityHandlers);
+    stopHandlers(replicationHandlers);
     listener.interrupt();
     listener.doStop();
     responder.interrupt();
@@ -1657,6 +1662,16 @@ public abstract class HBaseServer implem
     }
   }
 
+  private void stopHandlers(Handler[] handlers) {
+    if (handlers != null) {
+      for (Handler handler : handlers) {
+        if (handler != null) {
+          handler.interrupt();
+        }
+      }
+    }
+  }
+
   /** Wait for the server to be stopped.
    * Does not wait for all subthreads to finish.
    *  See {@link #stop()}.

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1379236&r1=1379235&r2=1379236&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Aug 31 00:08:00 2012
@@ -415,7 +415,7 @@ public class HRegionServer implements HR
         conf.getInt("hbase.regionserver.handler.count", 10),
         conf.getInt("hbase.regionserver.metahandler.count", 10),
         conf.getBoolean("hbase.rpc.verbose", false),
-        conf, QOS_THRESHOLD);
+        conf, HConstants.QOS_THRESHOLD);
     // Set our address.
     this.isa = this.rpcServer.getListenerAddress();
 
@@ -447,9 +447,6 @@ public class HRegionServer implements HR
     }
   }
 
-  private static final int NORMAL_QOS = 0;
-  private static final int QOS_THRESHOLD = 10;  // the line between low and high qos
-  private static final int HIGH_QOS = 100;
 
   @Retention(RetentionPolicy.RUNTIME)
   private @interface QosPriority {
@@ -487,7 +484,7 @@ public class HRegionServer implements HR
 
     @Override
     public Integer apply(Writable from) {
-      if (!(from instanceof Invocation)) return NORMAL_QOS;
+      if (!(from instanceof Invocation)) return HConstants.NORMAL_QOS;
 
       Invocation inv = (Invocation) from;
       String methodName = inv.getMethodName();
@@ -505,13 +502,13 @@ public class HRegionServer implements HR
           scannerId = (Long) inv.getParameters()[0];
         } catch (ClassCastException ignored) {
           // LOG.debug("Low priority: " + from);
-          return NORMAL_QOS; // doh.
+          return HConstants.NORMAL_QOS;
         }
         String scannerIdString = Long.toString(scannerId);
         RegionScanner scanner = scanners.get(scannerIdString);
         if (scanner != null && scanner.getRegionInfo().isMetaRegion()) {
           // LOG.debug("High priority scanner request: " + scannerId);
-          return HIGH_QOS;
+          return HConstants.HIGH_QOS;
         }
       } else if (inv.getParameterClasses().length == 0) {
        // Just let it through.  This is getOnlineRegions, etc.
@@ -521,7 +518,7 @@ public class HRegionServer implements HR
           // LOG.debug("High priority with method: " + methodName +
           // " and region: "
           // + Bytes.toString((byte[]) inv.getParameters()[0]));
-          return HIGH_QOS;
+          return HConstants.HIGH_QOS;
         }
       } else if (inv.getParameterClasses()[0] == MultiAction.class) {
         MultiAction<?> ma = (MultiAction<?>) inv.getParameters()[0];
@@ -537,12 +534,12 @@ public class HRegionServer implements HR
           if (isMetaRegion(region)) {
             // LOG.debug("High priority multi with region: " +
             // Bytes.toString(region));
-            return HIGH_QOS; // short circuit for the win.
+            return HConstants.HIGH_QOS; // short circuit for the win.
           }
         }
       }
       // LOG.debug("Low priority: " + from.toString());
-      return NORMAL_QOS;
+      return HConstants.NORMAL_QOS;
     }
   }
 
@@ -1924,7 +1921,7 @@ public class HRegionServer implements HR
   }
 
   @Override
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.HIGH_QOS)
   public HRegionInfo getRegionInfo(final byte[] regionName)
   throws NotServingRegionException, IOException {
     checkOpen();
@@ -2604,7 +2601,7 @@ public class HRegionServer implements HR
   }
 
   @Override
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.HIGH_QOS)
   public void unlockRow(byte[] regionName, long lockId) throws IOException {
     checkOpen();
     NullPointerException npe = null;
@@ -2686,14 +2683,14 @@ public class HRegionServer implements HR
   // Region open/close direct RPCs
 
   @Override
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.HIGH_QOS)
   public RegionOpeningState openRegion(HRegionInfo region)
   throws IOException {
     return openRegion(region, -1);
   }
 
   @Override
-  @QosPriority(priority = HIGH_QOS)
+  @QosPriority(priority = HConstants.HIGH_QOS)
   public RegionOpeningState openRegion(HRegionInfo region, int versionOfOfflineNode)
       throws IOException {
     return openRegion(region, versionOfOfflineNode, null);
@@ -2761,7 +2758,7 @@ public class HRegionServer implements HR
   }
 
   @Override
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.HIGH_QOS)
   public void openRegions(List<HRegionInfo> regions)
   throws IOException {
     checkOpen();
@@ -2771,14 +2768,14 @@ public class HRegionServer implements HR
   }
 
   @Override
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.HIGH_QOS)
   public boolean closeRegion(HRegionInfo region)
   throws IOException {
     return closeRegion(region, true, -1);
   }
 
   @Override
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.HIGH_QOS)
   public boolean closeRegion(final HRegionInfo region,
     final int versionOfClosingNode)
   throws IOException {
@@ -2786,13 +2783,13 @@ public class HRegionServer implements HR
   }
 
   @Override
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.HIGH_QOS)
   public boolean closeRegion(HRegionInfo region, final boolean zk)
   throws IOException {
     return closeRegion(region, zk, -1);
   }
 
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.HIGH_QOS)
   protected boolean closeRegion(HRegionInfo region, final boolean zk,
     final int versionOfClosingNode)
   throws IOException {
@@ -2811,7 +2808,7 @@ public class HRegionServer implements HR
   }
 
   @Override
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.HIGH_QOS)
   public boolean closeRegion(byte[] encodedRegionName, boolean zk)
     throws IOException {
     return closeRegion(encodedRegionName, false, zk);
@@ -2891,7 +2888,7 @@ public class HRegionServer implements HR
   // Manual remote region administration RPCs
 
   @Override
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.HIGH_QOS)
   public void flushRegion(HRegionInfo regionInfo)
       throws NotServingRegionException, IOException {
     checkOpen();
@@ -2901,7 +2898,7 @@ public class HRegionServer implements HR
   }
 
   @Override
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.HIGH_QOS)
   public void splitRegion(HRegionInfo regionInfo)
       throws NotServingRegionException, IOException {
     splitRegion(regionInfo, null);
@@ -2918,7 +2915,7 @@ public class HRegionServer implements HR
   }
 
   @Override
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.HIGH_QOS)
   public void compactRegion(HRegionInfo regionInfo, boolean major)
       throws NotServingRegionException, IOException {
     checkOpen();
@@ -2964,7 +2961,7 @@ public class HRegionServer implements HR
   }
 
   @Override
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.HIGH_QOS)
   public List<HRegionInfo> getOnlineRegions() throws IOException {
     checkOpen();
     List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
@@ -3161,7 +3158,7 @@ public class HRegionServer implements HR
   }
 
   @Override
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.HIGH_QOS)
   public ProtocolSignature getProtocolSignature(
       String protocol, long version, int clientMethodsHashCode)
   throws IOException {
@@ -3172,7 +3169,7 @@ public class HRegionServer implements HR
   }
 
   @Override
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.HIGH_QOS)
   public long getProtocolVersion(final String protocol, final long clientVersion)
   throws IOException {
     if (protocol.equals(HRegionInterface.class.getName())) {
@@ -3334,7 +3331,7 @@ public class HRegionServer implements HR
    * @deprecated Use {@link #getServerName()} instead.
    */
   @Override
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.HIGH_QOS)
   public HServerInfo getHServerInfo() throws IOException {
     checkOpen();
     return new HServerInfo(new HServerAddress(this.isa),
@@ -3632,7 +3629,7 @@ public class HRegionServer implements HR
   }
 
   @Override
-  @QosPriority(priority=HIGH_QOS)
+  @QosPriority(priority=HConstants.REPLICATION_QOS)
   public void replicateLogEntries(final HLog.Entry[] entries)
   throws IOException {
     checkOpen();