You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/08/31 02:08:00 UTC
svn commit: r1379236 - in
/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase: HConstants.java
ipc/HBaseRpcMetrics.java ipc/HBaseServer.java regionserver/HRegionServer.java
Author: larsh
Date: Fri Aug 31 00:08:00 2012
New Revision: 1379236
URL: http://svn.apache.org/viewvc?rev=1379236&view=rev
Log:
HBASE-6165 Replication can overrun .META. scans on cluster re-start (Himanshu Vashishtha)
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/HConstants.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1379236&r1=1379235&r2=1379236&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/HConstants.java Fri Aug 31 00:08:00 2012
@@ -650,6 +650,17 @@ public final class HConstants {
public static final String ENABLE_WAL_COMPRESSION =
"hbase.regionserver.wal.enablecompression";
+ /**
+ * QOS attributes: these attributes are used to demarcate RPC call processing
+ * by different set of handlers. For example, HIGH_QOS tagged methods are
+ * handled by high priority handlers.
+ */
+ public static final int NORMAL_QOS = 0;
+ public static final int QOS_THRESHOLD = 10;
+ public static final int HIGH_QOS = 100;
+ public static final int REPLICATION_QOS = 5; // normal_QOS < replication_QOS < high_QOS
+
+
private HConstants() {
// Can't be instantiated with this ctor.
}
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java?rev=1379236&r1=1379235&r2=1379236&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java Fri Aug 31 00:08:00 2012
@@ -98,6 +98,8 @@ public class HBaseRpcMetrics implements
new MetricsTimeVaryingInt("rpcAuthorizationSuccesses", registry);
public MetricsTimeVaryingRate rpcSlowResponseTime =
new MetricsTimeVaryingRate("RpcSlowResponse", registry);
+ public final MetricsIntValue replicationCallQueueLen =
+ new MetricsIntValue("replicationCallQueueLen", registry);
private void initMethods(Class<? extends VersionedProtocol> protocol) {
for (Method m : protocol.getDeclaredMethods()) {
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1379236&r1=1379235&r2=1379236&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Fri Aug 31 00:08:00 2012
@@ -225,6 +225,10 @@ public abstract class HBaseServer implem
protected int numConnections = 0;
private Handler[] handlers = null;
private Handler[] priorityHandlers = null;
+ /** replication related queue; */
+ private BlockingQueue<Call> replicationQueue;
+ private int numOfReplicationHandlers = 0;
+ private Handler[] replicationHandlers = null;
protected HBaseRPCErrorHandler errorHandler = null;
/**
@@ -1299,8 +1303,11 @@ public abstract class HBaseServer implem
if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) {
priorityCallQueue.put(call);
updateCallQueueLenMetrics(priorityCallQueue);
+ } else if (replicationQueue != null && getQosLevel(param) == HConstants.REPLICATION_QOS) {
+ replicationQueue.put(call);
+ updateCallQueueLenMetrics(replicationQueue);
} else {
- callQueue.put(call); // queue the call; maybe blocked here
+ callQueue.put(call); // queue the call; maybe blocked here
updateCallQueueLenMetrics(callQueue);
}
}
@@ -1327,6 +1334,8 @@ public abstract class HBaseServer implem
rpcMetrics.callQueueLen.set(callQueue.size());
} else if (queue == priorityCallQueue) {
rpcMetrics.priorityCallQueueLen.set(priorityCallQueue.size());
+ } else if (queue == replicationQueue) {
+ rpcMetrics.replicationCallQueueLen.set(replicationQueue.size());
} else {
LOG.warn("Unknown call queue");
}
@@ -1345,6 +1354,8 @@ public abstract class HBaseServer implem
if (cq == priorityCallQueue) {
// this is just an amazing hack, but it works.
threadName = "PRI " + threadName;
+ } else if (cq == replicationQueue) {
+ threadName = "REPL " + threadName;
}
this.setName(threadName);
this.status = TaskMonitor.get().createRPCStatus(threadName);
@@ -1513,7 +1524,11 @@ public abstract class HBaseServer implem
this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
this.purgeTimeout = conf.getLong("ipc.client.call.purge.timeout",
2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
-
+ this.numOfReplicationHandlers =
+ conf.getInt("hbase.regionserver.replication.handler.count", 3);
+ if (numOfReplicationHandlers > 0) {
+ this.replicationQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
+ }
// Start the listener here and let it bind to the port
listener = new Listener();
this.port = listener.getAddress().getPort();
@@ -1613,20 +1628,21 @@ public abstract class HBaseServer implem
public synchronized void startThreads() {
responder.start();
listener.start();
- handlers = new Handler[handlerCount];
-
- for (int i = 0; i < handlerCount; i++) {
- handlers[i] = new Handler(callQueue, i);
- handlers[i].start();
+ handlers = startHandlers(callQueue, handlerCount);
+ priorityHandlers = startHandlers(priorityCallQueue, priorityHandlerCount);
+ replicationHandlers = startHandlers(replicationQueue, numOfReplicationHandlers);
}
- if (priorityHandlerCount > 0) {
- priorityHandlers = new Handler[priorityHandlerCount];
- for (int i = 0 ; i < priorityHandlerCount; i++) {
- priorityHandlers[i] = new Handler(priorityCallQueue, i);
- priorityHandlers[i].start();
- }
+ private Handler[] startHandlers(BlockingQueue<Call> queue, int numOfHandlers) {
+ if (numOfHandlers <= 0) {
+ return null;
+ }
+ Handler[] handlers = new Handler[numOfHandlers];
+ for (int i = 0; i < numOfHandlers; i++) {
+ handlers[i] = new Handler(queue, i);
+ handlers[i].start();
}
+ return handlers;
}
/** Stops the service. No new calls will be handled after this is called. */
@@ -1634,20 +1650,9 @@ public abstract class HBaseServer implem
public synchronized void stop() {
LOG.info("Stopping server on " + port);
running = false;
- if (handlers != null) {
- for (Handler handler : handlers) {
- if (handler != null) {
- handler.interrupt();
- }
- }
- }
- if (priorityHandlers != null) {
- for (Handler handler : priorityHandlers) {
- if (handler != null) {
- handler.interrupt();
- }
- }
- }
+ stopHandlers(handlers);
+ stopHandlers(priorityHandlers);
+ stopHandlers(replicationHandlers);
listener.interrupt();
listener.doStop();
responder.interrupt();
@@ -1657,6 +1662,16 @@ public abstract class HBaseServer implem
}
}
+ private void stopHandlers(Handler[] handlers) {
+ if (handlers != null) {
+ for (Handler handler : handlers) {
+ if (handler != null) {
+ handler.interrupt();
+ }
+ }
+ }
+ }
+
/** Wait for the server to be stopped.
* Does not wait for all subthreads to finish.
* See {@link #stop()}.
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1379236&r1=1379235&r2=1379236&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Aug 31 00:08:00 2012
@@ -415,7 +415,7 @@ public class HRegionServer implements HR
conf.getInt("hbase.regionserver.handler.count", 10),
conf.getInt("hbase.regionserver.metahandler.count", 10),
conf.getBoolean("hbase.rpc.verbose", false),
- conf, QOS_THRESHOLD);
+ conf, HConstants.QOS_THRESHOLD);
// Set our address.
this.isa = this.rpcServer.getListenerAddress();
@@ -447,9 +447,6 @@ public class HRegionServer implements HR
}
}
- private static final int NORMAL_QOS = 0;
- private static final int QOS_THRESHOLD = 10; // the line between low and high qos
- private static final int HIGH_QOS = 100;
@Retention(RetentionPolicy.RUNTIME)
private @interface QosPriority {
@@ -487,7 +484,7 @@ public class HRegionServer implements HR
@Override
public Integer apply(Writable from) {
- if (!(from instanceof Invocation)) return NORMAL_QOS;
+ if (!(from instanceof Invocation)) return HConstants.NORMAL_QOS;
Invocation inv = (Invocation) from;
String methodName = inv.getMethodName();
@@ -505,13 +502,13 @@ public class HRegionServer implements HR
scannerId = (Long) inv.getParameters()[0];
} catch (ClassCastException ignored) {
// LOG.debug("Low priority: " + from);
- return NORMAL_QOS; // doh.
+ return HConstants.NORMAL_QOS;
}
String scannerIdString = Long.toString(scannerId);
RegionScanner scanner = scanners.get(scannerIdString);
if (scanner != null && scanner.getRegionInfo().isMetaRegion()) {
// LOG.debug("High priority scanner request: " + scannerId);
- return HIGH_QOS;
+ return HConstants.HIGH_QOS;
}
} else if (inv.getParameterClasses().length == 0) {
// Just let it through. This is getOnlineRegions, etc.
@@ -521,7 +518,7 @@ public class HRegionServer implements HR
// LOG.debug("High priority with method: " + methodName +
// " and region: "
// + Bytes.toString((byte[]) inv.getParameters()[0]));
- return HIGH_QOS;
+ return HConstants.HIGH_QOS;
}
} else if (inv.getParameterClasses()[0] == MultiAction.class) {
MultiAction<?> ma = (MultiAction<?>) inv.getParameters()[0];
@@ -537,12 +534,12 @@ public class HRegionServer implements HR
if (isMetaRegion(region)) {
// LOG.debug("High priority multi with region: " +
// Bytes.toString(region));
- return HIGH_QOS; // short circuit for the win.
+ return HConstants.HIGH_QOS; // short circuit for the win.
}
}
}
// LOG.debug("Low priority: " + from.toString());
- return NORMAL_QOS;
+ return HConstants.NORMAL_QOS;
}
}
@@ -1924,7 +1921,7 @@ public class HRegionServer implements HR
}
@Override
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.HIGH_QOS)
public HRegionInfo getRegionInfo(final byte[] regionName)
throws NotServingRegionException, IOException {
checkOpen();
@@ -2604,7 +2601,7 @@ public class HRegionServer implements HR
}
@Override
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.HIGH_QOS)
public void unlockRow(byte[] regionName, long lockId) throws IOException {
checkOpen();
NullPointerException npe = null;
@@ -2686,14 +2683,14 @@ public class HRegionServer implements HR
// Region open/close direct RPCs
@Override
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.HIGH_QOS)
public RegionOpeningState openRegion(HRegionInfo region)
throws IOException {
return openRegion(region, -1);
}
@Override
- @QosPriority(priority = HIGH_QOS)
+ @QosPriority(priority = HConstants.HIGH_QOS)
public RegionOpeningState openRegion(HRegionInfo region, int versionOfOfflineNode)
throws IOException {
return openRegion(region, versionOfOfflineNode, null);
@@ -2761,7 +2758,7 @@ public class HRegionServer implements HR
}
@Override
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.HIGH_QOS)
public void openRegions(List<HRegionInfo> regions)
throws IOException {
checkOpen();
@@ -2771,14 +2768,14 @@ public class HRegionServer implements HR
}
@Override
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.HIGH_QOS)
public boolean closeRegion(HRegionInfo region)
throws IOException {
return closeRegion(region, true, -1);
}
@Override
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.HIGH_QOS)
public boolean closeRegion(final HRegionInfo region,
final int versionOfClosingNode)
throws IOException {
@@ -2786,13 +2783,13 @@ public class HRegionServer implements HR
}
@Override
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.HIGH_QOS)
public boolean closeRegion(HRegionInfo region, final boolean zk)
throws IOException {
return closeRegion(region, zk, -1);
}
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.HIGH_QOS)
protected boolean closeRegion(HRegionInfo region, final boolean zk,
final int versionOfClosingNode)
throws IOException {
@@ -2811,7 +2808,7 @@ public class HRegionServer implements HR
}
@Override
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.HIGH_QOS)
public boolean closeRegion(byte[] encodedRegionName, boolean zk)
throws IOException {
return closeRegion(encodedRegionName, false, zk);
@@ -2891,7 +2888,7 @@ public class HRegionServer implements HR
// Manual remote region administration RPCs
@Override
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.HIGH_QOS)
public void flushRegion(HRegionInfo regionInfo)
throws NotServingRegionException, IOException {
checkOpen();
@@ -2901,7 +2898,7 @@ public class HRegionServer implements HR
}
@Override
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.HIGH_QOS)
public void splitRegion(HRegionInfo regionInfo)
throws NotServingRegionException, IOException {
splitRegion(regionInfo, null);
@@ -2918,7 +2915,7 @@ public class HRegionServer implements HR
}
@Override
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.HIGH_QOS)
public void compactRegion(HRegionInfo regionInfo, boolean major)
throws NotServingRegionException, IOException {
checkOpen();
@@ -2964,7 +2961,7 @@ public class HRegionServer implements HR
}
@Override
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.HIGH_QOS)
public List<HRegionInfo> getOnlineRegions() throws IOException {
checkOpen();
List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
@@ -3161,7 +3158,7 @@ public class HRegionServer implements HR
}
@Override
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.HIGH_QOS)
public ProtocolSignature getProtocolSignature(
String protocol, long version, int clientMethodsHashCode)
throws IOException {
@@ -3172,7 +3169,7 @@ public class HRegionServer implements HR
}
@Override
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.HIGH_QOS)
public long getProtocolVersion(final String protocol, final long clientVersion)
throws IOException {
if (protocol.equals(HRegionInterface.class.getName())) {
@@ -3334,7 +3331,7 @@ public class HRegionServer implements HR
* @deprecated Use {@link #getServerName()} instead.
*/
@Override
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.HIGH_QOS)
public HServerInfo getHServerInfo() throws IOException {
checkOpen();
return new HServerInfo(new HServerAddress(this.isa),
@@ -3632,7 +3629,7 @@ public class HRegionServer implements HR
}
@Override
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.REPLICATION_QOS)
public void replicateLogEntries(final HLog.Entry[] entries)
throws IOException {
checkOpen();