You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/10/22 21:08:24 UTC
svn commit: r1401012 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/ipc/
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/regionserver/metrics/
main/java/org/apache/hadoop/hbase/util/ test/java/org/apache/hadoop/hbase/util/
Author: tedyu
Date: Mon Oct 22 19:08:23 2012
New Revision: 1401012
URL: http://svn.apache.org/viewvc?rev=1401012&view=rev
Log:
HBASE-6728 revert
Removed:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/SizeBasedThrottler.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSizeBasedThrottler.java
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerDynamicMetrics.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1401012&r1=1401011&r2=1401012&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Mon Oct 22 19:08:23 2012
@@ -26,7 +26,6 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -53,12 +52,12 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
@@ -69,29 +68,28 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
-import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslStatus;
-import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
-import org.apache.hadoop.hbase.util.SizeBasedThrottler;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.ipc.RPC.VersionMismatch;
import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ipc.RPC.VersionMismatch;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@@ -99,19 +97,20 @@ import org.apache.hadoop.security.author
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.Message;
+
import org.cliffc.high_scale_lib.Counter;
import org.cloudera.htrace.Sampler;
import org.cloudera.htrace.Span;
-import org.cloudera.htrace.Trace;
import org.cloudera.htrace.TraceInfo;
import org.cloudera.htrace.impl.NullSpan;
-
-import com.google.common.base.Function;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.Message;
+import org.cloudera.htrace.Trace;
/** A client for an IPC service. IPC calls take a single Protobuf message as a
* parameter, and return a single Protobuf message as their value. A service runs on
@@ -257,14 +256,6 @@ public abstract class HBaseServer implem
protected final boolean tcpKeepAlive; // if T then use keepalives
protected final long purgeTimeout; // in milliseconds
- // responseQueuesSizeThrottler is shared among all responseQueues,
- // it bounds memory occupied by responses in all responseQueues
- final SizeBasedThrottler responseQueuesSizeThrottler;
-
- // RESPONSE_QUEUE_MAX_SIZE limits total size of responses in every response queue
- private static final long DEFAULT_RESPONSE_QUEUES_MAX_SIZE = 1024 * 1024 * 1024; // 1G
- private static final String RESPONSE_QUEUES_MAX_SIZE = "ipc.server.response.queue.maxsize";
-
volatile protected boolean running = true; // true while server runs
protected BlockingQueue<Call> callQueue; // queued calls
protected final Counter callQueueSize = new Counter();
@@ -996,7 +987,7 @@ public abstract class HBaseServer implem
//
// Extract the first call
//
- call = responseQueue.peek();
+ call = responseQueue.removeFirst();
SocketChannel channel = call.connection.channel;
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": responding to #" + call.id + " from " +
@@ -1007,13 +998,9 @@ public abstract class HBaseServer implem
//
int numBytes = channelWrite(channel, call.response);
if (numBytes < 0) {
- // Error flag is set, so returning here closes connection and
- // clears responseQueue.
return true;
}
if (!call.response.hasRemaining()) {
- responseQueue.poll();
- responseQueuesSizeThrottler.decrease(call.response.limit());
responseQueueLen--;
call.connection.decRpcCount();
//noinspection RedundantIfStatement
@@ -1027,6 +1014,12 @@ public abstract class HBaseServer implem
call.connection + " Wrote " + numBytes + " bytes.");
}
} else {
+ //
+ // If we were unable to write the entire response out, then
+ // insert in Selector queue.
+ //
+ call.connection.responseQueue.addFirst(call);
+
if (inHandler) {
// set the serve time when the response has to be sent later
call.timestamp = System.currentTimeMillis();
@@ -1081,31 +1074,15 @@ public abstract class HBaseServer implem
responseQueueLen++;
boolean doRegister = false;
- boolean closed;
- try {
- responseQueuesSizeThrottler.increase(call.response.remaining());
- } catch (InterruptedException ie) {
- throw new InterruptedIOException(ie.getMessage());
- }
synchronized (call.connection.responseQueue) {
- closed = call.connection.closed;
- if (!closed) {
- call.connection.responseQueue.addLast(call);
-
- if (call.connection.responseQueue.size() == 1) {
- doRegister = !processResponse(call.connection.responseQueue, false);
- }
+ call.connection.responseQueue.addLast(call);
+ if (call.connection.responseQueue.size() == 1) {
+ doRegister = !processResponse(call.connection.responseQueue, false);
}
}
if (doRegister) {
enqueueInSelector(call);
}
- if (closed) {
- // Connection was closed when we tried to submit response, but we
- // increased responseQueues size already. It shoud be
- // decreased here.
- responseQueuesSizeThrottler.decrease(call.response.remaining());
- }
}
private synchronized void incPending() { // call waiting to be enqueued.
@@ -1130,8 +1107,6 @@ public abstract class HBaseServer implem
//version are read
private boolean headerRead = false; //if the connection header that
//follows version is read.
-
- protected volatile boolean closed = false; // indicates if connection was closed
protected SocketChannel channel;
private ByteBuffer data;
private ByteBuffer dataLengthBuffer;
@@ -1716,7 +1691,6 @@ public abstract class HBaseServer implem
}
protected synchronized void close() {
- closed = true;
disposeSasl();
data = null;
dataLengthBuffer = null;
@@ -1972,9 +1946,6 @@ public abstract class HBaseServer implem
this.delayedCalls = new AtomicInteger(0);
- this.responseQueuesSizeThrottler = new SizeBasedThrottler(
- conf.getLong(RESPONSE_QUEUES_MAX_SIZE, DEFAULT_RESPONSE_QUEUES_MAX_SIZE));
-
// Create the responder here
responder = new Responder();
this.authorize =
@@ -2019,14 +1990,6 @@ public abstract class HBaseServer implem
}
}
connection.close();
- long bytes = 0;
- synchronized (connection.responseQueue) {
- for (Call c : connection.responseQueue) {
- bytes += c.response.limit();
- }
- connection.responseQueue.clear();
- }
- responseQueuesSizeThrottler.decrease(bytes);
rpcMetrics.numOpenConnections.set(numConnections);
}
@@ -2281,8 +2244,4 @@ public abstract class HBaseServer implem
public static RpcCallContext getCurrentCall() {
return CurCall.get();
}
-
- public long getResponseQueueSize(){
- return responseQueuesSizeThrottler.getCurrentValue();
- }
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1401012&r1=1401011&r2=1401012&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Mon Oct 22 19:08:23 2012
@@ -53,6 +53,7 @@ import java.util.concurrent.locks.Reentr
import javax.management.ObjectName;
+import com.google.protobuf.Message;
import org.apache.commons.lang.mutable.MutableDouble;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -102,8 +103,8 @@ import org.apache.hadoop.hbase.client.co
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
-import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -112,7 +113,6 @@ import org.apache.hadoop.hbase.ipc.Copro
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.HBaseRpcMetrics;
-import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
@@ -152,8 +152,6 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
@@ -198,8 +196,8 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.StoreMetricType;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
@@ -230,10 +228,12 @@ import org.codehaus.jackson.map.ObjectMa
import com.google.common.base.Function;
import com.google.protobuf.ByteString;
-import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
+import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
+import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
+
/**
* HRegionServer makes a set of HRegions available to clients. It checks in with
* the HMaster. There are many HRegionServers in a single HBase deployment.
@@ -262,9 +262,6 @@ public class HRegionServer implements C
protected long maxScannerResultSize;
- // Server to handle client requests.
- private HBaseServer server;
-
// Cache flushing
protected MemStoreFlusher cacheFlusher;
@@ -521,7 +518,6 @@ public class HRegionServer implements C
conf.getInt("hbase.regionserver.metahandler.count", 10),
conf.getBoolean("hbase.rpc.verbose", false),
conf, HConstants.QOS_THRESHOLD);
- if (rpcServer instanceof HBaseServer) server = (HBaseServer) rpcServer;
// Set our address.
this.isa = this.rpcServer.getListenerAddress();
@@ -1201,7 +1197,7 @@ public class HRegionServer implements C
this.hlog = setupWALAndReplication();
// Init in here rather than in constructor after thread name has been set
this.metrics = new RegionServerMetrics();
- this.dynamicMetrics = RegionServerDynamicMetrics.newInstance(this);
+ this.dynamicMetrics = RegionServerDynamicMetrics.newInstance();
startServiceThreads();
LOG.info("Serving as " + this.serverNameFromMasterPOV +
", RPC listening on " + this.isa +
@@ -4135,11 +4131,4 @@ public class HRegionServer implements C
private String getMyEphemeralNodePath() {
return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
}
-
- public long getResponseQueueSize(){
- if (server != null) {
- return server.getResponseQueueSize();
- }
- return 0;
- }
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerDynamicMetrics.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerDynamicMetrics.java?rev=1401012&r1=1401011&r2=1401012&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerDynamicMetrics.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerDynamicMetrics.java Mon Oct 22 19:08:23 2012
@@ -30,7 +30,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
@@ -60,7 +59,6 @@ public class RegionServerDynamicMetrics
private MetricsContext context;
private final RegionServerDynamicStatistics rsDynamicStatistics;
private Method updateMbeanInfoIfMetricsListChanged = null;
- private HRegionServer regionServer;
private static final Log LOG =
LogFactory.getLog(RegionServerDynamicStatistics.class);
@@ -76,14 +74,13 @@ public class RegionServerDynamicMetrics
*/
public final MetricsRegistry registry = new MetricsRegistry();
- private RegionServerDynamicMetrics(HRegionServer regionServer) {
+ private RegionServerDynamicMetrics() {
this.context = MetricsUtil.getContext("hbase-dynamic");
this.metricsRecord = MetricsUtil.createRecord(
this.context,
"RegionServerDynamicStatistics");
context.registerUpdater(this);
this.rsDynamicStatistics = new RegionServerDynamicStatistics(this.registry);
- this.regionServer = regionServer;
try {
updateMbeanInfoIfMetricsListChanged =
this.rsDynamicStatistics.getClass().getSuperclass()
@@ -95,9 +92,9 @@ public class RegionServerDynamicMetrics
}
}
- public static RegionServerDynamicMetrics newInstance(HRegionServer regionServer) {
+ public static RegionServerDynamicMetrics newInstance() {
RegionServerDynamicMetrics metrics =
- new RegionServerDynamicMetrics(regionServer);
+ new RegionServerDynamicMetrics();
return metrics;
}
@@ -187,13 +184,6 @@ public class RegionServerDynamicMetrics
for (Entry<String, AtomicLong> entry : RegionMetricsStorage.getNumericMetrics().entrySet()) {
this.setNumericMetric(entry.getKey(), entry.getValue().getAndSet(0));
}
-
- /* export estimated size of all response queues */
- if (regionServer != null) {
- long responseQueueSize = regionServer.getResponseQueueSize();
- this.setNumericMetric("responseQueuesSize", responseQueueSize);
- }
-
/* get dynamically created numeric metrics, and push the metrics.
* These ones aren't to be reset; they are cumulative. */
for (Entry<String, AtomicLong> entry : RegionMetricsStorage.getNumericPersistentMetrics().entrySet()) {