You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2012/11/07 00:22:09 UTC
svn commit: r1406396 [5/6] - in /hbase/trunk: dev-support/
hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/
hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/
hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/met...
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1406396&r1=1406395&r2=1406396&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Nov 6 23:22:01 2012
@@ -48,13 +48,11 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.management.ObjectName;
import com.google.protobuf.Message;
-import org.apache.commons.lang.mutable.MutableDouble;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -67,7 +65,6 @@ import org.apache.hadoop.hbase.DoNotRetr
import org.apache.hadoop.hbase.FailedSanityCheckException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
@@ -107,9 +104,7 @@ import org.apache.hadoop.hbase.executor.
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.fs.HFileSystem;
-import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
@@ -191,11 +186,6 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRootHandler;
-import org.apache.hadoop.hbase.regionserver.metrics.RegionMetricsStorage;
-import org.apache.hadoop.hbase.regionserver.metrics.RegionServerDynamicMetrics;
-import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
-import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
-import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.StoreMetricType;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
@@ -225,6 +215,7 @@ import org.apache.hadoop.net.DNS;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
+import org.cliffc.high_scale_lib.Counter;
import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.base.Function;
@@ -297,9 +288,8 @@ public class HRegionServer implements C
// Instance of the hbase executor service.
protected ExecutorService service;
- // Request counter.
- // Do we need this? Can't we just sum region counters? St.Ack 20110412
- protected AtomicInteger requestCount = new AtomicInteger();
+ // Request counter. (Includes requests that are not serviced by regions.)
+ final Counter requestCount = new Counter();
// If false, the file system has become unavailable
protected volatile boolean fsOk;
@@ -366,9 +356,7 @@ public class HRegionServer implements C
*/
private final LinkedList<byte[]> reservedSpace = new LinkedList<byte[]>();
- private RegionServerMetrics metrics;
-
- private RegionServerDynamicMetrics dynamicMetrics;
+ private MetricsRegionServer metricsRegionServer;
/*
* Check for compactions requests.
@@ -403,7 +391,7 @@ public class HRegionServer implements C
private final RegionServerAccounting regionServerAccounting;
// Cache configuration and block cache reference
- private final CacheConfig cacheConfig;
+ final CacheConfig cacheConfig;
// reference to the Thrift Server.
volatile private HRegionThriftServer thriftServer;
@@ -446,7 +434,6 @@ public class HRegionServer implements C
*/
private final QosFunction qosFunction;
-
/**
* Starts a HRegionServer at the default location
*
@@ -550,6 +537,10 @@ public class HRegionServer implements C
}
}
+ String getClusterId() {
+ return this.conf.get(HConstants.CLUSTER_ID);
+ }
+
@Retention(RetentionPolicy.RUNTIME)
protected @interface QosPriority {
int priority() default 0;
@@ -858,7 +849,6 @@ public class HRegionServer implements C
break;
}
}
- registerMBean();
// We registered with the Master. Go into run mode.
long lastMsg = 0;
@@ -893,7 +883,6 @@ public class HRegionServer implements C
}
long now = System.currentTimeMillis();
if ((now - lastMsg) >= msgInterval) {
- doMetrics();
tryRegionServerReport(lastMsg, now);
lastMsg = System.currentTimeMillis();
}
@@ -1022,8 +1011,6 @@ public class HRegionServer implements C
void tryRegionServerReport(long reportStartTime, long reportEndTime)
throws IOException {
HBaseProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
- // Why we do this?
- this.requestCount.set(0);
try {
RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
ServerName sn = ServerName.parseVersionedServerName(
@@ -1044,13 +1031,21 @@ public class HRegionServer implements C
}
HBaseProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime) {
+ // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
+ // per second, and other metrics. As long as metrics are part of ServerLoad it's best to use
+ // the wrapper to compute those numbers in one place.
+ // In the long term most of these should be moved off of ServerLoad and the heart beat.
+ // Instead they should be stored in an HBase table so that external visibility into HBase is
+ // improved. Additionally, the load balancer will be able to take advantage of a more complete
+ // history.
+ MetricsRegionServerWrapper regionServerWrapper = this.metricsRegionServer.getRegionServerWrapper();
Collection<HRegion> regions = getOnlineRegionsLocalContext();
MemoryUsage memory =
ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
HBaseProtos.ServerLoad.Builder serverLoad = HBaseProtos.ServerLoad.newBuilder();
- serverLoad.setNumberOfRequests((int)metrics.getRequests());
- serverLoad.setTotalNumberOfRequests(requestCount.get());
+ serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
+ serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount());
serverLoad.setUsedHeapMB((int)(memory.getUsed() / 1024 / 1024));
serverLoad.setMaxHeapMB((int) (memory.getMax() / 1024 / 1024));
Set<String> coprocessors = this.hlog.getCoprocessorHost().getCoprocessors();
@@ -1205,8 +1200,7 @@ public class HRegionServer implements C
this.tableDescriptors = new FSTableDescriptors(this.fs, this.rootDir, true);
this.hlog = setupWALAndReplication();
// Init in here rather than in constructor after thread name has been set
- this.metrics = new RegionServerMetrics();
- this.dynamicMetrics = RegionServerDynamicMetrics.newInstance();
+ this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
startServiceThreads();
LOG.info("Serving as " + this.serverNameFromMasterPOV +
", RPC listening on " + this.isa +
@@ -1441,179 +1435,8 @@ public class HRegionServer implements C
return hlogRoller;
}
- /*
- * @param interval Interval since last time metrics were called.
- */
- protected void doMetrics() {
- try {
- metrics();
- } catch (Throwable e) {
- LOG.warn("Failed metrics", e);
- }
- }
-
- protected void metrics() {
- this.metrics.regions.set(this.onlineRegions.size());
- this.metrics.incrementRequests(this.requestCount.get());
- this.metrics.requests.intervalHeartBeat();
- // Is this too expensive every three seconds getting a lock on onlineRegions
- // and then per store carried? Can I make metrics be sloppier and avoid
- // the synchronizations?
- int stores = 0;
- int storefiles = 0;
- long memstoreSize = 0;
- int readRequestsCount = 0;
- int writeRequestsCount = 0;
- long checkAndMutateChecksFailed = 0;
- long checkAndMutateChecksPassed = 0;
- long storefileIndexSize = 0;
- HDFSBlocksDistribution hdfsBlocksDistribution =
- new HDFSBlocksDistribution();
- long totalStaticIndexSize = 0;
- long totalStaticBloomSize = 0;
- long numPutsWithoutWAL = 0;
- long dataInMemoryWithoutWAL = 0;
- long updatesBlockedMs = 0;
-
- // Note that this is a map of Doubles instead of Longs. This is because we
- // do effective integer division, which would perhaps truncate more than it
- // should because we do it only on one part of our sum at a time. Rather
- // than dividing at the end, where it is difficult to know the proper
- // factor, everything is exact then truncated.
- final Map<String, MutableDouble> tempVals =
- new HashMap<String, MutableDouble>();
-
- for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
- HRegion r = e.getValue();
- memstoreSize += r.memstoreSize.get();
- numPutsWithoutWAL += r.numPutsWithoutWAL.get();
- dataInMemoryWithoutWAL += r.dataInMemoryWithoutWAL.get();
- readRequestsCount += r.readRequestsCount.get();
- writeRequestsCount += r.writeRequestsCount.get();
- checkAndMutateChecksFailed += r.checkAndMutateChecksFailed.get();
- checkAndMutateChecksPassed += r.checkAndMutateChecksPassed.get();
- updatesBlockedMs += r.updatesBlockedMs.get();
- synchronized (r.stores) {
- stores += r.stores.size();
- for (Map.Entry<byte[], Store> ee : r.stores.entrySet()) {
- final Store store = ee.getValue();
- final SchemaMetrics schemaMetrics = store.getSchemaMetrics();
-
- {
- long tmpStorefiles = store.getStorefilesCount();
- schemaMetrics.accumulateStoreMetric(tempVals,
- StoreMetricType.STORE_FILE_COUNT, tmpStorefiles);
- storefiles += tmpStorefiles;
- }
-
-
- {
- long tmpStorefileIndexSize = store.getStorefilesIndexSize();
- schemaMetrics.accumulateStoreMetric(tempVals,
- StoreMetricType.STORE_FILE_INDEX_SIZE,
- (long) (tmpStorefileIndexSize / (1024.0 * 1024)));
- storefileIndexSize += tmpStorefileIndexSize;
- }
-
- {
- long tmpStorefilesSize = store.getStorefilesSize();
- schemaMetrics.accumulateStoreMetric(tempVals,
- StoreMetricType.STORE_FILE_SIZE_MB,
- (long) (tmpStorefilesSize / (1024.0 * 1024)));
- }
-
- {
- long tmpStaticBloomSize = store.getTotalStaticBloomSize();
- schemaMetrics.accumulateStoreMetric(tempVals,
- StoreMetricType.STATIC_BLOOM_SIZE_KB,
- (long) (tmpStaticBloomSize / 1024.0));
- totalStaticBloomSize += tmpStaticBloomSize;
- }
-
- {
- long tmpStaticIndexSize = store.getTotalStaticIndexSize();
- schemaMetrics.accumulateStoreMetric(tempVals,
- StoreMetricType.STATIC_INDEX_SIZE_KB,
- (long) (tmpStaticIndexSize / 1024.0));
- totalStaticIndexSize += tmpStaticIndexSize;
- }
-
- schemaMetrics.accumulateStoreMetric(tempVals,
- StoreMetricType.MEMSTORE_SIZE_MB,
- (long) (store.getMemStoreSize() / (1024.0 * 1024)));
- }
- }
-
- hdfsBlocksDistribution.add(r.getHDFSBlocksDistribution());
- }
-
- for (Entry<String, MutableDouble> e : tempVals.entrySet()) {
- RegionMetricsStorage.setNumericMetric(e.getKey(), e.getValue().longValue());
- }
-
- this.metrics.stores.set(stores);
- this.metrics.storefiles.set(storefiles);
- this.metrics.memstoreSizeMB.set((int) (memstoreSize / (1024 * 1024)));
- this.metrics.mbInMemoryWithoutWAL.set((int) (dataInMemoryWithoutWAL / (1024 * 1024)));
- this.metrics.numPutsWithoutWAL.set(numPutsWithoutWAL);
- this.metrics.storefileIndexSizeMB.set(
- (int) (storefileIndexSize / (1024 * 1024)));
- this.metrics.rootIndexSizeKB.set(
- (int) (storefileIndexSize / 1024));
- this.metrics.totalStaticIndexSizeKB.set(
- (int) (totalStaticIndexSize / 1024));
- this.metrics.totalStaticBloomSizeKB.set(
- (int) (totalStaticBloomSize / 1024));
- this.metrics.readRequestsCount.set(readRequestsCount);
- this.metrics.writeRequestsCount.set(writeRequestsCount);
- this.metrics.checkAndMutateChecksFailed.set(checkAndMutateChecksFailed);
- this.metrics.checkAndMutateChecksPassed.set(checkAndMutateChecksPassed);
- this.metrics.compactionQueueSize.set(compactSplitThread
- .getCompactionQueueSize());
- this.metrics.flushQueueSize.set(cacheFlusher
- .getFlushQueueSize());
- this.metrics.updatesBlockedSeconds.update(updatesBlockedMs > 0 ?
- updatesBlockedMs/1000: 0);
- final long updatesBlockedMsHigherWater = cacheFlusher.getUpdatesBlockedMsHighWater().get();
- this.metrics.updatesBlockedSecondsHighWater.update(updatesBlockedMsHigherWater > 0 ?
- updatesBlockedMsHigherWater/1000: 0);
-
- BlockCache blockCache = cacheConfig.getBlockCache();
- if (blockCache != null) {
- this.metrics.blockCacheCount.set(blockCache.size());
- this.metrics.blockCacheFree.set(blockCache.getFreeSize());
- this.metrics.blockCacheSize.set(blockCache.getCurrentSize());
- CacheStats cacheStats = blockCache.getStats();
- this.metrics.blockCacheHitCount.set(cacheStats.getHitCount());
- this.metrics.blockCacheMissCount.set(cacheStats.getMissCount());
- this.metrics.blockCacheEvictedCount.set(blockCache.getEvictedCount());
- double ratio = blockCache.getStats().getHitRatio();
- int percent = (int) (ratio * 100);
- this.metrics.blockCacheHitRatio.set(percent);
- ratio = blockCache.getStats().getHitCachingRatio();
- percent = (int) (ratio * 100);
- this.metrics.blockCacheHitCachingRatio.set(percent);
- // past N period block cache hit / hit caching ratios
- cacheStats.rollMetricsPeriod();
- ratio = cacheStats.getHitRatioPastNPeriods();
- percent = (int) (ratio * 100);
- this.metrics.blockCacheHitRatioPastNPeriods.set(percent);
- ratio = cacheStats.getHitCachingRatioPastNPeriods();
- percent = (int) (ratio * 100);
- this.metrics.blockCacheHitCachingRatioPastNPeriods.set(percent);
- }
- float localityIndex = hdfsBlocksDistribution.getBlockLocalityIndex(
- getServerName().getHostname());
- int percent = (int) (localityIndex * 100);
- this.metrics.hdfsBlocksLocalityIndex.set(percent);
-
- }
-
- /**
- * @return Region server metrics instance.
- */
- public RegionServerMetrics getMetrics() {
- return this.metrics;
+ public MetricsRegionServer getMetrics() {
+ return this.metricsRegionServer;
}
/**
@@ -1841,9 +1664,6 @@ public class HRegionServer implements C
// java.util.HashSet's toString() method to print the coprocessor names.
LOG.fatal("RegionServer abort: loaded coprocessors are: " +
CoprocessorHost.getLoadedCoprocessors());
- if (this.metrics != null) {
- LOG.info("Dump of metrics: " + this.metrics);
- }
// Do our best to report our abort to the master, but this may not work
try {
if (cause != null) {
@@ -2146,45 +1966,7 @@ public class HRegionServer implements C
}
/**
- * @param encodedRegionName
- * @return JSON Map of labels to values for passed in <code>encodedRegionName</code>
- * @throws IOException
- */
- public byte [] getRegionStats(final String encodedRegionName)
- throws IOException {
- HRegion r = null;
- synchronized (this.onlineRegions) {
- r = this.onlineRegions.get(encodedRegionName);
- }
- if (r == null) return null;
- ObjectMapper mapper = new ObjectMapper();
- int stores = 0;
- int storefiles = 0;
- int storefileSizeMB = 0;
- int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024);
- int storefileIndexSizeMB = 0;
- synchronized (r.stores) {
- stores += r.stores.size();
- for (Store store : r.stores.values()) {
- storefiles += store.getStorefilesCount();
- storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
- storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
- }
- }
- Map<String, Integer> map = new TreeMap<String, Integer>();
- map.put("stores", stores);
- map.put("storefiles", storefiles);
- map.put("storefileSizeMB", storefileSizeMB);
- map.put("storefileIndexSizeMB", storefileIndexSizeMB);
- map.put("memstoreSizeMB", memstoreSizeMB);
- StringWriter w = new StringWriter();
- mapper.writeValue(w, map);
- w.close();
- return Bytes.toBytes(w.toString());
- }
-
- /**
- * For tests and web ui.
+ * For tests, web ui and metrics.
* This method will only work if HRegionServer is in the same JVM as client;
* HRegion cannot be serialized to cross an rpc.
* @see #getOnlineRegions()
@@ -2218,11 +2000,6 @@ public class HRegionServer implements C
return sortedRegions;
}
- /** @return the request count */
- public AtomicInteger getRequestCount() {
- return this.requestCount;
- }
-
/**
* @return time stamp in millis of when this region server was started
*/
@@ -2498,16 +2275,6 @@ public class HRegionServer implements C
}
/**
- * Register bean with platform management server
- */
- void registerMBean() {
- MXBeanImpl mxBeanInfo = MXBeanImpl.init(this);
- mxBean = MBeanUtil.registerMBean("RegionServer", "RegionServer",
- mxBeanInfo);
- LOG.info("Registered RegionServer MXBean");
- }
-
- /**
* Instantiated as a row lock lease. If the lease times out, the row lock is
* released
*/
@@ -2685,14 +2452,7 @@ public class HRegionServer implements C
if (destination != null){
addToMovedRegions(encodedRegionName, destination);
}
-
- //Clear all of the dynamic metrics as they are now probably useless.
- //This is a clear because dynamic metrics could include metrics per cf and
- //per hfile. Figuring out which cfs, hfiles, and regions are still relevant to
- //this region server would be an onerous task. Instead just clear everything
- //and on the next tick of the metrics everything that is still relevant will be
- //re-added.
- this.dynamicMetrics.clear();
+
return toReturn != null;
}
@@ -2885,8 +2645,9 @@ public class HRegionServer implements C
@Override
public GetResponse get(final RpcController controller,
final GetRequest request) throws ServiceException {
+ long before = EnvironmentEdgeManager.currentTimeMillis();
try {
- requestCount.incrementAndGet();
+ requestCount.increment();
HRegion region = getRegion(request.getRegion());
GetResponse.Builder builder = GetResponse.newBuilder();
ClientProtos.Get get = request.getGet();
@@ -2926,6 +2687,8 @@ public class HRegionServer implements C
return builder.build();
} catch (IOException ie) {
throw new ServiceException(ie);
+ } finally {
+ metricsRegionServer.updateGet(EnvironmentEdgeManager.currentTimeMillis() - before);
}
}
@@ -2940,7 +2703,7 @@ public class HRegionServer implements C
public MutateResponse mutate(final RpcController controller,
final MutateRequest request) throws ServiceException {
try {
- requestCount.incrementAndGet();
+ requestCount.increment();
HRegion region = getRegion(request.getRegion());
MutateResponse.Builder builder = MutateResponse.newBuilder();
Mutate mutate = request.getMutate();
@@ -3073,7 +2836,7 @@ public class HRegionServer implements C
}
throw e;
}
- requestCount.incrementAndGet();
+ requestCount.increment();
try {
int ttl = 0;
@@ -3167,7 +2930,7 @@ public class HRegionServer implements C
for (int i = 0; i < rows
&& currentScanResultSize < maxResultSize; i++) {
// Collect values to be returned here
- boolean moreRows = scanner.next(values, SchemaMetrics.METRIC_NEXTSIZE);
+ boolean moreRows = scanner.next(values);
if (!values.isEmpty()) {
for (KeyValue kv : values) {
currentScanResultSize += kv.heapSize();
@@ -3261,7 +3024,7 @@ public class HRegionServer implements C
throw new DoNotRetryIOException(
"lockRow supports only one row now, not " + request.getRowCount() + " rows");
}
- requestCount.incrementAndGet();
+ requestCount.increment();
HRegion region = getRegion(request.getRegion());
byte[] row = request.getRow(0).toByteArray();
try {
@@ -3292,7 +3055,7 @@ public class HRegionServer implements C
public UnlockRowResponse unlockRow(final RpcController controller,
final UnlockRowRequest request) throws ServiceException {
try {
- requestCount.incrementAndGet();
+ requestCount.increment();
HRegion region = getRegion(request.getRegion());
if (!request.hasLockId()) {
throw new DoNotRetryIOException(
@@ -3327,7 +3090,7 @@ public class HRegionServer implements C
public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
final BulkLoadHFileRequest request) throws ServiceException {
try {
- requestCount.incrementAndGet();
+ requestCount.increment();
HRegion region = getRegion(request.getRegion());
List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
for (FamilyPath familyPath: request.getFamilyPathList()) {
@@ -3374,7 +3137,7 @@ public class HRegionServer implements C
public ExecCoprocessorResponse execCoprocessor(final RpcController controller,
final ExecCoprocessorRequest request) throws ServiceException {
try {
- requestCount.incrementAndGet();
+ requestCount.increment();
HRegion region = getRegion(request.getRegion());
ExecCoprocessorResponse.Builder
builder = ExecCoprocessorResponse.newBuilder();
@@ -3392,7 +3155,7 @@ public class HRegionServer implements C
public CoprocessorServiceResponse execService(final RpcController controller,
final CoprocessorServiceRequest request) throws ServiceException {
try {
- requestCount.incrementAndGet();
+ requestCount.increment();
HRegion region = getRegion(request.getRegion());
// ignore the passed in controller (from the serialized call)
ServerRpcController execController = new ServerRpcController();
@@ -3441,7 +3204,7 @@ public class HRegionServer implements C
ActionResult.Builder resultBuilder = null;
List<Mutate> mutates = new ArrayList<Mutate>();
for (ClientProtos.MultiAction actionUnion : request.getActionList()) {
- requestCount.incrementAndGet();
+ requestCount.increment();
try {
Object result = null;
if (actionUnion.hasGet()) {
@@ -3524,7 +3287,7 @@ public class HRegionServer implements C
final GetRegionInfoRequest request) throws ServiceException {
try {
checkOpen();
- requestCount.incrementAndGet();
+ requestCount.increment();
HRegion region = getRegion(request.getRegion());
HRegionInfo info = region.getRegionInfo();
GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
@@ -3544,7 +3307,7 @@ public class HRegionServer implements C
final GetStoreFileRequest request) throws ServiceException {
try {
HRegion region = getRegion(request.getRegion());
- requestCount.incrementAndGet();
+ requestCount.increment();
Set<byte[]> columnFamilies = null;
if (request.getFamilyCount() == 0) {
columnFamilies = region.getStores().keySet();
@@ -3571,7 +3334,7 @@ public class HRegionServer implements C
final GetOnlineRegionRequest request) throws ServiceException {
try {
checkOpen();
- requestCount.incrementAndGet();
+ requestCount.increment();
List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
for (HRegion region: this.onlineRegions.values()) {
list.add(region.getRegionInfo());
@@ -3602,7 +3365,7 @@ public class HRegionServer implements C
} catch (IOException ie) {
throw new ServiceException(ie);
}
- requestCount.incrementAndGet();
+ requestCount.increment();
OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
int regionCount = request.getOpenInfoCount();
Map<String, HTableDescriptor> htds =
@@ -3694,7 +3457,6 @@ public class HRegionServer implements C
try {
checkOpen();
- requestCount.incrementAndGet();
String encodedRegionName =
ProtobufUtil.getRegionEncodedName(request.getRegion());
byte[] encodedName = Bytes.toBytes(encodedRegionName);
@@ -3706,6 +3468,7 @@ public class HRegionServer implements C
checkIfRegionInTransition(encodedName, CLOSE);
}
HRegion region = getRegionByEncodedName(encodedRegionName);
+ requestCount.increment();
LOG.info("Received close region: " + region.getRegionNameAsString() +
". Version of ZK closing node:" + versionOfClosingNode +
". Destination server:" + sn);
@@ -3734,7 +3497,7 @@ public class HRegionServer implements C
final FlushRegionRequest request) throws ServiceException {
try {
checkOpen();
- requestCount.incrementAndGet();
+ requestCount.increment();
HRegion region = getRegion(request.getRegion());
LOG.info("Flushing " + region.getRegionNameAsString());
boolean shouldFlush = true;
@@ -3765,7 +3528,7 @@ public class HRegionServer implements C
final SplitRegionRequest request) throws ServiceException {
try {
checkOpen();
- requestCount.incrementAndGet();
+ requestCount.increment();
HRegion region = getRegion(request.getRegion());
LOG.info("Splitting " + region.getRegionNameAsString());
region.flushcache();
@@ -3794,7 +3557,7 @@ public class HRegionServer implements C
final CompactRegionRequest request) throws ServiceException {
try {
checkOpen();
- requestCount.incrementAndGet();
+ requestCount.increment();
HRegion region = getRegion(request.getRegion());
LOG.info("Compacting " + region.getRegionNameAsString());
boolean major = false;
@@ -3829,7 +3592,7 @@ public class HRegionServer implements C
try {
if (replicationSinkHandler != null) {
checkOpen();
- requestCount.incrementAndGet();
+ requestCount.increment();
HLog.Entry[] entries = ProtobufUtil.toHLogEntries(request.getEntryList());
if (entries != null && entries.length > 0) {
replicationSinkHandler.replicateLogEntries(entries);
@@ -3852,7 +3615,7 @@ public class HRegionServer implements C
public RollWALWriterResponse rollWALWriter(final RpcController controller,
final RollWALWriterRequest request) throws ServiceException {
try {
- requestCount.incrementAndGet();
+ requestCount.increment();
HLog wal = this.getWAL();
byte[][] regionsToFlush = wal.rollWriter(true);
RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
@@ -3877,7 +3640,7 @@ public class HRegionServer implements C
@Override
public StopServerResponse stopServer(final RpcController controller,
final StopServerRequest request) throws ServiceException {
- requestCount.incrementAndGet();
+ requestCount.increment();
String reason = request.getReason();
stop(reason);
return StopServerResponse.newBuilder().build();
@@ -3894,7 +3657,7 @@ public class HRegionServer implements C
public GetServerInfoResponse getServerInfo(final RpcController controller,
final GetServerInfoRequest request) throws ServiceException {
ServerName serverName = getServerName();
- requestCount.incrementAndGet();
+ requestCount.increment();
return ResponseConverter.buildGetServerInfoResponse(serverName, webuiport);
}
@@ -3924,6 +3687,7 @@ public class HRegionServer implements C
*/
protected Result append(final HRegion region,
final Mutate mutate) throws IOException {
+ long before = EnvironmentEdgeManager.currentTimeMillis();
Append append = ProtobufUtil.toAppend(mutate);
Result r = null;
if (region.getCoprocessorHost() != null) {
@@ -3936,6 +3700,7 @@ public class HRegionServer implements C
region.getCoprocessorHost().postAppend(append, r);
}
}
+ metricsRegionServer.updateAppend(EnvironmentEdgeManager.currentTimeMillis() - before);
return r;
}
@@ -3949,6 +3714,7 @@ public class HRegionServer implements C
*/
protected Result increment(final HRegion region,
final Mutate mutate) throws IOException {
+ long before = EnvironmentEdgeManager.currentTimeMillis();
Increment increment = ProtobufUtil.toIncrement(mutate);
Result r = null;
if (region.getCoprocessorHost() != null) {
@@ -3961,6 +3727,7 @@ public class HRegionServer implements C
r = region.getCoprocessorHost().postIncrement(increment, r);
}
}
+ metricsRegionServer.updateIncrement(EnvironmentEdgeManager.currentTimeMillis() - before);
return r;
}
@@ -3975,7 +3742,8 @@ public class HRegionServer implements C
final HRegion region, final List<Mutate> mutates) {
@SuppressWarnings("unchecked")
Pair<Mutation, Integer>[] mutationsWithLocks = new Pair[mutates.size()];
-
+ long before = EnvironmentEdgeManager.currentTimeMillis();
+ boolean batchContainsPuts = false, batchContainsDelete = false;
try {
ActionResult.Builder resultBuilder = ActionResult.newBuilder();
NameBytesPair value = ProtobufUtil.toParameter(new Result());
@@ -3987,15 +3755,18 @@ public class HRegionServer implements C
Mutation mutation = null;
if (m.getMutateType() == MutateType.PUT) {
mutation = ProtobufUtil.toPut(m);
+ batchContainsPuts = true;
} else {
mutation = ProtobufUtil.toDelete(m);
+ batchContainsDelete = true;
}
Integer lock = getLockFromId(mutation.getLockId());
mutationsWithLocks[i++] = new Pair<Mutation, Integer>(mutation, lock);
builder.addResult(result);
}
- requestCount.addAndGet(mutates.size());
+
+ requestCount.add(mutates.size());
if (!region.getRegionInfo().isMetaTable()) {
cacheFlusher.reclaimMemStoreMemory();
}
@@ -4031,6 +3802,13 @@ public class HRegionServer implements C
builder.setResult(i, result);
}
}
+ long after = EnvironmentEdgeManager.currentTimeMillis();
+ if (batchContainsPuts) {
+ metricsRegionServer.updatePut(after - before);
+ }
+ if (batchContainsDelete) {
+ metricsRegionServer.updateDelete(after - before);
+ }
}
/**
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1406396&r1=1406395&r2=1406396&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Tue Nov 6 23:22:01 2012
@@ -66,8 +66,6 @@ import org.apache.hadoop.hbase.monitorin
import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
-import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -106,7 +104,7 @@ import com.google.common.collect.Lists;
* not be called directly but by an HRegion manager.
*/
@InterfaceAudience.Private
-public class HStore extends SchemaConfigured implements Store {
+public class HStore implements Store {
static final Log LOG = LogFactory.getLog(HStore.class);
protected final MemStore memstore;
@@ -174,9 +172,7 @@ public class HStore extends SchemaConfig
protected HStore(Path basedir, HRegion region, HColumnDescriptor family,
FileSystem fs, Configuration confParam)
throws IOException {
- super(new CompoundConfiguration().add(confParam).add(
- family.getValues()), region.getRegionInfo().getTableNameAsString(),
- Bytes.toString(family.getName()));
+
HRegionInfo info = region.getRegionInfo();
this.fs = fs;
// Assemble the store's home directory.
@@ -260,6 +256,15 @@ public class HStore extends SchemaConfig
return ttl;
}
+ public String getColumnFamilyName() {
+ return this.family.getNameAsString();
+ }
+
+ @Override
+ public String getTableName() {
+ return this.region.getTableDesc().getNameAsString();
+ }
+
/**
* Create this store's homedir
* @param fs
@@ -414,7 +419,6 @@ public class HStore extends SchemaConfig
public StoreFile call() throws IOException {
StoreFile storeFile = new StoreFile(fs, p, conf, cacheConf,
family.getBloomFilterType(), dataBlockEncoder);
- passSchemaMetricsTo(storeFile);
storeFile.createReader();
return storeFile;
}
@@ -573,7 +577,6 @@ public class HStore extends SchemaConfig
StoreFile sf = new StoreFile(fs, dstPath, this.conf, this.cacheConf,
this.family.getBloomFilterType(), this.dataBlockEncoder);
- passSchemaMetricsTo(sf);
StoreFile.Reader r = sf.createReader();
this.storeSize += r.length();
@@ -817,19 +820,11 @@ public class HStore extends SchemaConfig
status.setStatus("Flushing " + this + ": reopening flushed file");
StoreFile sf = new StoreFile(this.fs, dstPath, this.conf, this.cacheConf,
this.family.getBloomFilterType(), this.dataBlockEncoder);
- passSchemaMetricsTo(sf);
StoreFile.Reader r = sf.createReader();
this.storeSize += r.length();
this.totalUncompressedBytes += r.getTotalUncompressedBytes();
- // This increments the metrics associated with total flushed bytes for this
- // family. The overall flush count is stored in the static metrics and
- // retrieved from HRegion.recentFlushes, which is set within
- // HRegion.internalFlushcache, which indirectly calls this to actually do
- // the flushing through the StoreFlusherImpl class
- getSchemaMetrics().updatePersistentStoreMetric(
- SchemaMetrics.StoreMetricType.FLUSH_SIZE, flushedSize.longValue());
if (LOG.isInfoEnabled()) {
LOG.info("Added " + sf + ", entries=" + r.getEntries() +
", sequenceid=" + logCacheFlushId +
@@ -875,11 +870,6 @@ public class HStore extends SchemaConfig
.withBytesPerChecksum(bytesPerChecksum)
.withCompression(compression)
.build();
- // The store file writer's path does not include the CF name, so we need
- // to configure the HFile writer directly.
- SchemaConfigured sc = (SchemaConfigured) w.writer;
- SchemaConfigured.resetSchemaMetricsConf(sc);
- passSchemaMetricsTo(sc);
return w;
}
@@ -1409,8 +1399,8 @@ public class HStore extends SchemaConfig
(forcemajor || isMajorCompaction(compactSelection.getFilesToCompact())) &&
(compactSelection.getFilesToCompact().size() < this.maxFilesToCompact
);
- LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
- this.getColumnFamilyName() + ": Initiating " +
+ LOG.debug(this.getHRegionInfo().getEncodedName() + " - "
+ + this.getColumnFamilyName() + ": Initiating " +
(majorcompaction ? "major" : "minor") + "compaction");
if (!majorcompaction &&
@@ -1523,7 +1513,6 @@ public class HStore extends SchemaConfig
storeFile = new StoreFile(this.fs, path, this.conf,
this.cacheConf, this.family.getBloomFilterType(),
NoOpDataBlockEncoder.INSTANCE);
- passSchemaMetricsTo(storeFile);
storeFile.createReader();
} catch (IOException e) {
LOG.error("Failed to open store file : " + path
@@ -1575,7 +1564,6 @@ public class HStore extends SchemaConfig
}
result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf,
this.family.getBloomFilterType(), this.dataBlockEncoder);
- passSchemaMetricsTo(result);
result.createReader();
}
try {
@@ -1936,7 +1924,7 @@ public class HStore extends SchemaConfig
@Override
public String toString() {
- return getColumnFamilyName();
+ return this.getColumnFamilyName();
}
@Override
@@ -2125,9 +2113,8 @@ public class HStore extends SchemaConfig
}
public static final long FIXED_OVERHEAD =
- ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
- + (17 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
- + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
+ ClassSize.align((19 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
+ + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
+ ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1406396&r1=1406395&r2=1406396&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Tue Nov 6 23:22:01 2012
@@ -416,7 +416,6 @@ class MemStoreFlusher extends HasThread
server.compactSplitThread.requestCompaction(region, getName());
}
- server.getMetrics().addFlush(region.getRecentFlushInfo());
} catch (DroppedSnapshotException ex) {
// Cache flush can fail in a few places. If it fails in a critical
// section, we get a DroppedSnapshotException and a replay of hlog
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegion.java?rev=1406396&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegion.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegion.java Tue Nov 6 23:22:01 2012
@@ -0,0 +1,66 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CompatibilityFactory;
+
+
+/**
+ * Glue between an HRegion and whichever hadoop shim layer is loaded
+ * (hbase-hadoop1-compat or hbase-hadoop2-compat).
+ * <p>
+ * Every method simply forwards to the {@link MetricsRegionSource} created for
+ * the region when this object is constructed.
+ */
+@InterfaceAudience.Private
+public class MetricsRegion {
+
+  /** Per-region metrics source that all calls are delegated to. */
+  private final MetricsRegionSource source;
+
+  /**
+   * Creates the per-region metrics source through the compatibility layer.
+   *
+   * @param wrapper the region wrapper handed to the source factory
+   */
+  public MetricsRegion(MetricsRegionWrapper wrapper) {
+    // NOTE(review): MetricsRegionServer looks its factory up through
+    // CompatibilitySingletonFactory; confirm the non-singleton lookup here is intended.
+    MetricsRegionServerSourceFactory factory =
+        CompatibilityFactory.getInstance(MetricsRegionServerSourceFactory.class);
+    this.source = factory.createRegion(wrapper);
+  }
+
+  /** Closes the underlying metrics source. */
+  public void close() {
+    this.source.close();
+  }
+
+  /** Reports a put to the underlying source. */
+  public void updatePut() {
+    this.source.updatePut();
+  }
+
+  /** Reports a delete to the underlying source. */
+  public void updateDelete() {
+    this.source.updateDelete();
+  }
+
+  /** Reports a get to the underlying source. */
+  public void updateGet() {
+    this.source.updateGet();
+  }
+
+  /** Reports an append to the underlying source. */
+  public void updateAppend() {
+    this.source.updateAppend();
+  }
+
+  /** Reports an increment to the underlying source. */
+  public void updateIncrement() {
+    this.source.updateIncrement();
+  }
+
+  /**
+   * @return the metrics source this object delegates to
+   */
+  MetricsRegionSource getSource() {
+    return this.source;
+  }
+}
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java?rev=1406396&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java Tue Nov 6 23:22:01 2012
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+
+/**
+ * Publishes region server operation statistics through the metrics interfaces.
+ * <p>
+ * The {@link MetricsRegionServerSource} is created from the supplied
+ * {@link MetricsRegionServerWrapper} via the compatibility layer; each
+ * {@code update*} method forwards its argument to that source.
+ */
+@InterfaceStability.Evolving
+@InterfaceAudience.Private
+public class MetricsRegionServer {
+  // One logger per class rather than one per instance.
+  private static final Log LOG = LogFactory.getLog(MetricsRegionServer.class);
+
+  private final MetricsRegionServerSource serverSource;
+  private final MetricsRegionServerWrapper regionServerWrapper;
+
+  /**
+   * @param regionServerWrapper wrapper passed to the source factory and kept for
+   *                            {@link #getRegionServerWrapper()}
+   */
+  public MetricsRegionServer(MetricsRegionServerWrapper regionServerWrapper) {
+    this.regionServerWrapper = regionServerWrapper;
+    this.serverSource =
+        CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
+            .createServer(regionServerWrapper);
+  }
+
+  /**
+   * For unit-test usage.
+   *
+   * @return the metrics source all updates are forwarded to
+   */
+  public MetricsRegionServerSource getMetricsSource() {
+    return serverSource;
+  }
+
+  /**
+   * @return the wrapper this object was constructed with
+   */
+  public MetricsRegionServerWrapper getRegionServerWrapper() {
+    return regionServerWrapper;
+  }
+
+  /**
+   * Forwards a put measurement to the source.
+   *
+   * @param t value forwarded unchanged (presumably the operation's elapsed time;
+   *          confirm against MetricsRegionServerSource)
+   */
+  public void updatePut(long t) {
+    serverSource.updatePut(t);
+  }
+
+  /**
+   * Forwards a delete measurement to the source.
+   *
+   * @param t value forwarded unchanged (presumably the operation's elapsed time;
+   *          confirm against MetricsRegionServerSource)
+   */
+  public void updateDelete(long t) {
+    serverSource.updateDelete(t);
+  }
+
+  /**
+   * Forwards a get measurement to the source.
+   *
+   * @param t value forwarded unchanged (presumably the operation's elapsed time;
+   *          confirm against MetricsRegionServerSource)
+   */
+  public void updateGet(long t) {
+    serverSource.updateGet(t);
+  }
+
+  /**
+   * Forwards an increment measurement to the source.
+   *
+   * @param t value forwarded unchanged (presumably the operation's elapsed time;
+   *          confirm against MetricsRegionServerSource)
+   */
+  public void updateIncrement(long t) {
+    serverSource.updateIncrement(t);
+  }
+
+  /**
+   * Forwards an append measurement to the source.
+   *
+   * @param t value forwarded unchanged (presumably the operation's elapsed time;
+   *          confirm against MetricsRegionServerSource)
+   */
+  public void updateAppend(long t) {
+    serverSource.updateAppend(t);
+  }
+}
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java?rev=1406396&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java Tue Nov 6 23:22:01 2012
@@ -0,0 +1,395 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.io.hfile.CacheStats;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.metrics2.MetricsExecutor;
+
+import java.util.Collection;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Impl for exposing HRegionServer Information through Hadoop's metrics 2 system.
+ * <p>
+ * Values that require walking every online region are recomputed by a background
+ * task every {@link #PERIOD} seconds and published through volatile fields; the
+ * remaining getters read straight from the region server on each call.
+ */
+@InterfaceAudience.Private
+class MetricsRegionServerWrapperImpl
+    implements MetricsRegionServerWrapper {
+
+  public static final Log LOG = LogFactory.getLog(MetricsRegionServerWrapperImpl.class);
+
+  /** Initial delay and delay between recomputations, in seconds. */
+  public static final int PERIOD = 15;
+
+  private final HRegionServer regionServer;
+  /** May be null; every block cache getter below reports 0 in that case. */
+  private final BlockCache blockCache;
+
+  // Aggregates written by the background task and read by metric readers.
+  private volatile long numStores = 0;
+  private volatile long numStoreFiles = 0;
+  private volatile long memstoreSize = 0;
+  private volatile long storeFileSize = 0;
+  private volatile double requestsPerSecond = 0.0;
+  private volatile long readRequestsCount = 0;
+  private volatile long writeRequestsCount = 0;
+  private volatile long checkAndMutateChecksFailed = 0;
+  private volatile long checkAndMutateChecksPassed = 0;
+  private volatile long storefileIndexSize = 0;
+  private volatile long totalStaticIndexSize = 0;
+  private volatile long totalStaticBloomSize = 0;
+  private volatile long numPutsWithoutWAL = 0;
+  private volatile long dataInMemoryWithoutWAL = 0;
+  private volatile int percentFileLocal = 0;
+
+  // Refreshed by the background task, so it must be volatile like the aggregates.
+  private volatile CacheStats cacheStats;
+  private final ScheduledExecutorService executor;
+  private final Runnable runnable;
+
+  /**
+   * Captures the region server's block cache (if any) and schedules the periodic
+   * recomputation of the aggregated values.
+   *
+   * @param regionServer the region server whose state is exposed
+   */
+  public MetricsRegionServerWrapperImpl(final HRegionServer regionServer) {
+    this.regionServer = regionServer;
+    this.blockCache = this.regionServer.cacheConfig.getBlockCache();
+    // The getters treat a missing block cache as legal, so construction must too.
+    this.cacheStats = (this.blockCache == null) ? null : this.blockCache.getStats();
+    this.executor = CompatibilitySingletonFactory.getInstance(MetricsExecutor.class).getExecutor();
+    this.runnable = new RegionServerMetricsWrapperRunnable();
+    this.executor.scheduleWithFixedDelay(this.runnable, PERIOD, PERIOD, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public String getClusterId() {
+    return regionServer.getClusterId();
+  }
+
+  @Override
+  public long getStartCode() {
+    return regionServer.getStartcode();
+  }
+
+  /** @return the ZooKeeper quorum, or an empty string when no watcher is available */
+  @Override
+  public String getZookeeperQuorum() {
+    ZooKeeperWatcher zk = regionServer.getZooKeeperWatcher();
+    if (zk == null) {
+      return "";
+    }
+    return zk.getQuorum();
+  }
+
+  /** @return the coprocessor names joined with ", ", or an empty string when there are none */
+  @Override
+  public String getCoprocessors() {
+    String[] coprocessors = regionServer.getCoprocessors();
+    if (coprocessors == null || coprocessors.length == 0) {
+      return "";
+    }
+    return StringUtils.join(coprocessors, ", ");
+  }
+
+  /** @return the server name, or an empty string when it is not yet known */
+  @Override
+  public String getServerName() {
+    ServerName serverName = regionServer.getServerName();
+    if (serverName == null) {
+      return "";
+    }
+    return serverName.getServerName();
+  }
+
+  /** @return the number of online regions, or 0 when the collection is unavailable */
+  @Override
+  public long getNumOnlineRegions() {
+    Collection<HRegion> onlineRegionsLocalContext = regionServer.getOnlineRegionsLocalContext();
+    if (onlineRegionsLocalContext == null) {
+      return 0;
+    }
+    return onlineRegionsLocalContext.size();
+  }
+
+  @Override
+  public long getTotalRequestCount() {
+    return regionServer.requestCount.get();
+  }
+
+  @Override
+  public int getCompactionQueueSize() {
+    // The thread could be null; if so assume there is no queue.
+    if (this.regionServer.compactSplitThread == null) {
+      return 0;
+    }
+    return this.regionServer.compactSplitThread.getCompactionQueueSize();
+  }
+
+  @Override
+  public int getFlushQueueSize() {
+    // If there is no flusher there should be no queue.
+    if (this.regionServer.cacheFlusher == null) {
+      return 0;
+    }
+    return this.regionServer.cacheFlusher.getFlushQueueSize();
+  }
+
+  @Override
+  public long getBlockCacheCount() {
+    if (this.blockCache == null) {
+      return 0;
+    }
+    return this.blockCache.size();
+  }
+
+  @Override
+  public long getBlockCacheSize() {
+    if (this.blockCache == null) {
+      return 0;
+    }
+    return this.blockCache.getCurrentSize();
+  }
+
+  @Override
+  public long getBlockCacheFreeSize() {
+    if (this.blockCache == null) {
+      return 0;
+    }
+    return this.blockCache.getFreeSize();
+  }
+
+  @Override
+  public long getBlockCacheHitCount() {
+    // Read the volatile once so the null check and the use see the same object.
+    CacheStats stats = this.cacheStats;
+    if (stats == null) {
+      return 0;
+    }
+    return stats.getHitCount();
+  }
+
+  @Override
+  public long getBlockCacheMissCount() {
+    CacheStats stats = this.cacheStats;
+    if (stats == null) {
+      return 0;
+    }
+    return stats.getMissCount();
+  }
+
+  @Override
+  public long getBlockCacheEvictedCount() {
+    CacheStats stats = this.cacheStats;
+    if (stats == null) {
+      return 0;
+    }
+    return stats.getEvictedCount();
+  }
+
+  /** @return the hit ratio scaled to a whole percentage, or 0 without cache stats */
+  @Override
+  public int getBlockCacheHitPercent() {
+    CacheStats stats = this.cacheStats;
+    if (stats == null) {
+      return 0;
+    }
+    return (int) (stats.getHitRatio() * 100);
+  }
+
+  /** @return the hit-caching ratio scaled to a whole percentage, or 0 without cache stats */
+  @Override
+  public int getBlockCacheHitCachingPercent() {
+    CacheStats stats = this.cacheStats;
+    if (stats == null) {
+      return 0;
+    }
+    return (int) (stats.getHitCachingRatio() * 100);
+  }
+
+  /** Runs the recomputation immediately on the calling thread. */
+  @Override
+  public void forceRecompute() {
+    this.runnable.run();
+  }
+
+  @Override
+  public long getNumStores() {
+    return numStores;
+  }
+
+  @Override
+  public long getNumStoreFiles() {
+    return numStoreFiles;
+  }
+
+  @Override
+  public long getMemstoreSize() {
+    return memstoreSize;
+  }
+
+  @Override
+  public long getStoreFileSize() {
+    return storeFileSize;
+  }
+
+  @Override
+  public double getRequestsPerSecond() {
+    return requestsPerSecond;
+  }
+
+  @Override
+  public long getReadRequestsCount() {
+    return readRequestsCount;
+  }
+
+  @Override
+  public long getWriteRequestsCount() {
+    return writeRequestsCount;
+  }
+
+  @Override
+  public long getCheckAndMutateChecksFailed() {
+    return checkAndMutateChecksFailed;
+  }
+
+  @Override
+  public long getCheckAndMutateChecksPassed() {
+    return checkAndMutateChecksPassed;
+  }
+
+  @Override
+  public long getStoreFileIndexSize() {
+    return storefileIndexSize;
+  }
+
+  @Override
+  public long getTotalStaticIndexSize() {
+    return totalStaticIndexSize;
+  }
+
+  @Override
+  public long getTotalStaticBloomSize() {
+    return totalStaticBloomSize;
+  }
+
+  @Override
+  public long getNumPutsWithoutWAL() {
+    return numPutsWithoutWAL;
+  }
+
+  @Override
+  public long getDataInMemoryWithoutWAL() {
+    return dataInMemoryWithoutWAL;
+  }
+
+  @Override
+  public int getPercentFileLocal() {
+    return percentFileLocal;
+  }
+
+  @Override
+  public long getUpdatesBlockedTime() {
+    if (this.regionServer.cacheFlusher == null) {
+      return 0;
+    }
+    return this.regionServer.cacheFlusher.getUpdatesBlockedMsHighWater().get();
+  }
+
+
+  /**
+   * This is the runnable that will be executed on the executor every PERIOD number of seconds.
+   * It will take metrics/numbers from all of the regions and use them to compute point in
+   * time metrics.
+   * <p>
+   * The body avoids dereferencing values that the enclosing class treats as nullable:
+   * an exception escaping a task scheduled with {@code scheduleWithFixedDelay}
+   * suppresses all later executions, which would freeze every aggregated metric.
+   */
+  public class RegionServerMetricsWrapperRunnable implements Runnable {
+
+    private long lastRan = 0;
+    private long lastRequestCount = 0;
+
+    @Override
+    public synchronized void run() {
+
+      // No block cache means no stats to refresh; the getters then report 0.
+      if (blockCache != null) {
+        cacheStats = blockCache.getStats();
+      }
+
+      HDFSBlocksDistribution hdfsBlocksDistribution =
+          new HDFSBlocksDistribution();
+
+      long tempNumStores = 0;
+      long tempNumStoreFiles = 0;
+      long tempMemstoreSize = 0;
+      long tempStoreFileSize = 0;
+      long tempReadRequestsCount = 0;
+      long tempWriteRequestsCount = 0;
+      long tempCheckAndMutateChecksFailed = 0;
+      long tempCheckAndMutateChecksPassed = 0;
+      long tempStorefileIndexSize = 0;
+      long tempTotalStaticIndexSize = 0;
+      long tempTotalStaticBloomSize = 0;
+      long tempNumPutsWithoutWAL = 0;
+      long tempDataInMemoryWithoutWAL = 0;
+      int tempPercentFileLocal = 0;
+
+      // getNumOnlineRegions() treats this collection as nullable, so do the same here.
+      Collection<HRegion> onlineRegions = regionServer.getOnlineRegionsLocalContext();
+      if (onlineRegions != null) {
+        for (HRegion r : onlineRegions) {
+          tempNumPutsWithoutWAL += r.numPutsWithoutWAL.get();
+          tempDataInMemoryWithoutWAL += r.dataInMemoryWithoutWAL.get();
+          tempReadRequestsCount += r.readRequestsCount.get();
+          tempWriteRequestsCount += r.writeRequestsCount.get();
+          tempCheckAndMutateChecksFailed += r.checkAndMutateChecksFailed.get();
+          tempCheckAndMutateChecksPassed += r.checkAndMutateChecksPassed.get();
+          tempNumStores += r.stores.size();
+          for (Store store : r.stores.values()) {
+            tempNumStoreFiles += store.getStorefilesCount();
+            tempMemstoreSize += store.getMemStoreSize();
+            tempStoreFileSize += store.getStorefilesSize();
+            tempStorefileIndexSize += store.getStorefilesIndexSize();
+            tempTotalStaticBloomSize += store.getTotalStaticBloomSize();
+            tempTotalStaticIndexSize += store.getTotalStaticIndexSize();
+          }
+
+          hdfsBlocksDistribution.add(r.getHDFSBlocksDistribution());
+        }
+      }
+
+      // getServerName() above treats the server name as nullable; without a host
+      // name locality cannot be computed, so it is reported as 0.
+      ServerName serverName = regionServer.getServerName();
+      if (serverName != null) {
+        float localityIndex =
+            hdfsBlocksDistribution.getBlockLocalityIndex(serverName.getHostname());
+        tempPercentFileLocal = (int) (localityIndex * 100);
+      }
+
+      // Compute the number of requests per second
+      long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+
+      // assume that it took PERIOD seconds to start the executor.
+      // this is a guess but it's a pretty good one.
+      if (lastRan == 0) {
+        lastRan = currentTime - (PERIOD * 1000);
+      }
+
+      // If we've time traveled keep the last requests per second.
+      if ((currentTime - lastRan) > 10) {
+        long currentRequestCount = getTotalRequestCount();
+        requestsPerSecond = (currentRequestCount - lastRequestCount)
+            / ((currentTime - lastRan) / 1000.0);
+        lastRequestCount = currentRequestCount;
+      }
+      lastRan = currentTime;
+
+      // Copy over computed values so that no thread sees half computed values.
+      numStores = tempNumStores;
+      numStoreFiles = tempNumStoreFiles;
+      memstoreSize = tempMemstoreSize;
+      storeFileSize = tempStoreFileSize;
+      readRequestsCount = tempReadRequestsCount;
+      writeRequestsCount = tempWriteRequestsCount;
+      checkAndMutateChecksFailed = tempCheckAndMutateChecksFailed;
+      checkAndMutateChecksPassed = tempCheckAndMutateChecksPassed;
+      storefileIndexSize = tempStorefileIndexSize;
+      totalStaticIndexSize = tempTotalStaticIndexSize;
+      totalStaticBloomSize = tempTotalStaticBloomSize;
+      numPutsWithoutWAL = tempNumPutsWithoutWAL;
+      dataInMemoryWithoutWAL = tempDataInMemoryWithoutWAL;
+      percentFileLocal = tempPercentFileLocal;
+    }
+  }
+
+}
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java?rev=1406396&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java Tue Nov 6 23:22:01 2012
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.metrics2.MetricsExecutor;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Exposes per-region values to the metrics system.
+ * <p>
+ * Store file count, memstore size and store file size are summed over the
+ * region's stores by a background task every {@link #PERIOD} seconds; the other
+ * getters read from the region on each call.
+ */
+public class MetricsRegionWrapperImpl implements MetricsRegionWrapper {
+
+  /** Initial delay and delay between recomputations, in seconds. */
+  public static final int PERIOD = 45;
+
+  private final HRegion region;
+  private final ScheduledExecutorService executor;
+  private final Runnable runnable;
+
+  // Written by the executor thread and read by metric readers, hence volatile.
+  private volatile long numStoreFiles;
+  private volatile long memstoreSize;
+  private volatile long storeFileSize;
+
+  /**
+   * Schedules the periodic recomputation for the given region.
+   *
+   * @param region the region whose values are exposed
+   */
+  public MetricsRegionWrapperImpl(HRegion region) {
+    this.region = region;
+    this.executor = CompatibilitySingletonFactory.getInstance(MetricsExecutor.class).getExecutor();
+    this.runnable = new HRegionMetricsWrapperRunnable();
+    // NOTE(review): the returned future is discarded and nothing here cancels the
+    // task; confirm whether it should be stopped when the region is closed.
+    this.executor.scheduleWithFixedDelay(this.runnable, PERIOD, PERIOD, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public String getTableName() {
+    return this.region.getTableDesc().getNameAsString();
+  }
+
+  /** @return the encoded name of the region */
+  @Override
+  public String getRegionName() {
+    return this.region.getRegionInfo().getEncodedName();
+  }
+
+  @Override
+  public long getNumStores() {
+    return this.region.stores.size();
+  }
+
+  /** @return the store file count as of the last recomputation */
+  @Override
+  public long getNumStoreFiles() {
+    return numStoreFiles;
+  }
+
+  /** @return the summed memstore size as of the last recomputation */
+  @Override
+  public long getMemstoreSize() {
+    return memstoreSize;
+  }
+
+  /** @return the summed store file size as of the last recomputation */
+  @Override
+  public long getStoreFileSize() {
+    return storeFileSize;
+  }
+
+  @Override
+  public long getReadRequestCount() {
+    return this.region.getReadRequestsCount();
+  }
+
+  @Override
+  public long getWriteRequestCount() {
+    return this.region.getWriteRequestsCount();
+  }
+
+  /**
+   * Sums the per-store values of the region and publishes the totals.
+   */
+  public class HRegionMetricsWrapperRunnable implements Runnable {
+
+    @Override
+    public void run() {
+      long tempNumStoreFiles = 0;
+      long tempMemstoreSize = 0;
+      long tempStoreFileSize = 0;
+
+      for (Store store : region.stores.values()) {
+        tempNumStoreFiles += store.getStorefilesCount();
+        tempMemstoreSize += store.getMemStoreSize();
+        tempStoreFileSize += store.getStorefilesSize();
+      }
+
+      // Publish only the finished totals, never a partial sum.
+      numStoreFiles = tempNumStoreFiles;
+      memstoreSize = tempMemstoreSize;
+      storeFileSize = tempStoreFileSize;
+    }
+  }
+
+}
\ No newline at end of file
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java?rev=1406396&r1=1406395&r2=1406396&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java Tue Nov 6 23:22:01 2012
@@ -66,7 +66,6 @@ class SplitRequest implements Runnable {
if (!st.prepare()) return;
try {
st.execute(this.server, this.server);
- this.server.getMetrics().incrementSplitSuccessCount(System.currentTimeMillis() - startTime);
} catch (Exception e) {
if (this.server.isStopping() || this.server.isStopped()) {
LOG.info(
@@ -81,7 +80,6 @@ class SplitRequest implements Runnable {
if (st.rollback(this.server, this.server)) {
LOG.info("Successful rollback of failed split of " +
parent.getRegionNameAsString());
- this.server.getMetrics().incrementSplitFailureCount();
} else {
this.server.abort("Abort; we got an error after point-of-no-return");
}
@@ -102,7 +100,6 @@ class SplitRequest implements Runnable {
} catch (IOException ex) {
LOG.error("Split failed " + this, RemoteExceptionHandler
.checkIOException(ex));
- this.server.getMetrics().incrementSplitFailureCount();
server.checkFileSystem();
} finally {
if (this.parent.getCoprocessorHost() != null) {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1406396&r1=1406395&r2=1406396&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Nov 6 23:22:01 2012
@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.SchemaAware;
import com.google.common.collect.ImmutableList;
@@ -42,7 +41,7 @@ import com.google.common.collect.Immutab
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public interface Store extends SchemaAware, HeapSize {
+public interface Store extends HeapSize {
/* The default priority for user-specified compaction requests.
* The user gets top priority unless we have blocking compactions. (Pri <= 0)
@@ -287,4 +286,8 @@ public interface Store extends SchemaAwa
* @return the parent region hosting this store
*/
public HRegion getHRegion();
+
+ /**
+ * @return the name of the column family this store holds
+ */
+ public String getColumnFamilyName();
+
+ /**
+ * @return the name of the table this store belongs to
+ */
+ public String getTableName();
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1406396&r1=1406395&r2=1406396&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Tue Nov 6 23:22:01 2012
@@ -58,8 +58,6 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.HFileWriterV1;
import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
-import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
-import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.util.ChecksumType;
@@ -80,7 +78,7 @@ import com.google.common.collect.Orderin
/**
* A Store data file. Stores usually have one or more of these files. They
* are produced by flushing the memstore to disk. To
- * create, instantiate a writer using {@link StoreFile#WriterBuilder}
+ * create, instantiate a writer using {@link StoreFile.WriterBuilder}
* and append data. Be sure to add any metadata before calling close on the
* Writer (Use the appendMetadata convenience methods). On close, a StoreFile
* is sitting in the Filesystem. To refer to it, create a StoreFile instance
@@ -91,7 +89,7 @@ import com.google.common.collect.Orderin
* writer and a reader is that we write once but read a lot more.
*/
@InterfaceAudience.LimitedPrivate("Coprocessor")
-public class StoreFile extends SchemaConfigured {
+public class StoreFile {
static final Log LOG = LogFactory.getLog(StoreFile.class.getName());
public static enum BloomType {
@@ -277,7 +275,6 @@ public class StoreFile extends SchemaCon
this.modificationTimeStamp = 0;
}
- SchemaMetrics.configureGlobally(conf);
}
/**
@@ -545,11 +542,6 @@ public class StoreFile extends SchemaCon
dataBlockEncoder.getEncodingInCache());
}
- if (isSchemaConfigured()) {
- SchemaConfigured.resetSchemaMetricsConf(reader);
- passSchemaMetricsTo(reader);
- }
-
computeHDFSBlockDistribution();
// Load up indices and fileinfo. This also loads Bloom filter type.
@@ -1287,7 +1279,7 @@ public class StoreFile extends SchemaCon
/**
* Reader for a StoreFile.
*/
- public static class Reader extends SchemaConfigured {
+ public static class Reader {
static final Log LOG = LogFactory.getLog(Reader.class.getName());
protected BloomFilter generalBloomFilter = null;
@@ -1301,7 +1293,6 @@ public class StoreFile extends SchemaCon
public Reader(FileSystem fs, Path path, CacheConfig cacheConf,
DataBlockEncoding preferredEncodingInCache) throws IOException {
- super(path);
reader = HFile.createReaderWithEncoding(fs, path, cacheConf,
preferredEncodingInCache);
bloomFilterType = BloomType.NONE;
@@ -1310,7 +1301,6 @@ public class StoreFile extends SchemaCon
public Reader(FileSystem fs, Path path, HFileLink hfileLink, long size,
CacheConfig cacheConf, DataBlockEncoding preferredEncodingInCache,
boolean closeIStream) throws IOException {
- super(path);
FSDataInputStream in = hfileLink.open(fs);
FSDataInputStream inNoChecksum = in;
@@ -1584,7 +1574,6 @@ public class StoreFile extends SchemaCon
&& bloomFilter.contains(key, 0, key.length, bloom);
}
- getSchemaMetrics().updateBloomMetrics(exists);
return exists;
}
} catch (IOException e) {
@@ -1728,10 +1717,6 @@ public class StoreFile extends SchemaCon
return reader.indexSize();
}
- public String getColumnFamilyName() {
- return reader.getColumnFamilyName();
- }
-
public BloomType getBloomFilterType() {
return this.bloomFilterType;
}
@@ -1774,11 +1759,6 @@ public class StoreFile extends SchemaCon
public long getMaxTimestamp() {
return timeRangeTracker.maximumTimestamp;
}
-
- @Override
- public void schemaConfigurationChanged() {
- passSchemaMetricsTo((SchemaConfigured) reader);
- }
}
/**
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1406396&r1=1406395&r2=1406396&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Tue Nov 6 23:22:01 2012
@@ -33,8 +33,6 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.regionserver.HStore.ScanInfo;
-import org.apache.hadoop.hbase.regionserver.metrics.RegionMetricsStorage;
-import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -110,7 +108,6 @@ public class StoreScanner extends NonLaz
throws IOException {
this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
scanInfo.getMinVersions());
- initializeMetricNames();
if (columns != null && scan.isRaw()) {
throw new DoNotRetryIOException(
"Cannot specify any column for a raw scan");
@@ -163,7 +160,6 @@ public class StoreScanner extends NonLaz
long smallestReadPoint, long earliestPutTs) throws IOException {
this(store, false, scan, null, scanInfo.getTtl(),
scanInfo.getMinVersions());
- initializeMetricNames();
matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType,
smallestReadPoint, earliestPutTs, oldestUnexpiredTS);
@@ -194,7 +190,6 @@ public class StoreScanner extends NonLaz
throws IOException {
this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
scanInfo.getMinVersions());
- this.initializeMetricNames();
this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS);
@@ -206,23 +201,6 @@ public class StoreScanner extends NonLaz
}
/**
- * Method used internally to initialize metric names throughout the
- * constructors.
- *
- * To be called after the store variable has been initialized!
- */
- private void initializeMetricNames() {
- String tableName = SchemaMetrics.UNKNOWN;
- String family = SchemaMetrics.UNKNOWN;
- if (store != null) {
- tableName = store.getTableName();
- family = Bytes.toString(store.getFamily().getName());
- }
- this.metricNamePrefix =
- SchemaMetrics.generateSchemaMetricsPrefix(tableName, family);
- }
-
- /**
* Get a filtered list of scanners. Assumes we are not in a compaction.
* @return list of scanners to seek
*/
@@ -458,8 +436,7 @@ public class StoreScanner extends NonLaz
}
} finally {
if (cumulativeMetric > 0 && metric != null) {
- RegionMetricsStorage.incrNumericMetric(this.metricNamePrefix + metric,
- cumulativeMetric);
+
}
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java?rev=1406396&r1=1406395&r2=1406396&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java Tue Nov 6 23:22:01 2012
@@ -253,7 +253,6 @@ public class CompactionRequest implement
LOG.info(((completed) ? "completed" : "aborted") + " compaction: " +
this + "; duration=" + StringUtils.formatTimeDiff(now, start));
if (completed) {
- server.getMetrics().addCompaction(now - start, this.totalSize);
// degenerate case: blocked regions require recursive enqueues
if (s.getCompactPriority() <= 0) {
server.compactSplitThread
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java?rev=1406396&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java Tue Nov 6 23:22:01 2012
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSource;
+
+/**
+ * This class is for maintaining the various replication statistics for a sink and publishing them
+ * through the metrics interfaces.
+ */
+@InterfaceAudience.Private
+public class MetricsSink {
+
+  public static final String SINK_AGE_OF_LAST_APPLIED_OP = "sink.ageOfLastAppliedOp";
+  public static final String SINK_APPLIED_BATCHES = "sink.appliedBatches";
+  public static final String SINK_APPLIED_OPS = "sink.appliedOps";
+
+  private final MetricsReplicationSource rms;
+
+  public MetricsSink() {
+    rms = CompatibilitySingletonFactory.getInstance(MetricsReplicationSource.class);
+  }
+
+  /**
+   * Set the age of the last applied operation. The age is measured with EnvironmentEdgeManager,
+   * the same clock MetricsSource uses, rather than reading the system clock directly.
+   * @param timestamp The timestamp of the last operation applied.
+   */
+  public void setAgeOfLastAppliedOp(long timestamp) {
+    long age = org.apache.hadoop.hbase.util.EnvironmentEdgeManager.currentTimeMillis() - timestamp;
+    rms.setGauge(SINK_AGE_OF_LAST_APPLIED_OP, age);
+  }
+
+  /**
+   * Convenience method to change metrics when a batch of operations is applied: counts one more
+   * applied batch and adds the batch size to the applied-operations counter.
+   * @param batchSize number of operations in the applied batch
+   */
+  public void applyBatch(long batchSize) {
+    rms.incCounters(SINK_APPLIED_BATCHES, 1);
+    rms.incCounters(SINK_APPLIED_OPS, batchSize);
+  }
+
+}
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java?rev=1406396&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java Tue Nov 6 23:22:01 2012
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * This class is for maintaining the various replication statistics for a source and publishing them
+ * through the metrics interfaces.
+ */
+@InterfaceAudience.Private
+public class MetricsSource {
+
+  public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
+  public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";
+  public static final String SOURCE_LOG_EDITS_READ = "source.logEditsRead";
+  public static final String SOURCE_LOG_EDITS_FILTERED = "source.logEditsFiltered";
+  public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches";
+  public static final String SOURCE_SHIPPED_OPS = "source.shippedOps";
+
+  public static final Log LOG = LogFactory.getLog(MetricsSource.class);
+  private final String id;
+
+  private long lastTimestamp = 0;
+  private int lastQueueSize = 0;
+
+  private final String sizeOfLogQueKey;
+  private final String ageOfLastShippedOpKey;
+  private final String logEditsReadKey;
+  private final String logEditsFilteredKey;
+  private final String shippedBatchesKey;
+  private final String shippedOpsKey;
+
+  private final MetricsReplicationSource rms;
+
+  /**
+   * Constructor used to register the metrics. Builds the per-source metric keys from the id.
+   *
+   * @param id Name of the source this class is monitoring
+   */
+  public MetricsSource(String id) {
+    this.id = id;
+
+    sizeOfLogQueKey = "source." + id + ".sizeOfLogQueue";
+    ageOfLastShippedOpKey = "source." + id + ".ageOfLastShippedOp";
+    logEditsReadKey = "source." + id + ".logEditsRead";
+    logEditsFilteredKey = "source." + id + ".logEditsFiltered";
+    shippedBatchesKey = "source." + id + ".shippedBatches";
+    shippedOpsKey = "source." + id + ".shippedOps";
+    rms = CompatibilitySingletonFactory.getInstance(MetricsReplicationSource.class);
+  }
+
+  /**
+   * Set the age of the last edit that was shipped, on both the per-source and the global gauge,
+   * and remember the timestamp so {@link #refreshAgeOfLastShippedOp()} can recompute the age.
+   * @param timestamp write time of the edit
+   */
+  public void setAgeOfLastShippedOp(long timestamp) {
+    long age = EnvironmentEdgeManager.currentTimeMillis() - timestamp;
+    rms.setGauge(ageOfLastShippedOpKey, age);
+    rms.setGauge(SOURCE_AGE_OF_LAST_SHIPPED_OP, age);
+    this.lastTimestamp = timestamp;
+  }
+
+  /**
+   * Convenience method to use the last given timestamp to refresh the age of the last edit. Used
+   * when replication fails and the metric needs to stay accurate.
+   */
+  public void refreshAgeOfLastShippedOp() {
+    if (this.lastTimestamp > 0) {
+      setAgeOfLastShippedOp(this.lastTimestamp);
+    }
+  }
+
+  /**
+   * Set the size of the log queue. The per-source gauge is set to the value; the global gauge is
+   * moved by the difference from the previously reported size.
+   * @param size the size.
+   */
+  public void setSizeOfLogQueue(int size) {
+    rms.setGauge(sizeOfLogQueKey, size);
+    rms.incGauge(SOURCE_SIZE_OF_LOG_QUEUE, size - lastQueueSize);
+    lastQueueSize = size;
+  }
+
+  /**
+   * Add on the number of log edits read
+   *
+   * @param delta the number of log edits read.
+   */
+  private void incrLogEditsRead(long delta) {
+    rms.incCounters(logEditsReadKey, delta);
+    rms.incCounters(SOURCE_LOG_EDITS_READ, delta);
+  }
+
+  /** Increment the number of log edits read by one. */
+  public void incrLogEditsRead() {
+    incrLogEditsRead(1);
+  }
+
+  /**
+   * Add on the number of log edits filtered
+   *
+   * @param delta the number filtered.
+   */
+  private void incrLogEditsFiltered(long delta) {
+    rms.incCounters(logEditsFilteredKey, delta);
+    rms.incCounters(SOURCE_LOG_EDITS_FILTERED, delta);
+  }
+
+  /** Increment the number of log edits filtered out by one. */
+  public void incrLogEditsFiltered() {
+    incrLogEditsFiltered(1);
+  }
+
+  /**
+   * Convenience method to apply changes to metrics due to shipping a batch of logs.
+   *
+   * @param batchSize the size of the batch that was shipped to sinks.
+   */
+  public void shipBatch(long batchSize) {
+    rms.incCounters(shippedBatchesKey, 1);
+    rms.incCounters(SOURCE_SHIPPED_BATCHES, 1);
+    rms.incCounters(shippedOpsKey, batchSize);
+    rms.incCounters(SOURCE_SHIPPED_OPS, batchSize);
+  }
+
+  /** Removes all per-source metrics and subtracts this source from the global queue size. */
+  public void clear() {
+    rms.removeMetric(sizeOfLogQueKey);
+    rms.decGauge(SOURCE_SIZE_OF_LOG_QUEUE, lastQueueSize);
+    lastQueueSize = 0;
+    rms.removeMetric(ageOfLastShippedOpKey);
+    rms.removeMetric(logEditsFilteredKey);
+    rms.removeMetric(logEditsReadKey);
+    rms.removeMetric(shippedBatchesKey);
+    rms.removeMetric(shippedOpsKey);
+  }
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=1406396&r1=1406395&r2=1406396&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Tue Nov 6 23:22:01 2012
@@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.client.Pu
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.replication.regionserver.metrics.ReplicationSinkMetrics;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
@@ -73,7 +72,7 @@ public class ReplicationSink {
private final Configuration conf;
private final ExecutorService sharedThreadPool;
private final HConnection sharedHtableCon;
- private final ReplicationSinkMetrics metrics;
+ private final MetricsSink metrics;
/**
* Create a sink for replication
@@ -86,7 +85,7 @@ public class ReplicationSink {
throws IOException {
this.conf = HBaseConfiguration.create(conf);
decorateConf();
- this.metrics = new ReplicationSinkMetrics();
+ this.metrics = new MetricsSink();
this.sharedHtableCon = HConnectionManager.createConnection(this.conf);
this.sharedThreadPool = new ThreadPoolExecutor(1,
conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE),
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1406396&r1=1406395&r2=1406396&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Tue Nov 6 23:22:01 2012
@@ -56,12 +56,8 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
-import org.apache.hadoop.hbase.replication.regionserver.metrics.ReplicationSourceMetrics;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;
@@ -141,7 +137,7 @@ public class ReplicationSource extends T
// Indicates if this particular source is running
private volatile boolean running = true;
// Metrics for this source
- private ReplicationSourceMetrics metrics;
+ private MetricsSource metrics;
/**
* Instantiation method used by region servers
@@ -188,7 +184,7 @@ public class ReplicationSource extends T
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000);
this.fs = fs;
- this.metrics = new ReplicationSourceMetrics(peerClusterZnode);
+ this.metrics = new MetricsSource(peerClusterZnode);
try {
this.clusterId = zkHelper.getUUIDForCluster(zkHelper.getZookeeperWatcher());