You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sk...@apache.org on 2019/07/17 23:27:22 UTC

[phoenix] branch 4.x-HBase-1.4 updated: PHOENIX-5383: Metrics for the IndexRegionObserver coprocessor

This is an automated email from the ASF dual-hosted git repository.

skadam pushed a commit to branch 4.x-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.4 by this push:
     new fa287f1  PHOENIX-5383: Metrics for the IndexRegionObserver coprocessor
fa287f1 is described below

commit fa287f135ce81508182dc6b7eca7ab49bfbeff8a
Author: Priyank Porwal <pp...@salesforce.com>
AuthorDate: Fri Jul 12 09:41:33 2019 -0700

    PHOENIX-5383: Metrics for the IndexRegionObserver coprocessor
    
    Signed-off-by: s.kadam <sk...@apache.org>
---
 .../phoenix/hbase/index/IndexRegionObserver.java   | 110 +++------------------
 .../org/apache/phoenix/hbase/index/Indexer.java    |   2 +-
 .../hbase/index/metrics/MetricsIndexerSource.java  |  54 ++++++++++
 .../index/metrics/MetricsIndexerSourceFactory.java |  10 +-
 .../index/metrics/MetricsIndexerSourceImpl.java    |  50 ++++++++++
 5 files changed, 125 insertions(+), 101 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 516a088..7c37c7a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -66,27 +66,20 @@ import org.apache.phoenix.hbase.index.builder.IndexBuilder;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
-import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
-import org.apache.phoenix.hbase.index.write.IndexFailurePolicy;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.hbase.index.write.LazyParallelWriterIndexCommitter;
-import org.apache.phoenix.hbase.index.write.RecoveryIndexWriter;
-import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache;
-import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexMetaData;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.ServerUtil.ConnectionType;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
 
 /**
  * Do all the work of managing index updates from a single coprocessor. All Puts/Delets are passed
@@ -208,20 +201,11 @@ public class IndexRegionObserver extends BaseRegionObserver {
   // The collection of pending data table rows
   private Map<ImmutableBytesPtr, PendingRow> pendingRows = new ConcurrentHashMap<>();
 
-  /**
-   * cache the failed updates to the various regions. Used for making the WAL recovery mechanisms
-   * more robust in the face of recoverying index regions that were on the same server as the
-   * primary table region
-   */
-  private PerRegionIndexWriteCache failedIndexEdits = new PerRegionIndexWriteCache();
-
   private MetricsIndexerSource metricSource;
 
   private boolean stopped;
   private boolean disabled;
-  private long slowIndexWriteThreshold;
   private long slowIndexPrepareThreshold;
-  private long slowPostOpenThreshold;
   private long slowPreIncrementThreshold;
   private int rowLockWaitDuration;
 
@@ -259,7 +243,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
           this.lockManager = new LockManager();
 
           // Metrics impl for the Indexer -- avoiding unnecessary indirection for hadoop-1/2 compat
-          this.metricSource = MetricsIndexerSourceFactory.getInstance().create();
+          this.metricSource = MetricsIndexerSourceFactory.getInstance().getIndexerSource();
           setSlowThresholds(e.getConfiguration());
       } catch (NoSuchMethodError ex) {
           disabled = true;
@@ -274,10 +258,6 @@ public class IndexRegionObserver extends BaseRegionObserver {
   private void setSlowThresholds(Configuration c) {
       slowIndexPrepareThreshold = c.getLong(INDEXER_INDEX_WRITE_SLOW_THRESHOLD_KEY,
           INDEXER_INDEX_WRITE_SLOW_THRESHOLD_DEFAULT);
-      slowIndexWriteThreshold = c.getLong(INDEXER_INDEX_PREPARE_SLOW_THRESHOLD_KEY,
-          INDEXER_INDEX_PREPARE_SLOW_THREHSOLD_DEFAULT);
-      slowPostOpenThreshold = c.getLong(INDEXER_POST_OPEN_SLOW_THRESHOLD_KEY,
-          INDEXER_POST_OPEN_SLOW_THRESHOLD_DEFAULT);
       slowPreIncrementThreshold = c.getLong(INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_KEY,
           INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_DEFAULT);
   }
@@ -356,21 +336,11 @@ public class IndexRegionObserver extends BaseRegionObserver {
       if (this.disabled) {
           return;
       }
-      long start = EnvironmentEdgeManager.currentTimeMillis();
       try {
           preBatchMutateWithExceptions(c, miniBatchOp);
           return;
       } catch (Throwable t) {
           rethrowIndexingException(t);
-      } finally {
-          long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
-          if (duration >= slowIndexPrepareThreshold) {
-              if (LOG.isDebugEnabled()) {
-                  LOG.debug(getCallTooSlowMessage("preBatchMutate", duration, slowIndexPrepareThreshold));
-              }
-              metricSource.incrementNumSlowIndexPrepareCalls();
-          }
-          metricSource.updateIndexPrepareTime(duration);
       }
       throw new RuntimeException(
         "Somehow didn't return an index update but also didn't propagate the failure to the client!");
@@ -535,20 +505,11 @@ public class IndexRegionObserver extends BaseRegionObserver {
           if (current == null) {
               current = NullSpan.INSTANCE;
           }
-          long start = EnvironmentEdgeManager.currentTimeMillis();
 
           // get the index updates for all elements in this batch
           Collection<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdates =
                   this.builder.getIndexUpdates(miniBatchOp, mutations);
 
-          long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
-          if (duration >= slowIndexPrepareThreshold) {
-              if (LOG.isDebugEnabled()) {
-                  LOG.debug(getCallTooSlowMessage("indexPrepare", duration, slowIndexPrepareThreshold));
-              }
-              metricSource.incrementNumSlowIndexPrepareCalls();
-          }
-          metricSource.updateIndexPrepareTime(duration);
           current.addTimelineAnnotation("Built index updates, doing preStep");
           TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
           byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName();
@@ -651,7 +612,11 @@ public class IndexRegionObserver extends BaseRegionObserver {
       if (mutations == null) {
           return;
       }
+
+      long start = EnvironmentEdgeManager.currentTimeMillis();
       prepareIndexMutations(c, miniBatchOp, context, mutations);
+      metricSource.updateIndexPrepareTime(EnvironmentEdgeManager.currentTimeMillis() - start);
+
       // Sleep for one millisecond if we have prepared the index updates in less than 1 ms. The sleep is necessary to
       // get different timestamps for concurrent batches that share common rows. It is very rare that the index updates
       // can be prepared in less than one millisecond
@@ -717,7 +682,6 @@ public class IndexRegionObserver extends BaseRegionObserver {
       if (this.disabled) {
           return;
       }
-      long start = EnvironmentEdgeManager.currentTimeMillis();
       BatchMutateContext context = getBatchMutateContext(c);
       if (context == null) {
           return;
@@ -735,22 +699,19 @@ public class IndexRegionObserver extends BaseRegionObserver {
           }
        } finally {
            removeBatchMutateContext(c);
-           long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
-           if (duration >= slowIndexWriteThreshold) {
-               if (LOG.isDebugEnabled()) {
-                   LOG.debug(getCallTooSlowMessage("postBatchMutateIndispensably", duration, slowIndexWriteThreshold));
-               }
-               metricSource.incrementNumSlowIndexWriteCalls();
-           }
-           metricSource.updateIndexWriteTime(duration);
        }
   }
 
   private void doPost(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) throws IOException {
+      long start = EnvironmentEdgeManager.currentTimeMillis();
+
       try {
           doIndexWritesWithExceptions(context, true);
+          metricSource.updatePostIndexUpdateTime(EnvironmentEdgeManager.currentTimeMillis() - start);
           return;
       } catch (Throwable e) {
+          metricSource.updatePostIndexUpdateFailureTime(EnvironmentEdgeManager.currentTimeMillis() - start);
+          metricSource.incrementPostIndexUpdateFailures();
           rethrowIndexingException(e);
       }
       throw new RuntimeException(
@@ -772,21 +733,12 @@ public class IndexRegionObserver extends BaseRegionObserver {
           if (current == null) {
               current = NullSpan.INSTANCE;
           }
-          long start = EnvironmentEdgeManager.currentTimeMillis();
           current.addTimelineAnnotation("Actually doing " + (post ? "post" : "pre") + " index update for first time");
           if (post) {
               postWriter.writeAndHandleFailure(indexUpdates, false, context.clientVersion);
           } else {
               preWriter.writeAndHandleFailure(indexUpdates, false, context.clientVersion);
           }
-          long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
-          if (duration >= slowIndexWriteThreshold) {
-              if (LOG.isDebugEnabled()) {
-                  LOG.debug(getCallTooSlowMessage("indexWrite", duration, slowIndexWriteThreshold));
-              }
-              metricSource.incrementNumSlowIndexWriteCalls();
-          }
-          metricSource.updateIndexWriteTime(duration);
       }
   }
 
@@ -805,10 +757,15 @@ public class IndexRegionObserver extends BaseRegionObserver {
 
   private void doPre(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context,
                      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+      long start = EnvironmentEdgeManager.currentTimeMillis();
+
       try {
           doIndexWritesWithExceptions(context, false);
+          metricSource.updatePreIndexUpdateTime(EnvironmentEdgeManager.currentTimeMillis() - start);
           return;
       } catch (Throwable e) {
+          metricSource.updatePreIndexUpdateFailureTime(EnvironmentEdgeManager.currentTimeMillis() - start);
+          metricSource.incrementPreIndexUpdateFailures();
           removePendingRows(context);
           rethrowIndexingException(e);
       }
@@ -816,43 +773,6 @@ public class IndexRegionObserver extends BaseRegionObserver {
               "Somehow didn't complete the index update, but didn't return succesfully either!");
   }
 
-  @Override
-  public void postOpen(final ObserverContext<RegionCoprocessorEnvironment> c) {
-    Multimap<HTableInterfaceReference, Mutation> updates = failedIndexEdits.getEdits(c.getEnvironment().getRegion());
-
-    if (this.disabled) {
-        return;
-    }
-
-    long start = EnvironmentEdgeManager.currentTimeMillis();
-    try {
-        //if we have no pending edits to complete, then we are done
-        if (updates == null || updates.size() == 0) {
-          return;
-        }
-
-        LOG.info("Found some outstanding index updates that didn't succeed during"
-                + " WAL replay - attempting to replay now.");
-
-        // do the usual preWriter stuff
-        try {
-            preWriter.writeAndHandleFailure(updates, true, ScanUtil.UNKNOWN_CLIENT_VERSION);
-        } catch (IOException e) {
-                LOG.error("During WAL replay of outstanding index updates, "
-                        + "Exception is thrown instead of killing server during index writing", e);
-        }
-    } finally {
-         long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
-         if (duration >= slowPostOpenThreshold) {
-             if (LOG.isDebugEnabled()) {
-                 LOG.debug(getCallTooSlowMessage("postOpen", duration, slowPostOpenThreshold));
-             }
-             metricSource.incrementNumSlowPostOpenCalls();
-         }
-         metricSource.updatePostOpenTime(duration);
-    }
-  }
-
   /**
    * Exposed for testing!
    * @return the currently instantiated index builder
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 66c4594..ff8b555 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -222,7 +222,7 @@ public class Indexer extends BaseRegionObserver {
         this.lockManager = new LockManager();
 
         // Metrics impl for the Indexer -- avoiding unnecessary indirection for hadoop-1/2 compat
-        this.metricSource = MetricsIndexerSourceFactory.getInstance().create();
+        this.metricSource = MetricsIndexerSourceFactory.getInstance().getIndexerSource();
         setSlowThresholds(e.getConfiguration());
 
         try {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java
index e42fccc..19df74c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java
@@ -64,6 +64,21 @@ public interface MetricsIndexerSource extends BaseSource {
   String SLOW_POST_OPEN = "slowPostOpenCalls";
   String SLOW_POST_OPEN_DESC = "The number of postOpen calls slower than the configured threshold";
 
+  String PRE_INDEX_UPDATE_TIME = "preIndexUpdateTime";
+  String PRE_INDEX_UPDATE_TIME_DESC = "Histogram for the time in milliseconds for index updates pre data updates";
+  String POST_INDEX_UPDATE_TIME = "postIndexUpdateTime";
+  String POST_INDEX_UPDATE_TIME_DESC = "Histogram for the time in milliseconds for index updates post data updates";
+
+  String PRE_INDEX_UPDATE_FAILURE_TIME = "preIndexUpdateFailureTime";
+  String PRE_INDEX_UPDATE_FAILURE_TIME_DESC = "Histogram for the time in milliseconds on failures of index updates pre data updates";
+  String POST_INDEX_UPDATE_FAILURE_TIME = "postIndexUpdateFailureTime";
+  String POST_INDEX_UPDATE_FAILURE_TIME_DESC = "Histogram for the time in milliseconds on failures of index updates post data updates";
+
+  String PRE_INDEX_UPDATE_FAILURE = "preIndexUpdateFailure";
+  String PRE_INDEX_UPDATE_FAILURE_DESC = "The number of failures of index updates pre data updates";
+  String POST_INDEX_UPDATE_FAILURE = "postIndexUpdateFailure";
+  String POST_INDEX_UPDATE_FAILURE_DESC = "The number of failures of index updates post data updates";
+
   /**
    * Updates the index preparation time histogram (preBatchMutate).
    *
@@ -147,4 +162,43 @@ public interface MetricsIndexerSource extends BaseSource {
    * Increments the number of slow preIncrementAfteRowLock calls.
    */
   void incrementSlowDuplicateKeyCheckCalls();
+
+  // The metrics below were introduced by the IndexRegionObserver coprocessor.
+  /**
+   * Updates the pre index update time histogram.
+   *
+   * @param t time taken in milliseconds
+   */
+  void updatePreIndexUpdateTime(long t);
+
+  /**
+   * Updates the post index update time histogram.
+   *
+   * @param t time taken in milliseconds
+   */
+  void updatePostIndexUpdateTime(long t);
+
+  /**
+   * Updates the pre index update failure time histogram.
+   *
+   * @param t time taken in milliseconds
+   */
+  void updatePreIndexUpdateFailureTime(long t);
+
+  /**
+   * Updates the post index update failure time histogram.
+   *
+   * @param t time taken in milliseconds
+   */
+  void updatePostIndexUpdateFailureTime(long t);
+
+  /**
+   * Increments the number of pre index update failures.
+   */
+  void incrementPreIndexUpdateFailures();
+
+  /**
+   * Increments the number of post index update failures.
+   */
+  void incrementPostIndexUpdateFailures();
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java
index 8d97f7b..e373e2f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java
@@ -21,7 +21,7 @@ package org.apache.phoenix.hbase.index.metrics;
  */
 public class MetricsIndexerSourceFactory {
   private static final MetricsIndexerSourceFactory INSTANCE = new MetricsIndexerSourceFactory();
-  private MetricsIndexerSource source;
+  private MetricsIndexerSource indexerSource;
 
   private MetricsIndexerSourceFactory() {}
 
@@ -29,10 +29,10 @@ public class MetricsIndexerSourceFactory {
     return INSTANCE;
   }
 
-  public synchronized MetricsIndexerSource create() {
-    if (INSTANCE.source == null) {
-      INSTANCE.source = new MetricsIndexerSourceImpl();
+  public synchronized MetricsIndexerSource getIndexerSource() {
+    if (INSTANCE.indexerSource == null) {
+      INSTANCE.indexerSource = new MetricsIndexerSourceImpl();
     }
-    return INSTANCE.source;
+    return INSTANCE.indexerSource;
   }
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java
index cc82bb2..9bcf9fd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java
@@ -40,6 +40,13 @@ public class MetricsIndexerSourceImpl extends BaseSourceImpl implements MetricsI
     private final MetricHistogram duplicateKeyTimeHisto;
     private final MutableFastCounter slowDuplicateKeyCalls;
 
+    private final MetricHistogram preIndexUpdateTimeHisto;
+    private final MetricHistogram postIndexUpdateTimeHisto;
+    private final MetricHistogram preIndexUpdateFailureTimeHisto;
+    private final MetricHistogram postIndexUpdateFailureTimeHisto;
+    private final MutableFastCounter preIndexUpdateFailures;
+    private final MutableFastCounter postIndexUpdateFailures;
+
     public MetricsIndexerSourceImpl() {
         this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
     }
@@ -62,6 +69,19 @@ public class MetricsIndexerSourceImpl extends BaseSourceImpl implements MetricsI
         slowPostOpenCalls = getMetricsRegistry().newCounter(SLOW_POST_OPEN, SLOW_POST_OPEN_DESC, 0L);
         duplicateKeyTimeHisto = getMetricsRegistry().newHistogram(DUPLICATE_KEY_TIME, DUPLICATE_KEY_TIME_DESC);
         slowDuplicateKeyCalls = getMetricsRegistry().newCounter(SLOW_DUPLICATE_KEY, SLOW_DUPLICATE_KEY_DESC, 0L);
+
+        postIndexUpdateTimeHisto = getMetricsRegistry().newHistogram(
+                POST_INDEX_UPDATE_TIME, POST_INDEX_UPDATE_TIME_DESC);
+        preIndexUpdateTimeHisto = getMetricsRegistry().newHistogram(
+                PRE_INDEX_UPDATE_TIME, PRE_INDEX_UPDATE_TIME_DESC);
+        postIndexUpdateFailureTimeHisto = getMetricsRegistry().newHistogram(
+                POST_INDEX_UPDATE_FAILURE_TIME, POST_INDEX_UPDATE_FAILURE_TIME_DESC);
+        preIndexUpdateFailureTimeHisto = getMetricsRegistry().newHistogram(
+                PRE_INDEX_UPDATE_FAILURE_TIME, PRE_INDEX_UPDATE_FAILURE_TIME_DESC);
+        postIndexUpdateFailures = getMetricsRegistry().newCounter(
+                POST_INDEX_UPDATE_FAILURE, POST_INDEX_UPDATE_FAILURE_DESC, 0L);
+        preIndexUpdateFailures = getMetricsRegistry().newCounter(
+                PRE_INDEX_UPDATE_FAILURE, PRE_INDEX_UPDATE_FAILURE_DESC, 0L);
     }
 
     @Override
@@ -133,4 +153,34 @@ public class MetricsIndexerSourceImpl extends BaseSourceImpl implements MetricsI
     public void incrementSlowDuplicateKeyCheckCalls() {
         slowDuplicateKeyCalls.incr();
     }
+
+    @Override
+    public void updatePreIndexUpdateTime(long t) {
+        preIndexUpdateTimeHisto.add(t);
+    }
+
+    @Override
+    public void updatePostIndexUpdateTime(long t) {
+        postIndexUpdateTimeHisto.add(t);
+    }
+
+    @Override
+    public void updatePreIndexUpdateFailureTime(long t) {
+        preIndexUpdateFailureTimeHisto.add(t);
+    }
+
+    @Override
+    public void updatePostIndexUpdateFailureTime(long t) {
+        postIndexUpdateFailureTimeHisto.add(t);
+    }
+
+    @Override
+    public void incrementPreIndexUpdateFailures() {
+        preIndexUpdateFailures.incr();
+    }
+
+    @Override
+    public void incrementPostIndexUpdateFailures() {
+        postIndexUpdateFailures.incr();
+    }
 }
\ No newline at end of file