You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/08/31 02:05:37 UTC
svn commit: r1379235 - in /hbase/trunk:
hbase-common/src/main/java/org/apache/hadoop/hbase/
hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/
hbase-server/src/test/java/org/apache/h...
Author: larsh
Date: Fri Aug 31 00:05:36 2012
New Revision: 1379235
URL: http://svn.apache.org/viewvc?rev=1379235&view=rev
Log:
HBASE-6165 Replication can overrun .META. scans on cluster re-start (Himanshu Vashishtha)
Modified:
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java
Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1379235&r1=1379234&r2=1379235&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java Fri Aug 31 00:05:36 2012
@@ -701,6 +701,16 @@ public final class HConstants {
/** Configuration key for the directory to backup HFiles for a table */
public static final String HFILE_ARCHIVE_DIRECTORY = "hbase.table.archive.directory";
+ /**
+ * QOS attributes: these attributes are used to demarcate RPC call processing
+ * by different set of handlers. For example, HIGH_QOS tagged methods are
+ * handled by high priority handlers.
+ */
+ public static final int NORMAL_QOS = 0;
+ public static final int QOS_THRESHOLD = 10;
+ public static final int HIGH_QOS = 100;
+ public static final int REPLICATION_QOS = 5; // normal_QOS < replication_QOS < high_QOS
+
private HConstants() {
// Can't be instantiated with this ctor.
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java?rev=1379235&r1=1379234&r2=1379235&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java Fri Aug 31 00:05:36 2012
@@ -109,6 +109,8 @@ public class HBaseRpcMetrics implements
new MetricsTimeVaryingInt("rpcAuthorizationSuccesses", registry);
public MetricsTimeVaryingRate rpcSlowResponseTime =
new MetricsTimeVaryingRate("RpcSlowResponse", registry);
+ public final MetricsIntValue replicationCallQueueLen =
+ new MetricsIntValue("replicationCallQueueLen", registry);
private void initMethods(Class<? extends VersionedProtocol> protocol) {
for (Method m : protocol.getDeclaredMethods()) {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1379235&r1=1379234&r2=1379235&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Fri Aug 31 00:05:36 2012
@@ -275,6 +275,11 @@ public abstract class HBaseServer implem
protected int numConnections = 0;
private Handler[] handlers = null;
private Handler[] priorityHandlers = null;
+ /** replication related queue; */
+ private BlockingQueue<Call> replicationQueue;
+ private int numOfReplicationHandlers = 0;
+ private Handler[] replicationHandlers = null;
+
protected HBaseRPCErrorHandler errorHandler = null;
/**
@@ -1650,6 +1655,10 @@ public abstract class HBaseServer implem
if (priorityCallQueue != null && getQosLevel(rpcRequestBody) > highPriorityLevel) {
priorityCallQueue.put(call);
updateCallQueueLenMetrics(priorityCallQueue);
+ } else if (replicationQueue != null
+ && getQosLevel(rpcRequestBody) == HConstants.REPLICATION_QOS) {
+ replicationQueue.put(call);
+ updateCallQueueLenMetrics(replicationQueue);
} else {
callQueue.put(call); // queue the call; maybe blocked here
updateCallQueueLenMetrics(callQueue);
@@ -1732,6 +1741,8 @@ public abstract class HBaseServer implem
rpcMetrics.callQueueLen.set(callQueue.size());
} else if (queue == priorityCallQueue) {
rpcMetrics.priorityCallQueueLen.set(priorityCallQueue.size());
+ } else if (queue == replicationQueue) {
+ rpcMetrics.replicationCallQueueLen.set(replicationQueue.size());
} else {
LOG.warn("Unknown call queue");
}
@@ -1751,6 +1762,8 @@ public abstract class HBaseServer implem
if (cq == priorityCallQueue) {
// this is just an amazing hack, but it works.
threadName = "PRI " + threadName;
+ } else if (cq == replicationQueue) {
+ threadName = "REPL " + threadName;
}
this.setName(threadName);
this.status = TaskMonitor.get().createRPCStatus(threadName);
@@ -1917,7 +1930,10 @@ public abstract class HBaseServer implem
this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
this.purgeTimeout = conf.getLong("ipc.client.call.purge.timeout",
2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
-
+ this.numOfReplicationHandlers = conf.getInt("hbase.regionserver.replication.handler.count", 3);
+ if (numOfReplicationHandlers > 0) {
+ this.replicationQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
+ }
// Start the listener here and let it bind to the port
listener = new Listener();
this.port = listener.getAddress().getPort();
@@ -2011,22 +2027,23 @@ public abstract class HBaseServer implem
public synchronized void startThreads() {
responder.start();
listener.start();
- handlers = new Handler[handlerCount];
+ handlers = startHandlers(callQueue, handlerCount);
+ priorityHandlers = startHandlers(priorityCallQueue, priorityHandlerCount);
+ replicationHandlers = startHandlers(replicationQueue, numOfReplicationHandlers);
+ }
- for (int i = 0; i < handlerCount; i++) {
- handlers[i] = new Handler(callQueue, i);
+ private Handler[] startHandlers(BlockingQueue<Call> queue, int numOfHandlers) {
+ if (numOfHandlers <= 0) {
+ return null;
+ }
+ Handler[] handlers = new Handler[numOfHandlers];
+ for (int i = 0; i < numOfHandlers; i++) {
+ handlers[i] = new Handler(queue, i);
handlers[i].start();
}
-
- if (priorityHandlerCount > 0) {
- priorityHandlers = new Handler[priorityHandlerCount];
- for (int i = 0 ; i < priorityHandlerCount; i++) {
- priorityHandlers[i] = new Handler(priorityCallQueue, i);
- priorityHandlers[i].start();
- }
- }
+ return handlers;
}
-
+
public SecretManager<? extends TokenIdentifier> getSecretManager() {
return this.secretManager;
}
@@ -2040,20 +2057,9 @@ public abstract class HBaseServer implem
public synchronized void stop() {
LOG.info("Stopping server on " + port);
running = false;
- if (handlers != null) {
- for (Handler handler : handlers) {
- if (handler != null) {
- handler.interrupt();
- }
- }
- }
- if (priorityHandlers != null) {
- for (Handler handler : priorityHandlers) {
- if (handler != null) {
- handler.interrupt();
- }
- }
- }
+ stopHandlers(handlers);
+ stopHandlers(priorityHandlers);
+ stopHandlers(replicationHandlers);
listener.interrupt();
listener.doStop();
responder.interrupt();
@@ -2063,6 +2069,16 @@ public abstract class HBaseServer implem
}
}
+ private void stopHandlers(Handler[] handlers) {
+ if (handlers != null) {
+ for (Handler handler : handlers) {
+ if (handler != null) {
+ handler.interrupt();
+ }
+ }
+ }
+ }
+
/** Wait for the server to be stopped.
* Does not wait for all subthreads to finish.
* See {@link #stop()}.
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1379235&r1=1379234&r2=1379235&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Aug 31 00:05:36 2012
@@ -27,13 +27,10 @@ import java.lang.annotation.RetentionPol
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -43,13 +40,11 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.SortedSet;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
-import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -96,7 +91,6 @@ import org.apache.hadoop.hbase.client.De
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.MultiAction;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@@ -117,7 +111,6 @@ import org.apache.hadoop.hbase.ipc.Copro
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.HBaseRpcMetrics;
-import org.apache.hadoop.hbase.ipc.Invocation;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
@@ -209,7 +202,6 @@ import org.apache.hadoop.hbase.zookeeper
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.metrics.util.MBeanUtil;
import org.apache.hadoop.net.DNS;
@@ -231,7 +223,6 @@ import org.apache.hadoop.hbase.RegionSer
import com.google.common.base.Function;
import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
@@ -305,10 +296,6 @@ public class HRegionServer implements C
protected volatile boolean fsOk;
protected HFileSystem fs;
- protected static final int NORMAL_QOS = 0;
- protected static final int QOS_THRESHOLD = 10; // the line between low and high qos
- protected static final int HIGH_QOS = 100;
-
// Set when a report to the master comes back with a message asking us to
// shutdown. Also set by call to stop when debugging or running unit tests
// of HRegionServer in isolation.
@@ -522,7 +509,7 @@ public class HRegionServer implements C
conf.getInt("hbase.regionserver.handler.count", 10),
conf.getInt("hbase.regionserver.metahandler.count", 10),
conf.getBoolean("hbase.rpc.verbose", false),
- conf, QOS_THRESHOLD);
+ conf, HConstants.QOS_THRESHOLD);
// Set our address.
this.isa = this.rpcServer.getListenerAddress();
@@ -674,7 +661,7 @@ public class HRegionServer implements C
}
if (rpcArgClass == null || from.getRequest().isEmpty()) {
- return NORMAL_QOS;
+ return HConstants.NORMAL_QOS;
}
Object deserializedRequestObj = null;
//check whether the request has reference to Meta region
@@ -690,7 +677,7 @@ public class HRegionServer implements C
if (LOG.isDebugEnabled()) {
LOG.debug("High priority: " + from.toString());
}
- return HIGH_QOS;
+ return HConstants.HIGH_QOS;
}
} catch (Exception ex) {
throw new RuntimeException(ex);
@@ -699,20 +686,20 @@ public class HRegionServer implements C
if (methodName.equals("scan")) { // scanner methods...
ScanRequest request = (ScanRequest)deserializedRequestObj;
if (!request.hasScannerId()) {
- return NORMAL_QOS;
+ return HConstants.NORMAL_QOS;
}
RegionScanner scanner = hRegionServer.getScanner(request.getScannerId());
if (scanner != null && scanner.getRegionInfo().isMetaRegion()) {
if (LOG.isDebugEnabled()) {
LOG.debug("High priority scanner request: " + request.getScannerId());
}
- return HIGH_QOS;
+ return HConstants.HIGH_QOS;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Low priority: " + from.toString());
}
- return NORMAL_QOS;
+ return HConstants.NORMAL_QOS;
}
}
@@ -2182,7 +2169,7 @@ public class HRegionServer implements C
}
@Override
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.HIGH_QOS)
public ProtocolSignature getProtocolSignature(
String protocol, long version, int clientMethodsHashCode)
throws IOException {
@@ -2195,7 +2182,7 @@ public class HRegionServer implements C
}
@Override
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.HIGH_QOS)
public long getProtocolVersion(final String protocol, final long clientVersion)
throws IOException {
if (protocol.equals(ClientProtocol.class.getName())) {
@@ -3187,7 +3174,7 @@ public class HRegionServer implements C
* @throws ServiceException
*/
@Override
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.HIGH_QOS)
public UnlockRowResponse unlockRow(final RpcController controller,
final UnlockRowRequest request) throws ServiceException {
try {
@@ -3393,7 +3380,7 @@ public class HRegionServer implements C
// Start Admin methods
@Override
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.HIGH_QOS)
public GetRegionInfoResponse getRegionInfo(final RpcController controller,
final GetRegionInfoRequest request) throws ServiceException {
try {
@@ -3440,7 +3427,7 @@ public class HRegionServer implements C
}
@Override
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.HIGH_QOS)
public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
final GetOnlineRegionRequest request) throws ServiceException {
try {
@@ -3468,7 +3455,7 @@ public class HRegionServer implements C
* @throws ServiceException
*/
@Override
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.HIGH_QOS)
public OpenRegionResponse openRegion(final RpcController controller, final OpenRegionRequest request)
throws ServiceException {
int versionOfOfflineNode = -1;
@@ -3555,7 +3542,7 @@ public class HRegionServer implements C
* @throws ServiceException
*/
@Override
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.HIGH_QOS)
public CloseRegionResponse closeRegion(final RpcController controller,
final CloseRegionRequest request) throws ServiceException {
int versionOfClosingNode = -1;
@@ -3594,7 +3581,7 @@ public class HRegionServer implements C
* @throws ServiceException
*/
@Override
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.HIGH_QOS)
public FlushRegionResponse flushRegion(final RpcController controller,
final FlushRegionRequest request) throws ServiceException {
try {
@@ -3625,7 +3612,7 @@ public class HRegionServer implements C
* @throws ServiceException
*/
@Override
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.HIGH_QOS)
public SplitRegionResponse splitRegion(final RpcController controller,
final SplitRegionRequest request) throws ServiceException {
try {
@@ -3654,7 +3641,7 @@ public class HRegionServer implements C
* @throws ServiceException
*/
@Override
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.HIGH_QOS)
public CompactRegionResponse compactRegion(final RpcController controller,
final CompactRegionRequest request) throws ServiceException {
try {
@@ -3688,7 +3675,7 @@ public class HRegionServer implements C
* @throws ServiceException
*/
@Override
- @QosPriority(priority=HIGH_QOS)
+ @QosPriority(priority=HConstants.REPLICATION_QOS)
public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
final ReplicateWALEntryRequest request) throws ServiceException {
try {
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java?rev=1379235&r1=1379234&r2=1379235&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java Fri Aug 31 00:05:36 2012
@@ -25,6 +25,7 @@ import static org.junit.Assert.*;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
@@ -85,7 +86,7 @@ public class TestPriorityRpc {
Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
Mockito.when(mockRegionInfo.isMetaRegion()).thenReturn(true);
qosFunction.setRegionServer(mockRS);
- assertTrue (qosFunction.apply(rpcRequest) == HRegionServer.HIGH_QOS);
+ assertTrue (qosFunction.apply(rpcRequest) == HConstants.HIGH_QOS);
}
@Test
@@ -99,7 +100,7 @@ public class TestPriorityRpc {
rpcRequestBuilder.setRequestClassName(GetOnlineRegionRequest.class.getCanonicalName());
RpcRequestBody rpcRequest = rpcRequestBuilder.build();
QosFunction qosFunc = regionServer.getQosFunction();
- assertTrue (qosFunc.apply(rpcRequest) == HRegionServer.NORMAL_QOS);
+ assertTrue (qosFunc.apply(rpcRequest) == HConstants.NORMAL_QOS);
}
@Test
@@ -112,7 +113,7 @@ public class TestPriorityRpc {
ByteString requestBody = scanBuilder.build().toByteString();
rpcRequestBuilder.setRequest(requestBody);
RpcRequestBody rpcRequest = rpcRequestBuilder.build();
- assertTrue (qosFunction.apply(rpcRequest) == HRegionServer.NORMAL_QOS);
+ assertTrue (qosFunction.apply(rpcRequest) == HConstants.NORMAL_QOS);
//build a scan request with scannerID
scanBuilder = ScanRequest.newBuilder();
@@ -134,11 +135,11 @@ public class TestPriorityRpc {
qosFunction.setRegionServer(mockRS);
- assertTrue (qosFunction.apply(rpcRequest) == HRegionServer.HIGH_QOS);
+ assertTrue (qosFunction.apply(rpcRequest) == HConstants.HIGH_QOS);
//the same as above but with non-meta region
Mockito.when(mockRegionInfo.isMetaRegion()).thenReturn(false);
- assertTrue (qosFunction.apply(rpcRequest) == HRegionServer.NORMAL_QOS);
+ assertTrue (qosFunction.apply(rpcRequest) == HConstants.NORMAL_QOS);
}
@org.junit.Rule