You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by el...@apache.org on 2017/07/19 23:11:16 UTC
[2/3] phoenix git commit: PHOENIX-4042 Add hadoop metrics2-based
Indexer coproc metrics
PHOENIX-4042 Add hadoop metrics2-based Indexer coproc metrics
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6dea0117
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6dea0117
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6dea0117
Branch: refs/heads/4.x-HBase-1.2
Commit: 6dea01173e95c36266b43cc2004bfcd29451ff68
Parents: f3a5242
Author: Josh Elser <el...@apache.org>
Authored: Wed Jul 19 17:02:09 2017 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Jul 19 18:41:24 2017 -0400
----------------------------------------------------------------------
phoenix-core/pom.xml | 1 -
.../org/apache/phoenix/hbase/index/Indexer.java | 186 ++++++++++++++++---
.../index/metrics/MetricsIndexerSource.java | 150 +++++++++++++++
.../metrics/MetricsIndexerSourceFactory.java | 38 ++++
.../index/metrics/MetricsIndexerSourceImpl.java | 136 ++++++++++++++
pom.xml | 1 -
6 files changed, 480 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6dea0117/phoenix-core/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 72f91ac..275b72f 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -437,7 +437,6 @@
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop2-compat</artifactId>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6dea0117/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 55aeeaa..5a78c94 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -70,6 +70,8 @@ import org.apache.htrace.TraceScope;
import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
import org.apache.phoenix.hbase.index.builder.IndexBuildManager;
import org.apache.phoenix.hbase.index.builder.IndexBuilder;
+import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource;
+import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
@@ -82,6 +84,7 @@ import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache;
import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy;
import org.apache.phoenix.trace.TracingUtils;
import org.apache.phoenix.trace.util.NullSpan;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ServerUtil;
@@ -126,6 +129,17 @@ public class Indexer extends BaseRegionObserver {
private static final String INDEX_RECOVERY_FAILURE_POLICY_KEY = "org.apache.hadoop.hbase.index.recovery.failurepolicy";
+ private static final String INDEXER_INDEX_WRITE_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.post.batch.mutate.threshold";
+ private static final long INDEXER_INDEX_WRITE_SLOW_THRESHOLD_DEFAULT = 3_000;
+ private static final String INDEXER_INDEX_PREPARE_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.pre.batch.mutate.threshold";
+ private static final long INDEXER_INDEX_PREPARE_SLOW_THREHSOLD_DEFAULT = 3_000;
+ private static final String INDEXER_PRE_WAL_RESTORE_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.pre.wal.restore.threshold";
+ private static final long INDEXER_PRE_WAL_RESTORE_SLOW_THRESHOLD_DEFAULT = 3_000;
+ private static final String INDEXER_POST_OPEN_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.open.threshold";
+ private static final long INDEXER_POST_OPEN_SLOW_THRESHOLD_DEFAULT = 3_000;
+ private static final String INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.pre.increment";
+ private static final long INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_DEFAULT = 3_000;
+
/**
* cache the failed updates to the various regions. Used for making the WAL recovery mechanisms
* more robust in the face of recoverying index regions that were on the same server as the
@@ -139,8 +153,15 @@ public class Indexer extends BaseRegionObserver {
*/
private IndexWriter recoveryWriter;
+ private MetricsIndexerSource metricSource;
+
private boolean stopped;
private boolean disabled;
+ private long slowIndexWriteThreshold;
+ private long slowIndexPrepareThreshold;
+ private long slowPreWALRestoreThreshold;
+ private long slowPostOpenThreshold;
+ private long slowPreIncrementThreshold;
public static final String RecoveryFailurePolicyKeyForTesting = INDEX_RECOVERY_FAILURE_POLICY_KEY;
@@ -186,6 +207,11 @@ public class Indexer extends BaseRegionObserver {
DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(clonedConfig, env);
// setup the actual index writer
this.writer = new IndexWriter(indexWriterEnv, serverName + "-index-writer");
+
+ // Metrics impl for the Indexer -- avoiding unnecessary indirection for hadoop-1/2 compat
+ this.metricSource = MetricsIndexerSourceFactory.getInstance().create();
+ setSlowThresholds(e.getConfiguration());
+
try {
// get the specified failure policy. We only ever override it in tests, but we need to do it
// here
@@ -207,6 +233,30 @@ public class Indexer extends BaseRegionObserver {
}
}
+ /**
+ * Extracts the slow call threshold values from the configuration.
+ */
+ private void setSlowThresholds(Configuration c) {
+ slowIndexPrepareThreshold = c.getLong(INDEXER_INDEX_WRITE_SLOW_THRESHOLD_KEY,
+ INDEXER_INDEX_WRITE_SLOW_THRESHOLD_DEFAULT);
+ slowIndexWriteThreshold = c.getLong(INDEXER_INDEX_PREPARE_SLOW_THRESHOLD_KEY,
+ INDEXER_INDEX_PREPARE_SLOW_THREHSOLD_DEFAULT);
+ slowPreWALRestoreThreshold = c.getLong(INDEXER_PRE_WAL_RESTORE_SLOW_THRESHOLD_KEY,
+ INDEXER_PRE_WAL_RESTORE_SLOW_THRESHOLD_DEFAULT);
+ slowPostOpenThreshold = c.getLong(INDEXER_POST_OPEN_SLOW_THRESHOLD_KEY,
+ INDEXER_POST_OPEN_SLOW_THRESHOLD_DEFAULT);
+ slowPreIncrementThreshold = c.getLong(INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_KEY,
+ INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_DEFAULT);
+ }
+
+ private String getCallTooSlowMessage(String callName, long duration, long threshold) {
+ StringBuilder sb = new StringBuilder(64);
+ sb.append("(callTooSlow) ").append(callName).append(" duration=").append(duration);
+ sb.append("ms, threshold=").append(threshold).append("ms");
+ return sb.toString();
+ }
+
+
@Override
public void stop(CoprocessorEnvironment e) throws IOException {
if (this.stopped) {
@@ -233,6 +283,7 @@ public class Indexer extends BaseRegionObserver {
@Override
public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> e,
final Increment inc) throws IOException {
+ long start = EnvironmentEdgeManager.currentTimeMillis();
try {
List<Mutation> mutations = this.builder.executeAtomicOp(inc);
if (mutations == null) {
@@ -259,6 +310,15 @@ public class Indexer extends BaseRegionObserver {
"Unable to process ON DUPLICATE IGNORE for " +
e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString() +
"(" + Bytes.toStringBinary(inc.getRow()) + ")", t);
+ } finally {
+ long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+ if (duration >= slowIndexPrepareThreshold) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getCallTooSlowMessage("preIncrementAfterRowLock", duration, slowPreIncrementThreshold));
+ }
+ metricSource.incrementSlowDuplicateKeyCheckCalls();
+ }
+ metricSource.updateDuplicateKeyCheckTime(duration);
}
}
@@ -269,11 +329,21 @@ public class Indexer extends BaseRegionObserver {
super.preBatchMutate(c, miniBatchOp);
return;
}
+ long start = EnvironmentEdgeManager.currentTimeMillis();
try {
preBatchMutateWithExceptions(c, miniBatchOp);
return;
} catch (Throwable t) {
rethrowIndexingException(t);
+ } finally {
+ long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+ if (duration >= slowIndexPrepareThreshold) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getCallTooSlowMessage("preBatchMutate", duration, slowIndexPrepareThreshold));
+ }
+ metricSource.incrementNumSlowIndexPrepareCalls();
+ }
+ metricSource.updateIndexPrepareTime(duration);
}
throw new RuntimeException(
"Somehow didn't return an index update but also didn't propagate the failure to the client!");
@@ -343,11 +413,20 @@ public class Indexer extends BaseRegionObserver {
if (current == null) {
current = NullSpan.INSTANCE;
}
+ long start = EnvironmentEdgeManager.currentTimeMillis();
// get the index updates for all elements in this batch
Collection<Pair<Mutation, byte[]>> indexUpdates =
this.builder.getIndexUpdate(miniBatchOp, mutations.values());
+ long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+ if (duration >= slowIndexPrepareThreshold) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getCallTooSlowMessage("indexPrepare", duration, slowIndexPrepareThreshold));
+ }
+ metricSource.incrementNumSlowIndexPrepareCalls();
+ }
+ metricSource.updateIndexPrepareTime(duration);
current.addTimelineAnnotation("Built index updates, doing preStep");
TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
@@ -394,15 +473,27 @@ public class Indexer extends BaseRegionObserver {
if (this.disabled) {
super.postBatchMutateIndispensably(c, miniBatchOp, success);
return;
- }
- this.builder.batchCompleted(miniBatchOp);
-
- if (success) { // if miniBatchOp was successfully written, write index updates
- //each batch operation, only the first one will have anything useful, so we can just grab that
- Mutation mutation = miniBatchOp.getOperation(0);
- WALEdit edit = miniBatchOp.getWalEdit(0);
- doPost(edit, mutation, mutation.getDurability());
- }
+ }
+ long start = EnvironmentEdgeManager.currentTimeMillis();
+ try {
+ this.builder.batchCompleted(miniBatchOp);
+
+ if (success) { // if miniBatchOp was successfully written, write index updates
+ //each batch operation, only the first one will have anything useful, so we can just grab that
+ Mutation mutation = miniBatchOp.getOperation(0);
+ WALEdit edit = miniBatchOp.getWalEdit(0);
+ doPost(edit, mutation, mutation.getDurability());
+ }
+ } finally {
+ long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+ if (duration >= slowIndexWriteThreshold) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getCallTooSlowMessage("postBatchMutateIndispensably", duration, slowIndexWriteThreshold));
+ }
+ metricSource.incrementNumSlowIndexWriteCalls();
+ }
+ metricSource.updateIndexWriteTime(duration);
+ }
}
private void doPost(WALEdit edit, Mutation m, final Durability durability) throws IOException {
@@ -430,6 +521,7 @@ public class Indexer extends BaseRegionObserver {
if (current == null) {
current = NullSpan.INSTANCE;
}
+ long start = EnvironmentEdgeManager.currentTimeMillis();
// there is a little bit of excess here- we iterate all the non-indexed kvs for this check first
// and then do it again later when getting out the index updates. This should be pretty minor
@@ -485,6 +577,15 @@ public class Indexer extends BaseRegionObserver {
ikv.markBatchFinished();
}
}
+
+ long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+ if (duration >= slowIndexWriteThreshold) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getCallTooSlowMessage("indexWrite", duration, slowIndexWriteThreshold));
+ }
+ metricSource.incrementNumSlowIndexWriteCalls();
+ }
+ metricSource.updateIndexWriteTime(duration);
}
}
@@ -527,23 +628,35 @@ public class Indexer extends BaseRegionObserver {
if (this.disabled) {
super.postOpen(c);
return;
- }
-
- //if we have no pending edits to complete, then we are done
- if (updates == null || updates.size() == 0) {
- return;
}
- LOG.info("Found some outstanding index updates that didn't succeed during"
- + " WAL replay - attempting to replay now.");
-
- // do the usual writer stuff, killing the server again, if we can't manage to make the index
- // writes succeed again
+ long start = EnvironmentEdgeManager.currentTimeMillis();
try {
- writer.writeAndKillYourselfOnFailure(updates, true);
- } catch (IOException e) {
- LOG.error("During WAL replay of outstanding index updates, "
- + "Exception is thrown instead of killing server during index writing", e);
+ //if we have no pending edits to complete, then we are done
+ if (updates == null || updates.size() == 0) {
+ return;
+ }
+
+ LOG.info("Found some outstanding index updates that didn't succeed during"
+ + " WAL replay - attempting to replay now.");
+
+ // do the usual writer stuff, killing the server again, if we can't manage to make the index
+ // writes succeed again
+ try {
+ writer.writeAndKillYourselfOnFailure(updates, true);
+ } catch (IOException e) {
+ LOG.error("During WAL replay of outstanding index updates, "
+ + "Exception is thrown instead of killing server during index writing", e);
+ }
+ } finally {
+ long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+ if (duration >= slowPostOpenThreshold) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getCallTooSlowMessage("postOpen", duration, slowPostOpenThreshold));
+ }
+ metricSource.incrementNumSlowPostOpenCalls();
+ }
+ metricSource.updatePostOpenTime(duration);
}
}
@@ -553,19 +666,32 @@ public class Indexer extends BaseRegionObserver {
if (this.disabled) {
super.preWALRestore(env, info, logKey, logEdit);
return;
- }
+ }
+
// TODO check the regions in transition. If the server on which the region lives is this one,
// then we should rety that write later in postOpen.
// we might be able to get even smarter here and pre-split the edits that are server-local
// into their own recovered.edits file. This then lets us do a straightforward recovery of each
// region (and more efficiently as we aren't writing quite as hectically from this one place).
- /*
- * Basically, we let the index regions recover for a little while long before retrying in the
- * hopes they come up before the primary table finishes.
- */
- Collection<Pair<Mutation, byte[]>> indexUpdates = extractIndexUpdate(logEdit);
- recoveryWriter.writeAndKillYourselfOnFailure(indexUpdates, true);
+ long start = EnvironmentEdgeManager.currentTimeMillis();
+ try {
+ /*
+ * Basically, we let the index regions recover for a little while long before retrying in the
+ * hopes they come up before the primary table finishes.
+ */
+ Collection<Pair<Mutation, byte[]>> indexUpdates = extractIndexUpdate(logEdit);
+ recoveryWriter.writeAndKillYourselfOnFailure(indexUpdates, true);
+ } finally {
+ long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+ if (duration >= slowPreWALRestoreThreshold) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getCallTooSlowMessage("preWALRestore", duration, slowPreWALRestoreThreshold));
+ }
+ metricSource.incrementNumSlowPreWALRestoreCalls();
+ }
+ metricSource.updatePreWALRestoreTime(duration);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6dea0117/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java
new file mode 100644
index 0000000..e42fccc
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.metrics;
+
+import org.apache.hadoop.hbase.metrics.BaseSource;
+import org.apache.phoenix.hbase.index.Indexer;
+
+/**
+ * Interface for metrics about {@link Indexer}.
+ */
+public interface MetricsIndexerSource extends BaseSource {
+ // Metrics2 and JMX constants
+ String METRICS_NAME = "PhoenixIndexer";
+ String METRICS_CONTEXT = "phoenix";
+ String METRICS_DESCRIPTION = "Metrics about the Phoenix Indexer";
+ String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
+
+ String INDEX_PREPARE_TIME = "indexPrepareTime";
+ String INDEX_PREPARE_TIME_DESC = "Histogram for the time in milliseconds for preparing an index write";
+ String SLOW_INDEX_PREPARE = "slowIndexPrepareCalls";
+ String SLOW_INDEX_PREPARE_DESC = "The number of index preparations slower than the configured threshold";
+
+ String INDEX_WRITE_TIME = "indexWriteTime";
+ String INDEX_WRITE_TIME_DESC = "Histogram for the time in milliseconds for writing an index update";
+ String SLOW_INDEX_WRITE = "slowIndexWriteCalls";
+ String SLOW_INDEX_WRITE_DESC = "The number of index writes slower than the configured threshold";
+
+ String DUPLICATE_KEY_TIME = "duplicateKeyCheckTime";
+ String DUPLICATE_KEY_TIME_DESC = "Histogram for the time in milliseconds to handle ON DUPLICATE keywords";
+ String SLOW_DUPLICATE_KEY = "slowDuplicateKeyCheckCalls";
+ String SLOW_DUPLICATE_KEY_DESC = "The number of on duplicate key checks slower than the configured threshold";
+
+ String PRE_WAL_RESTORE_TIME = "preWALRestoreTime";
+ String PRE_WAL_RESTORE_TIME_DESC = "Histogram for the time in milliseconds for Indexer's preWALRestore";
+ String SLOW_PRE_WAL_RESTORE = "slowPreWALRestoreCalls";
+ String SLOW_PRE_WAL_RESTORE_DESC = "The number of preWALRestore calls slower than the configured threshold";
+
+ String POST_PUT_TIME = "postPutTime";
+ String POST_PUT_TIME_DESC = "Histogram for the time in milliseconds for Indexer's postPut";
+ String SLOW_POST_PUT = "slowPostPutCalls";
+ String SLOW_POST_PUT_DESC = "The number of postPut calls slower than the configured threshold";
+
+ String POST_DELETE_TIME = "postDeleteTime";
+ String POST_DELETE_TIME_DESC = "Histogram for the time in milliseconds for Indexer's postDelete";
+ String SLOW_POST_DELETE = "slowPostDeleteCalls";
+ String SLOW_POST_DELETE_DESC = "The number of postDelete calls slower than the configured threshold";
+
+ String POST_OPEN_TIME = "postOpenTime";
+ String POST_OPEN_TIME_DESC = "Histogram for the time in milliseconds for Indexer's postOpen";
+ String SLOW_POST_OPEN = "slowPostOpenCalls";
+ String SLOW_POST_OPEN_DESC = "The number of postOpen calls slower than the configured threshold";
+
+ /**
+ * Updates the index preparation time histogram (preBatchMutate).
+ *
+ * @param t time taken in milliseconds
+ */
+ void updateIndexPrepareTime(long t);
+
+ /**
+ * Increments the number of slow calls prepare an index write.
+ */
+ void incrementNumSlowIndexPrepareCalls();
+
+ /**
+ * Updates the index write time histogram (postBatchMutate).
+ *
+ * @param t time taken in milliseconds
+ */
+ void updateIndexWriteTime(long t);
+
+ /**
+ * Increments the number of slow calls to write to the index.
+ */
+ void incrementNumSlowIndexWriteCalls();
+
+ /**
+ * Updates the preWALRestore time histogram.
+ *
+ * @param t time taken in milliseconds
+ */
+ void updatePreWALRestoreTime(long t);
+
+ /**
+ * Increments the number of slow preWALRestore calls.
+ */
+ void incrementNumSlowPreWALRestoreCalls();
+
+ /**
+ * Updates the postPut time histogram.
+ *
+ * @param t time taken in milliseconds
+ */
+ void updatePostPutTime(long t);
+
+ /**
+ * Increments the number of slow postPut calls.
+ */
+ void incrementNumSlowPostPutCalls();
+
+ /**
+ * Updates the postDelete time histogram.
+ *
+ * @param t time taken in milliseconds
+ */
+ void updatePostDeleteTime(long t);
+
+ /**
+ * Increments the number of slow postDelete calls.
+ */
+ void incrementNumSlowPostDeleteCalls();
+
+ /**
+ * Updates the postOpen time histogram.
+ *
+ * @param t time taken in milliseconds
+ */
+ void updatePostOpenTime(long t);
+
+ /**
+ * Increments the number of slow postOpen calls.
+ */
+ void incrementNumSlowPostOpenCalls();
+
+ /**
+ * Updates the preIncrementAfterRowLock time histogram.
+ *
+ * @param t time taken in milliseconds
+ */
+ void updateDuplicateKeyCheckTime(long t);
+
+ /**
+ * Increments the number of slow preIncrementAfteRowLock calls.
+ */
+ void incrementSlowDuplicateKeyCheckCalls();
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6dea0117/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java
new file mode 100644
index 0000000..8d97f7b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.metrics;
+
+/**
+ * Factory class for creating {@link MetricsIndexerSource} instances.
+ */
+public class MetricsIndexerSourceFactory {
+ private static final MetricsIndexerSourceFactory INSTANCE = new MetricsIndexerSourceFactory();
+ private MetricsIndexerSource source;
+
+ private MetricsIndexerSourceFactory() {}
+
+ public static MetricsIndexerSourceFactory getInstance() {
+ return INSTANCE;
+ }
+
+ public synchronized MetricsIndexerSource create() {
+ if (INSTANCE.source == null) {
+ INSTANCE.source = new MetricsIndexerSourceImpl();
+ }
+ return INSTANCE.source;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6dea0117/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java
new file mode 100644
index 0000000..dd6ba5b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.metrics;
+
+import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
+import org.apache.hadoop.metrics2.MetricHistogram;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+
+/**
+ * Implementation for tracking Phoenix Indexer metrics.
+ */
+public class MetricsIndexerSourceImpl extends BaseSourceImpl implements MetricsIndexerSource {
+
+ private final MetricHistogram indexPrepareTimeHisto;
+ private final MutableCounterLong slowIndexPrepareCalls;
+ private final MetricHistogram indexWriteTimeHisto;
+ private final MutableCounterLong slowIndexWriteCalls;
+ private final MetricHistogram preWALRestoreTimeHisto;
+ private final MutableCounterLong slowPreWALRestoreCalls;
+ private final MetricHistogram postPutTimeHisto;
+ private final MutableCounterLong slowPostPutCalls;
+ private final MetricHistogram postDeleteTimeHisto;
+ private final MutableCounterLong slowPostDeleteCalls;
+ private final MetricHistogram postOpenTimeHisto;
+ private final MutableCounterLong slowPostOpenCalls;
+ private final MetricHistogram duplicateKeyTimeHisto;
+ private final MutableCounterLong slowDuplicateKeyCalls;
+
+ public MetricsIndexerSourceImpl() {
+ this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
+ }
+
+ public MetricsIndexerSourceImpl(String metricsName, String metricsDescription,
+ String metricsContext, String metricsJmxContext) {
+ super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
+
+ indexPrepareTimeHisto = getMetricsRegistry().newHistogram(INDEX_PREPARE_TIME, INDEX_PREPARE_TIME_DESC);
+ slowIndexPrepareCalls = getMetricsRegistry().newCounter(SLOW_INDEX_PREPARE, SLOW_INDEX_PREPARE_DESC, 0L);
+ indexWriteTimeHisto = getMetricsRegistry().newHistogram(INDEX_WRITE_TIME, INDEX_WRITE_TIME_DESC);
+ slowIndexWriteCalls = getMetricsRegistry().newCounter(SLOW_INDEX_WRITE, SLOW_INDEX_WRITE_DESC, 0L);
+ preWALRestoreTimeHisto = getMetricsRegistry().newHistogram(PRE_WAL_RESTORE_TIME, PRE_WAL_RESTORE_TIME_DESC);
+ slowPreWALRestoreCalls = getMetricsRegistry().newCounter(SLOW_PRE_WAL_RESTORE, SLOW_PRE_WAL_RESTORE_DESC, 0L);
+ postPutTimeHisto = getMetricsRegistry().newHistogram(POST_PUT_TIME, POST_PUT_TIME_DESC);
+ slowPostPutCalls = getMetricsRegistry().newCounter(SLOW_POST_PUT, SLOW_POST_PUT_DESC, 0L);
+ postDeleteTimeHisto = getMetricsRegistry().newHistogram(POST_DELETE_TIME, POST_DELETE_TIME_DESC);
+ slowPostDeleteCalls = getMetricsRegistry().newCounter(SLOW_POST_DELETE, SLOW_POST_DELETE_DESC, 0L);
+ postOpenTimeHisto = getMetricsRegistry().newHistogram(POST_OPEN_TIME, POST_OPEN_TIME_DESC);
+ slowPostOpenCalls = getMetricsRegistry().newCounter(SLOW_POST_OPEN, SLOW_POST_OPEN_DESC, 0L);
+ duplicateKeyTimeHisto = getMetricsRegistry().newHistogram(DUPLICATE_KEY_TIME, DUPLICATE_KEY_TIME_DESC);
+ slowDuplicateKeyCalls = getMetricsRegistry().newCounter(SLOW_DUPLICATE_KEY, SLOW_DUPLICATE_KEY_DESC, 0L);
+ }
+
+ @Override
+ public void updateIndexPrepareTime(long t) {
+ indexPrepareTimeHisto.add(t);
+ }
+
+ @Override
+ public void updateIndexWriteTime(long t) {
+ indexWriteTimeHisto.add(t);
+ }
+
+ @Override
+ public void updatePreWALRestoreTime(long t) {
+ preWALRestoreTimeHisto.add(t);
+ }
+
+ @Override
+ public void updatePostPutTime(long t) {
+ postPutTimeHisto.add(t);
+ }
+
+ @Override
+ public void updatePostDeleteTime(long t) {
+ postDeleteTimeHisto.add(t);
+ }
+
+ @Override
+ public void updatePostOpenTime(long t) {
+ postOpenTimeHisto.add(t);
+ }
+
+ @Override
+ public void incrementNumSlowIndexPrepareCalls() {
+ slowIndexPrepareCalls.incr();
+ }
+
+ @Override
+ public void incrementNumSlowIndexWriteCalls() {
+ slowIndexWriteCalls.incr();
+ }
+
+ @Override
+ public void incrementNumSlowPreWALRestoreCalls() {
+ slowPreWALRestoreCalls.incr();
+ }
+
+ @Override
+ public void incrementNumSlowPostPutCalls() {
+ slowPostPutCalls.incr();
+ }
+
+ @Override
+ public void incrementNumSlowPostDeleteCalls() {
+ slowPostDeleteCalls.incr();
+ }
+
+ @Override
+ public void incrementNumSlowPostOpenCalls() {
+ slowPostOpenCalls.incr();
+ }
+
+ @Override
+ public void updateDuplicateKeyCheckTime(long t) {
+ duplicateKeyTimeHisto.add(t);
+ }
+
+ @Override
+ public void incrementSlowDuplicateKeyCheckCalls() {
+ slowDuplicateKeyCalls.incr();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6dea0117/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 92999ba..5957138 100644
--- a/pom.xml
+++ b/pom.xml
@@ -664,7 +664,6 @@
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop2-compat</artifactId>
<version>${hbase.version}</version>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>