You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by el...@apache.org on 2017/07/19 23:11:16 UTC

[2/3] phoenix git commit: PHOENIX-4042 Add hadoop metrics2-based Indexer coproc metrics

PHOENIX-4042 Add hadoop metrics2-based Indexer coproc metrics


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6dea0117
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6dea0117
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6dea0117

Branch: refs/heads/4.x-HBase-1.2
Commit: 6dea01173e95c36266b43cc2004bfcd29451ff68
Parents: f3a5242
Author: Josh Elser <el...@apache.org>
Authored: Wed Jul 19 17:02:09 2017 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Jul 19 18:41:24 2017 -0400

----------------------------------------------------------------------
 phoenix-core/pom.xml                            |   1 -
 .../org/apache/phoenix/hbase/index/Indexer.java | 186 ++++++++++++++++---
 .../index/metrics/MetricsIndexerSource.java     | 150 +++++++++++++++
 .../metrics/MetricsIndexerSourceFactory.java    |  38 ++++
 .../index/metrics/MetricsIndexerSourceImpl.java | 136 ++++++++++++++
 pom.xml                                         |   1 -
 6 files changed, 480 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/6dea0117/phoenix-core/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 72f91ac..275b72f 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -437,7 +437,6 @@
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-hadoop2-compat</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6dea0117/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 55aeeaa..5a78c94 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -70,6 +70,8 @@ import org.apache.htrace.TraceScope;
 import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
 import org.apache.phoenix.hbase.index.builder.IndexBuildManager;
 import org.apache.phoenix.hbase.index.builder.IndexBuilder;
+import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource;
+import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
@@ -82,6 +84,7 @@ import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache;
 import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ServerUtil;
 
@@ -126,6 +129,17 @@ public class Indexer extends BaseRegionObserver {
 
   private static final String INDEX_RECOVERY_FAILURE_POLICY_KEY = "org.apache.hadoop.hbase.index.recovery.failurepolicy";
 
+  private static final String INDEXER_INDEX_WRITE_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.post.batch.mutate.threshold";
+  private static final long INDEXER_INDEX_WRITE_SLOW_THRESHOLD_DEFAULT = 3_000;
+  private static final String INDEXER_INDEX_PREPARE_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.pre.batch.mutate.threshold";
+  private static final long INDEXER_INDEX_PREPARE_SLOW_THREHSOLD_DEFAULT = 3_000;
+  private static final String INDEXER_PRE_WAL_RESTORE_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.pre.wal.restore.threshold";
+  private static final long INDEXER_PRE_WAL_RESTORE_SLOW_THRESHOLD_DEFAULT = 3_000;
+  private static final String INDEXER_POST_OPEN_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.open.threshold";
+  private static final long INDEXER_POST_OPEN_SLOW_THRESHOLD_DEFAULT = 3_000;
+  private static final String INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.pre.increment";
+  private static final long INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_DEFAULT = 3_000;
+
   /**
    * cache the failed updates to the various regions. Used for making the WAL recovery mechanisms
    * more robust in the face of recoverying index regions that were on the same server as the
@@ -139,8 +153,15 @@ public class Indexer extends BaseRegionObserver {
    */
   private IndexWriter recoveryWriter;
 
+  private MetricsIndexerSource metricSource;
+
   private boolean stopped;
   private boolean disabled;
+  private long slowIndexWriteThreshold;
+  private long slowIndexPrepareThreshold;
+  private long slowPreWALRestoreThreshold;
+  private long slowPostOpenThreshold;
+  private long slowPreIncrementThreshold;
 
   public static final String RecoveryFailurePolicyKeyForTesting = INDEX_RECOVERY_FAILURE_POLICY_KEY;
 
@@ -186,6 +207,11 @@ public class Indexer extends BaseRegionObserver {
         DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(clonedConfig, env);
         // setup the actual index writer
         this.writer = new IndexWriter(indexWriterEnv, serverName + "-index-writer");
+
+        // Metrics impl for the Indexer -- avoiding unnecessary indirection for hadoop-1/2 compat
+        this.metricSource = MetricsIndexerSourceFactory.getInstance().create();
+        setSlowThresholds(e.getConfiguration());
+
         try {
           // get the specified failure policy. We only ever override it in tests, but we need to do it
           // here
@@ -207,6 +233,30 @@ public class Indexer extends BaseRegionObserver {
       }
   }
 
+  /**
+   * Extracts the slow call threshold values from the configuration.
+   */
+  private void setSlowThresholds(Configuration c) {
+      slowIndexPrepareThreshold = c.getLong(INDEXER_INDEX_WRITE_SLOW_THRESHOLD_KEY,
+          INDEXER_INDEX_WRITE_SLOW_THRESHOLD_DEFAULT);
+      slowIndexWriteThreshold = c.getLong(INDEXER_INDEX_PREPARE_SLOW_THRESHOLD_KEY,
+          INDEXER_INDEX_PREPARE_SLOW_THREHSOLD_DEFAULT);
+      slowPreWALRestoreThreshold = c.getLong(INDEXER_PRE_WAL_RESTORE_SLOW_THRESHOLD_KEY,
+          INDEXER_PRE_WAL_RESTORE_SLOW_THRESHOLD_DEFAULT);
+      slowPostOpenThreshold = c.getLong(INDEXER_POST_OPEN_SLOW_THRESHOLD_KEY,
+          INDEXER_POST_OPEN_SLOW_THRESHOLD_DEFAULT);
+      slowPreIncrementThreshold = c.getLong(INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_KEY,
+          INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_DEFAULT);
+  }
+
+  private String getCallTooSlowMessage(String callName, long duration, long threshold) {
+      StringBuilder sb = new StringBuilder(64);
+      sb.append("(callTooSlow) ").append(callName).append(" duration=").append(duration);
+      sb.append("ms, threshold=").append(threshold).append("ms");
+      return sb.toString();
+  }
+
+
   @Override
   public void stop(CoprocessorEnvironment e) throws IOException {
     if (this.stopped) {
@@ -233,6 +283,7 @@ public class Indexer extends BaseRegionObserver {
   @Override
   public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> e,
           final Increment inc) throws IOException {
+      long start = EnvironmentEdgeManager.currentTimeMillis();
       try {
           List<Mutation> mutations = this.builder.executeAtomicOp(inc);
           if (mutations == null) {
@@ -259,6 +310,15 @@ public class Indexer extends BaseRegionObserver {
                   "Unable to process ON DUPLICATE IGNORE for " + 
                   e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString() + 
                   "(" + Bytes.toStringBinary(inc.getRow()) + ")", t);
+      } finally {
+          long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+          if (duration >= slowIndexPrepareThreshold) {
+              if (LOG.isDebugEnabled()) {
+                  LOG.debug(getCallTooSlowMessage("preIncrementAfterRowLock", duration, slowPreIncrementThreshold));
+              }
+              metricSource.incrementSlowDuplicateKeyCheckCalls();
+          }
+          metricSource.updateDuplicateKeyCheckTime(duration);
       }
   }
 
@@ -269,11 +329,21 @@ public class Indexer extends BaseRegionObserver {
           super.preBatchMutate(c, miniBatchOp);
           return;
       }
+      long start = EnvironmentEdgeManager.currentTimeMillis();
       try {
           preBatchMutateWithExceptions(c, miniBatchOp);
           return;
       } catch (Throwable t) {
           rethrowIndexingException(t);
+      } finally {
+          long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+          if (duration >= slowIndexPrepareThreshold) {
+              if (LOG.isDebugEnabled()) {
+                  LOG.debug(getCallTooSlowMessage("preBatchMutate", duration, slowIndexPrepareThreshold));
+              }
+              metricSource.incrementNumSlowIndexPrepareCalls();
+          }
+          metricSource.updateIndexPrepareTime(duration);
       }
       throw new RuntimeException(
         "Somehow didn't return an index update but also didn't propagate the failure to the client!");
@@ -343,11 +413,20 @@ public class Indexer extends BaseRegionObserver {
           if (current == null) {
               current = NullSpan.INSTANCE;
           }
+          long start = EnvironmentEdgeManager.currentTimeMillis();
 
           // get the index updates for all elements in this batch
           Collection<Pair<Mutation, byte[]>> indexUpdates =
                   this.builder.getIndexUpdate(miniBatchOp, mutations.values());
 
+          long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+          if (duration >= slowIndexPrepareThreshold) {
+              if (LOG.isDebugEnabled()) {
+                  LOG.debug(getCallTooSlowMessage("indexPrepare", duration, slowIndexPrepareThreshold));
+              }
+              metricSource.incrementNumSlowIndexPrepareCalls();
+          }
+          metricSource.updateIndexPrepareTime(duration);
           current.addTimelineAnnotation("Built index updates, doing preStep");
           TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
 
@@ -394,15 +473,27 @@ public class Indexer extends BaseRegionObserver {
       if (this.disabled) {
           super.postBatchMutateIndispensably(c, miniBatchOp, success);
           return;
-        }
-    this.builder.batchCompleted(miniBatchOp);
-    
-    if (success) { // if miniBatchOp was successfully written, write index updates
-        //each batch operation, only the first one will have anything useful, so we can just grab that
-        Mutation mutation = miniBatchOp.getOperation(0);
-        WALEdit edit = miniBatchOp.getWalEdit(0);
-        doPost(edit, mutation, mutation.getDurability());
-    }
+      }
+      long start = EnvironmentEdgeManager.currentTimeMillis();
+      try {
+          this.builder.batchCompleted(miniBatchOp);
+
+          if (success) { // if miniBatchOp was successfully written, write index updates
+              //each batch operation, only the first one will have anything useful, so we can just grab that
+              Mutation mutation = miniBatchOp.getOperation(0);
+              WALEdit edit = miniBatchOp.getWalEdit(0);
+              doPost(edit, mutation, mutation.getDurability());
+          }
+       } finally {
+           long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+           if (duration >= slowIndexWriteThreshold) {
+               if (LOG.isDebugEnabled()) {
+                   LOG.debug(getCallTooSlowMessage("postBatchMutateIndispensably", duration, slowIndexWriteThreshold));
+               }
+               metricSource.incrementNumSlowIndexWriteCalls();
+           }
+           metricSource.updateIndexWriteTime(duration);
+       }
   }
 
   private void doPost(WALEdit edit, Mutation m, final Durability durability) throws IOException {
@@ -430,6 +521,7 @@ public class Indexer extends BaseRegionObserver {
           if (current == null) {
               current = NullSpan.INSTANCE;
           }
+          long start = EnvironmentEdgeManager.currentTimeMillis();
 
           // there is a little bit of excess here- we iterate all the non-indexed kvs for this check first
           // and then do it again later when getting out the index updates. This should be pretty minor
@@ -485,6 +577,15 @@ public class Indexer extends BaseRegionObserver {
                   ikv.markBatchFinished();
               }
           }
+
+          long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+          if (duration >= slowIndexWriteThreshold) {
+              if (LOG.isDebugEnabled()) {
+                  LOG.debug(getCallTooSlowMessage("indexWrite", duration, slowIndexWriteThreshold));
+              }
+              metricSource.incrementNumSlowIndexWriteCalls();
+          }
+          metricSource.updateIndexWriteTime(duration);
       }
   }
 
@@ -527,23 +628,35 @@ public class Indexer extends BaseRegionObserver {
     if (this.disabled) {
         super.postOpen(c);
         return;
-      }
-
-    //if we have no pending edits to complete, then we are done
-    if (updates == null || updates.size() == 0) {
-      return;
     }
 
-    LOG.info("Found some outstanding index updates that didn't succeed during"
-            + " WAL replay - attempting to replay now.");
-    
-    // do the usual writer stuff, killing the server again, if we can't manage to make the index
-    // writes succeed again
+    long start = EnvironmentEdgeManager.currentTimeMillis();
     try {
-        writer.writeAndKillYourselfOnFailure(updates, true);
-    } catch (IOException e) {
-            LOG.error("During WAL replay of outstanding index updates, "
-                    + "Exception is thrown instead of killing server during index writing", e);
+        //if we have no pending edits to complete, then we are done
+        if (updates == null || updates.size() == 0) {
+          return;
+        }
+
+        LOG.info("Found some outstanding index updates that didn't succeed during"
+                + " WAL replay - attempting to replay now.");
+
+        // do the usual writer stuff, killing the server again, if we can't manage to make the index
+        // writes succeed again
+        try {
+            writer.writeAndKillYourselfOnFailure(updates, true);
+        } catch (IOException e) {
+                LOG.error("During WAL replay of outstanding index updates, "
+                        + "Exception is thrown instead of killing server during index writing", e);
+        }
+    } finally {
+         long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+         if (duration >= slowPostOpenThreshold) {
+             if (LOG.isDebugEnabled()) {
+                 LOG.debug(getCallTooSlowMessage("postOpen", duration, slowPostOpenThreshold));
+             }
+             metricSource.incrementNumSlowPostOpenCalls();
+         }
+         metricSource.updatePostOpenTime(duration);
     }
   }
 
@@ -553,19 +666,32 @@ public class Indexer extends BaseRegionObserver {
       if (this.disabled) {
           super.preWALRestore(env, info, logKey, logEdit);
           return;
-        }
+      }
+
     // TODO check the regions in transition. If the server on which the region lives is this one,
     // then we should rety that write later in postOpen.
     // we might be able to get even smarter here and pre-split the edits that are server-local
     // into their own recovered.edits file. This then lets us do a straightforward recovery of each
     // region (and more efficiently as we aren't writing quite as hectically from this one place).
 
-    /*
-     * Basically, we let the index regions recover for a little while long before retrying in the
-     * hopes they come up before the primary table finishes.
-     */
-    Collection<Pair<Mutation, byte[]>> indexUpdates = extractIndexUpdate(logEdit);
-    recoveryWriter.writeAndKillYourselfOnFailure(indexUpdates, true);
+      long start = EnvironmentEdgeManager.currentTimeMillis();
+      try {
+          /*
+           * Basically, we let the index regions recover for a little while long before retrying in the
+           * hopes they come up before the primary table finishes.
+           */
+          Collection<Pair<Mutation, byte[]>> indexUpdates = extractIndexUpdate(logEdit);
+          recoveryWriter.writeAndKillYourselfOnFailure(indexUpdates, true);
+      } finally {
+          long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+          if (duration >= slowPreWALRestoreThreshold) {
+              if (LOG.isDebugEnabled()) {
+                  LOG.debug(getCallTooSlowMessage("preWALRestore", duration, slowPreWALRestoreThreshold));
+              }
+              metricSource.incrementNumSlowPreWALRestoreCalls();
+          }
+          metricSource.updatePreWALRestoreTime(duration);
+      }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6dea0117/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java
new file mode 100644
index 0000000..e42fccc
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.metrics;
+
+import org.apache.hadoop.hbase.metrics.BaseSource;
+import org.apache.phoenix.hbase.index.Indexer;
+
+/**
+ * Interface for metrics about {@link Indexer}.
+ */
+public interface MetricsIndexerSource extends BaseSource {
+  // Metrics2 and JMX constants
+  String METRICS_NAME = "PhoenixIndexer";
+  String METRICS_CONTEXT = "phoenix";
+  String METRICS_DESCRIPTION = "Metrics about the Phoenix Indexer";
+  String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
+
+  String INDEX_PREPARE_TIME = "indexPrepareTime";
+  String INDEX_PREPARE_TIME_DESC = "Histogram for the time in milliseconds for preparing an index write";
+  String SLOW_INDEX_PREPARE = "slowIndexPrepareCalls";
+  String SLOW_INDEX_PREPARE_DESC = "The number of index preparations slower than the configured threshold";
+
+  String INDEX_WRITE_TIME = "indexWriteTime";
+  String INDEX_WRITE_TIME_DESC = "Histogram for the time in milliseconds for writing an index update";
+  String SLOW_INDEX_WRITE = "slowIndexWriteCalls";
+  String SLOW_INDEX_WRITE_DESC = "The number of index writes slower than the configured threshold";
+
+  String DUPLICATE_KEY_TIME = "duplicateKeyCheckTime";
+  String DUPLICATE_KEY_TIME_DESC = "Histogram for the time in milliseconds to handle ON DUPLICATE keywords";
+  String SLOW_DUPLICATE_KEY = "slowDuplicateKeyCheckCalls";
+  String SLOW_DUPLICATE_KEY_DESC = "The number of on duplicate key checks slower than the configured threshold";
+
+  String PRE_WAL_RESTORE_TIME = "preWALRestoreTime";
+  String PRE_WAL_RESTORE_TIME_DESC = "Histogram for the time in milliseconds for Indexer's preWALRestore";
+  String SLOW_PRE_WAL_RESTORE = "slowPreWALRestoreCalls";
+  String SLOW_PRE_WAL_RESTORE_DESC = "The number of preWALRestore calls slower than the configured threshold";
+
+  String POST_PUT_TIME = "postPutTime";
+  String POST_PUT_TIME_DESC = "Histogram for the time in milliseconds for Indexer's postPut";
+  String SLOW_POST_PUT = "slowPostPutCalls";
+  String SLOW_POST_PUT_DESC = "The number of postPut calls slower than the configured threshold";
+
+  String POST_DELETE_TIME = "postDeleteTime";
+  String POST_DELETE_TIME_DESC = "Histogram for the time in milliseconds for Indexer's postDelete";
+  String SLOW_POST_DELETE = "slowPostDeleteCalls";
+  String SLOW_POST_DELETE_DESC = "The number of postDelete calls slower than the configured threshold";
+
+  String POST_OPEN_TIME = "postOpenTime";
+  String POST_OPEN_TIME_DESC = "Histogram for the time in milliseconds for Indexer's postOpen";
+  String SLOW_POST_OPEN = "slowPostOpenCalls";
+  String SLOW_POST_OPEN_DESC = "The number of postOpen calls slower than the configured threshold";
+
+  /**
+   * Updates the index preparation time histogram (preBatchMutate).
+   *
+   * @param t time taken in milliseconds
+   */
+  void updateIndexPrepareTime(long t);
+
+  /**
+   * Increments the number of slow calls prepare an index write.
+   */
+  void incrementNumSlowIndexPrepareCalls();
+
+  /**
+   * Updates the index write time histogram (postBatchMutate).
+   *
+   * @param t time taken in milliseconds
+   */
+  void updateIndexWriteTime(long t);
+
+  /**
+   * Increments the number of slow calls to write to the index.
+   */
+  void incrementNumSlowIndexWriteCalls();
+
+  /**
+   * Updates the preWALRestore time histogram.
+   *
+   * @param t time taken in milliseconds
+   */
+  void updatePreWALRestoreTime(long t);
+
+  /**
+   * Increments the number of slow preWALRestore calls.
+   */
+  void incrementNumSlowPreWALRestoreCalls();
+
+  /**
+   * Updates the postPut time histogram.
+   *
+   * @param t time taken in milliseconds
+   */
+  void updatePostPutTime(long t);
+
+  /**
+   * Increments the number of slow postPut calls.
+   */
+  void incrementNumSlowPostPutCalls();
+
+  /**
+   * Updates the postDelete time histogram.
+   *
+   * @param t time taken in milliseconds
+   */
+  void updatePostDeleteTime(long t);
+
+  /**
+   * Increments the number of slow postDelete calls.
+   */
+  void incrementNumSlowPostDeleteCalls();
+
+  /**
+   * Updates the postOpen time histogram.
+   *
+   * @param t time taken in milliseconds
+   */
+  void updatePostOpenTime(long t);
+
+  /**
+   * Increments the number of slow postOpen calls.
+   */
+  void incrementNumSlowPostOpenCalls();
+
+  /**
+   * Updates the preIncrementAfterRowLock time histogram.
+   *
+   * @param t time taken in milliseconds
+   */
+  void updateDuplicateKeyCheckTime(long t);
+
+  /**
+   * Increments the number of slow preIncrementAfteRowLock calls.
+   */
+  void incrementSlowDuplicateKeyCheckCalls();
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6dea0117/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java
new file mode 100644
index 0000000..8d97f7b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.metrics;
+
+/**
+ * Factory class for creating {@link MetricsIndexerSource} instances.
+ */
+public class MetricsIndexerSourceFactory {
+  private static final MetricsIndexerSourceFactory INSTANCE = new MetricsIndexerSourceFactory();
+  private MetricsIndexerSource source;
+
+  private MetricsIndexerSourceFactory() {}
+
+  public static MetricsIndexerSourceFactory getInstance() {
+    return INSTANCE;
+  }
+
+  public synchronized MetricsIndexerSource create() {
+    if (INSTANCE.source == null) {
+      INSTANCE.source = new MetricsIndexerSourceImpl();
+    }
+    return INSTANCE.source;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6dea0117/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java
new file mode 100644
index 0000000..dd6ba5b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.metrics;
+
+import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
+import org.apache.hadoop.metrics2.MetricHistogram;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+
+/**
+ * Implementation for tracking Phoenix Indexer metrics.
+ */
+public class MetricsIndexerSourceImpl extends BaseSourceImpl implements MetricsIndexerSource {
+
+    private final MetricHistogram indexPrepareTimeHisto;
+    private final MutableCounterLong slowIndexPrepareCalls;
+    private final MetricHistogram indexWriteTimeHisto;
+    private final MutableCounterLong slowIndexWriteCalls;
+    private final MetricHistogram preWALRestoreTimeHisto;
+    private final MutableCounterLong slowPreWALRestoreCalls;
+    private final MetricHistogram postPutTimeHisto;
+    private final MutableCounterLong slowPostPutCalls;
+    private final MetricHistogram postDeleteTimeHisto;
+    private final MutableCounterLong slowPostDeleteCalls;
+    private final MetricHistogram postOpenTimeHisto;
+    private final MutableCounterLong slowPostOpenCalls;
+    private final MetricHistogram duplicateKeyTimeHisto;
+    private final MutableCounterLong slowDuplicateKeyCalls;
+
+    public MetricsIndexerSourceImpl() {
+        this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
+    }
+
+    public MetricsIndexerSourceImpl(String metricsName, String metricsDescription,
+        String metricsContext, String metricsJmxContext) {
+        super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
+
+        indexPrepareTimeHisto = getMetricsRegistry().newHistogram(INDEX_PREPARE_TIME, INDEX_PREPARE_TIME_DESC);
+        slowIndexPrepareCalls = getMetricsRegistry().newCounter(SLOW_INDEX_PREPARE, SLOW_INDEX_PREPARE_DESC, 0L);
+        indexWriteTimeHisto = getMetricsRegistry().newHistogram(INDEX_WRITE_TIME, INDEX_WRITE_TIME_DESC);
+        slowIndexWriteCalls = getMetricsRegistry().newCounter(SLOW_INDEX_WRITE, SLOW_INDEX_WRITE_DESC, 0L);
+        preWALRestoreTimeHisto = getMetricsRegistry().newHistogram(PRE_WAL_RESTORE_TIME, PRE_WAL_RESTORE_TIME_DESC);
+        slowPreWALRestoreCalls = getMetricsRegistry().newCounter(SLOW_PRE_WAL_RESTORE, SLOW_PRE_WAL_RESTORE_DESC, 0L);
+        postPutTimeHisto = getMetricsRegistry().newHistogram(POST_PUT_TIME, POST_PUT_TIME_DESC);
+        slowPostPutCalls = getMetricsRegistry().newCounter(SLOW_POST_PUT, SLOW_POST_PUT_DESC, 0L);
+        postDeleteTimeHisto = getMetricsRegistry().newHistogram(POST_DELETE_TIME, POST_DELETE_TIME_DESC);
+        slowPostDeleteCalls = getMetricsRegistry().newCounter(SLOW_POST_DELETE, SLOW_POST_DELETE_DESC, 0L);
+        postOpenTimeHisto = getMetricsRegistry().newHistogram(POST_OPEN_TIME, POST_OPEN_TIME_DESC);
+        slowPostOpenCalls = getMetricsRegistry().newCounter(SLOW_POST_OPEN, SLOW_POST_OPEN_DESC, 0L);
+        duplicateKeyTimeHisto = getMetricsRegistry().newHistogram(DUPLICATE_KEY_TIME, DUPLICATE_KEY_TIME_DESC);
+        slowDuplicateKeyCalls = getMetricsRegistry().newCounter(SLOW_DUPLICATE_KEY, SLOW_DUPLICATE_KEY_DESC, 0L);
+    }
+
+    @Override
+    public void updateIndexPrepareTime(long t) {
+        indexPrepareTimeHisto.add(t);
+    }
+
+    @Override
+    public void updateIndexWriteTime(long t) {
+        indexWriteTimeHisto.add(t);
+    }
+
+    @Override
+    public void updatePreWALRestoreTime(long t) {
+        preWALRestoreTimeHisto.add(t);
+    }
+
+    @Override
+    public void updatePostPutTime(long t) {
+        postPutTimeHisto.add(t);
+    }
+
+    @Override
+    public void updatePostDeleteTime(long t) {
+        postDeleteTimeHisto.add(t);
+    }
+
+    @Override
+    public void updatePostOpenTime(long t) {
+        postOpenTimeHisto.add(t);
+    }
+
+    @Override
+    public void incrementNumSlowIndexPrepareCalls() {
+        slowIndexPrepareCalls.incr();
+    }
+
+    @Override
+    public void incrementNumSlowIndexWriteCalls() {
+        slowIndexWriteCalls.incr();
+    }
+
+    @Override
+    public void incrementNumSlowPreWALRestoreCalls() {
+        slowPreWALRestoreCalls.incr();
+    }
+
+    @Override
+    public void incrementNumSlowPostPutCalls() {
+        slowPostPutCalls.incr();
+    }
+
+    @Override
+    public void incrementNumSlowPostDeleteCalls() {
+        slowPostDeleteCalls.incr();
+    }
+
+    @Override
+    public void incrementNumSlowPostOpenCalls() {
+        slowPostOpenCalls.incr();
+    }
+
+    @Override
+    public void updateDuplicateKeyCheckTime(long t) {
+        duplicateKeyTimeHisto.add(t);
+    }
+
+    @Override
+    public void incrementSlowDuplicateKeyCheckCalls() {
+        slowDuplicateKeyCalls.incr();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6dea0117/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 92999ba..5957138 100644
--- a/pom.xml
+++ b/pom.xml
@@ -664,7 +664,6 @@
         <groupId>org.apache.hbase</groupId>
         <artifactId>hbase-hadoop2-compat</artifactId>
         <version>${hbase.version}</version>
-        <scope>test</scope>
       </dependency>
       <dependency>
         <groupId>org.apache.hbase</groupId>