You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sk...@apache.org on 2019/07/17 23:32:33 UTC
[phoenix] branch 4.14-HBase-1.3 updated: PHOENIX-5383: Metrics for the IndexRegionObserver coprocessor
This is an automated email from the ASF dual-hosted git repository.
skadam pushed a commit to branch 4.14-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.14-HBase-1.3 by this push:
new e027095 PHOENIX-5383: Metrics for the IndexRegionObserver coprocessor
e027095 is described below
commit e0270955d04c80367c9d34ce408e15f1bdf99c9c
Author: Priyank Porwal <pp...@salesforce.com>
AuthorDate: Fri Jul 12 09:41:33 2019 -0700
PHOENIX-5383: Metrics for the IndexRegionObserver coprocessor
Signed-off-by: s.kadam <sk...@apache.org>
---
.../phoenix/hbase/index/IndexRegionObserver.java | 110 +++------------------
.../org/apache/phoenix/hbase/index/Indexer.java | 2 +-
.../hbase/index/metrics/MetricsIndexerSource.java | 54 ++++++++++
.../index/metrics/MetricsIndexerSourceFactory.java | 10 +-
.../index/metrics/MetricsIndexerSourceImpl.java | 50 ++++++++++
5 files changed, 125 insertions(+), 101 deletions(-)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 516a088..7c37c7a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -66,27 +66,20 @@ import org.apache.phoenix.hbase.index.builder.IndexBuilder;
import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource;
import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
-import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
-import org.apache.phoenix.hbase.index.write.IndexFailurePolicy;
import org.apache.phoenix.hbase.index.write.IndexWriter;
import org.apache.phoenix.hbase.index.write.LazyParallelWriterIndexCommitter;
-import org.apache.phoenix.hbase.index.write.RecoveryIndexWriter;
-import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache;
-import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexMetaData;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.trace.TracingUtils;
import org.apache.phoenix.trace.util.NullSpan;
import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.ServerUtil.ConnectionType;
import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
/**
* Do all the work of managing index updates from a single coprocessor. All Puts/Delets are passed
@@ -208,20 +201,11 @@ public class IndexRegionObserver extends BaseRegionObserver {
// The collection of pending data table rows
private Map<ImmutableBytesPtr, PendingRow> pendingRows = new ConcurrentHashMap<>();
- /**
- * cache the failed updates to the various regions. Used for making the WAL recovery mechanisms
- * more robust in the face of recoverying index regions that were on the same server as the
- * primary table region
- */
- private PerRegionIndexWriteCache failedIndexEdits = new PerRegionIndexWriteCache();
-
private MetricsIndexerSource metricSource;
private boolean stopped;
private boolean disabled;
- private long slowIndexWriteThreshold;
private long slowIndexPrepareThreshold;
- private long slowPostOpenThreshold;
private long slowPreIncrementThreshold;
private int rowLockWaitDuration;
@@ -259,7 +243,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
this.lockManager = new LockManager();
// Metrics impl for the Indexer -- avoiding unnecessary indirection for hadoop-1/2 compat
- this.metricSource = MetricsIndexerSourceFactory.getInstance().create();
+ this.metricSource = MetricsIndexerSourceFactory.getInstance().getIndexerSource();
setSlowThresholds(e.getConfiguration());
} catch (NoSuchMethodError ex) {
disabled = true;
@@ -274,10 +258,6 @@ public class IndexRegionObserver extends BaseRegionObserver {
private void setSlowThresholds(Configuration c) {
slowIndexPrepareThreshold = c.getLong(INDEXER_INDEX_WRITE_SLOW_THRESHOLD_KEY,
INDEXER_INDEX_WRITE_SLOW_THRESHOLD_DEFAULT);
- slowIndexWriteThreshold = c.getLong(INDEXER_INDEX_PREPARE_SLOW_THRESHOLD_KEY,
- INDEXER_INDEX_PREPARE_SLOW_THREHSOLD_DEFAULT);
- slowPostOpenThreshold = c.getLong(INDEXER_POST_OPEN_SLOW_THRESHOLD_KEY,
- INDEXER_POST_OPEN_SLOW_THRESHOLD_DEFAULT);
slowPreIncrementThreshold = c.getLong(INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_KEY,
INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_DEFAULT);
}
@@ -356,21 +336,11 @@ public class IndexRegionObserver extends BaseRegionObserver {
if (this.disabled) {
return;
}
- long start = EnvironmentEdgeManager.currentTimeMillis();
try {
preBatchMutateWithExceptions(c, miniBatchOp);
return;
} catch (Throwable t) {
rethrowIndexingException(t);
- } finally {
- long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
- if (duration >= slowIndexPrepareThreshold) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(getCallTooSlowMessage("preBatchMutate", duration, slowIndexPrepareThreshold));
- }
- metricSource.incrementNumSlowIndexPrepareCalls();
- }
- metricSource.updateIndexPrepareTime(duration);
}
throw new RuntimeException(
"Somehow didn't return an index update but also didn't propagate the failure to the client!");
@@ -535,20 +505,11 @@ public class IndexRegionObserver extends BaseRegionObserver {
if (current == null) {
current = NullSpan.INSTANCE;
}
- long start = EnvironmentEdgeManager.currentTimeMillis();
// get the index updates for all elements in this batch
Collection<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdates =
this.builder.getIndexUpdates(miniBatchOp, mutations);
- long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
- if (duration >= slowIndexPrepareThreshold) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(getCallTooSlowMessage("indexPrepare", duration, slowIndexPrepareThreshold));
- }
- metricSource.incrementNumSlowIndexPrepareCalls();
- }
- metricSource.updateIndexPrepareTime(duration);
current.addTimelineAnnotation("Built index updates, doing preStep");
TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName();
@@ -651,7 +612,11 @@ public class IndexRegionObserver extends BaseRegionObserver {
if (mutations == null) {
return;
}
+
+ long start = EnvironmentEdgeManager.currentTimeMillis();
prepareIndexMutations(c, miniBatchOp, context, mutations);
+ metricSource.updateIndexPrepareTime(EnvironmentEdgeManager.currentTimeMillis() - start);
+
// Sleep for one millisecond if we have prepared the index updates in less than 1 ms. The sleep is necessary to
// get different timestamps for concurrent batches that share common rows. It is very rare that the index updates
// can be prepared in less than one millisecond
@@ -717,7 +682,6 @@ public class IndexRegionObserver extends BaseRegionObserver {
if (this.disabled) {
return;
}
- long start = EnvironmentEdgeManager.currentTimeMillis();
BatchMutateContext context = getBatchMutateContext(c);
if (context == null) {
return;
@@ -735,22 +699,19 @@ public class IndexRegionObserver extends BaseRegionObserver {
}
} finally {
removeBatchMutateContext(c);
- long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
- if (duration >= slowIndexWriteThreshold) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(getCallTooSlowMessage("postBatchMutateIndispensably", duration, slowIndexWriteThreshold));
- }
- metricSource.incrementNumSlowIndexWriteCalls();
- }
- metricSource.updateIndexWriteTime(duration);
}
}
private void doPost(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) throws IOException {
+ long start = EnvironmentEdgeManager.currentTimeMillis();
+
try {
doIndexWritesWithExceptions(context, true);
+ metricSource.updatePostIndexUpdateTime(EnvironmentEdgeManager.currentTimeMillis() - start);
return;
} catch (Throwable e) {
+ metricSource.updatePostIndexUpdateFailureTime(EnvironmentEdgeManager.currentTimeMillis() - start);
+ metricSource.incrementPostIndexUpdateFailures();
rethrowIndexingException(e);
}
throw new RuntimeException(
@@ -772,21 +733,12 @@ public class IndexRegionObserver extends BaseRegionObserver {
if (current == null) {
current = NullSpan.INSTANCE;
}
- long start = EnvironmentEdgeManager.currentTimeMillis();
current.addTimelineAnnotation("Actually doing " + (post ? "post" : "pre") + " index update for first time");
if (post) {
postWriter.writeAndHandleFailure(indexUpdates, false, context.clientVersion);
} else {
preWriter.writeAndHandleFailure(indexUpdates, false, context.clientVersion);
}
- long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
- if (duration >= slowIndexWriteThreshold) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(getCallTooSlowMessage("indexWrite", duration, slowIndexWriteThreshold));
- }
- metricSource.incrementNumSlowIndexWriteCalls();
- }
- metricSource.updateIndexWriteTime(duration);
}
}
@@ -805,10 +757,15 @@ public class IndexRegionObserver extends BaseRegionObserver {
private void doPre(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+ long start = EnvironmentEdgeManager.currentTimeMillis();
+
try {
doIndexWritesWithExceptions(context, false);
+ metricSource.updatePreIndexUpdateTime(EnvironmentEdgeManager.currentTimeMillis() - start);
return;
} catch (Throwable e) {
+ metricSource.updatePreIndexUpdateFailureTime(EnvironmentEdgeManager.currentTimeMillis() - start);
+ metricSource.incrementPreIndexUpdateFailures();
removePendingRows(context);
rethrowIndexingException(e);
}
@@ -816,43 +773,6 @@ public class IndexRegionObserver extends BaseRegionObserver {
"Somehow didn't complete the index update, but didn't return succesfully either!");
}
- @Override
- public void postOpen(final ObserverContext<RegionCoprocessorEnvironment> c) {
- Multimap<HTableInterfaceReference, Mutation> updates = failedIndexEdits.getEdits(c.getEnvironment().getRegion());
-
- if (this.disabled) {
- return;
- }
-
- long start = EnvironmentEdgeManager.currentTimeMillis();
- try {
- //if we have no pending edits to complete, then we are done
- if (updates == null || updates.size() == 0) {
- return;
- }
-
- LOG.info("Found some outstanding index updates that didn't succeed during"
- + " WAL replay - attempting to replay now.");
-
- // do the usual preWriter stuff
- try {
- preWriter.writeAndHandleFailure(updates, true, ScanUtil.UNKNOWN_CLIENT_VERSION);
- } catch (IOException e) {
- LOG.error("During WAL replay of outstanding index updates, "
- + "Exception is thrown instead of killing server during index writing", e);
- }
- } finally {
- long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
- if (duration >= slowPostOpenThreshold) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(getCallTooSlowMessage("postOpen", duration, slowPostOpenThreshold));
- }
- metricSource.incrementNumSlowPostOpenCalls();
- }
- metricSource.updatePostOpenTime(duration);
- }
- }
-
/**
* Exposed for testing!
* @return the currently instantiated index builder
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 1c036ac..05baff4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -222,7 +222,7 @@ public class Indexer extends BaseRegionObserver {
this.lockManager = new LockManager();
// Metrics impl for the Indexer -- avoiding unnecessary indirection for hadoop-1/2 compat
- this.metricSource = MetricsIndexerSourceFactory.getInstance().create();
+ this.metricSource = MetricsIndexerSourceFactory.getInstance().getIndexerSource();
setSlowThresholds(e.getConfiguration());
try {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java
index e42fccc..19df74c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java
@@ -64,6 +64,21 @@ public interface MetricsIndexerSource extends BaseSource {
String SLOW_POST_OPEN = "slowPostOpenCalls";
String SLOW_POST_OPEN_DESC = "The number of postOpen calls slower than the configured threshold";
+ String PRE_INDEX_UPDATE_TIME = "preIndexUpdateTime";
+ String PRE_INDEX_UPDATE_TIME_DESC = "Histogram for the time in milliseconds for index updates pre data updates";
+ String POST_INDEX_UPDATE_TIME = "postIndexUpdateTime";
+ String POST_INDEX_UPDATE_TIME_DESC = "Histogram for the time in milliseconds for index updates post data updates";
+
+ String PRE_INDEX_UPDATE_FAILURE_TIME = "preIndexUpdateFailureTime";
+ String PRE_INDEX_UPDATE_FAILURE_TIME_DESC = "Histogram for the time in milliseconds on failures of index updates pre data updates";
+ String POST_INDEX_UPDATE_FAILURE_TIME = "postIndexUpdateFailureTime";
+ String POST_INDEX_UPDATE_FAILURE_TIME_DESC = "Histogram for the time in milliseconds on failures of index updates post data updates";
+
+ String PRE_INDEX_UPDATE_FAILURE = "preIndexUpdateFailure";
+ String PRE_INDEX_UPDATE_FAILURE_DESC = "The number of failures of index updates pre data updates";
+ String POST_INDEX_UPDATE_FAILURE = "postIndexUpdateFailure";
+ String POST_INDEX_UPDATE_FAILURE_DESC = "The number of failures of index updates post data updates";
+
/**
* Updates the index preparation time histogram (preBatchMutate).
*
@@ -147,4 +162,43 @@ public interface MetricsIndexerSource extends BaseSource {
* Increments the number of slow preIncrementAfteRowLock calls.
*/
void incrementSlowDuplicateKeyCheckCalls();
+
+ // Below metrics are introduced by IndexRegionObserver coprocessor
+ /**
+ * Updates the pre index update time histogram.
+ *
+ * @param t time taken in milliseconds
+ */
+ void updatePreIndexUpdateTime(long t);
+
+ /**
+ * Updates the post index update time histogram.
+ *
+ * @param t time taken in milliseconds
+ */
+ void updatePostIndexUpdateTime(long t);
+
+ /**
+ * Updates the pre index update failure time histogram.
+ *
+ * @param t time taken in milliseconds
+ */
+ void updatePreIndexUpdateFailureTime(long t);
+
+ /**
+ * Updates the post index update failure time histogram.
+ *
+ * @param t time taken in milliseconds
+ */
+ void updatePostIndexUpdateFailureTime(long t);
+
+ /**
+ * Increments the number of pre index update failures.
+ */
+ void incrementPreIndexUpdateFailures();
+
+ /**
+ * Increments the number of post index update failures.
+ */
+ void incrementPostIndexUpdateFailures();
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java
index 8d97f7b..e373e2f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java
@@ -21,7 +21,7 @@ package org.apache.phoenix.hbase.index.metrics;
*/
public class MetricsIndexerSourceFactory {
private static final MetricsIndexerSourceFactory INSTANCE = new MetricsIndexerSourceFactory();
- private MetricsIndexerSource source;
+ private MetricsIndexerSource indexerSource;
private MetricsIndexerSourceFactory() {}
@@ -29,10 +29,10 @@ public class MetricsIndexerSourceFactory {
return INSTANCE;
}
- public synchronized MetricsIndexerSource create() {
- if (INSTANCE.source == null) {
- INSTANCE.source = new MetricsIndexerSourceImpl();
+ public synchronized MetricsIndexerSource getIndexerSource() {
+ if (INSTANCE.indexerSource == null) {
+ INSTANCE.indexerSource = new MetricsIndexerSourceImpl();
}
- return INSTANCE.source;
+ return INSTANCE.indexerSource;
}
}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java
index cc82bb2..9bcf9fd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java
@@ -40,6 +40,13 @@ public class MetricsIndexerSourceImpl extends BaseSourceImpl implements MetricsI
private final MetricHistogram duplicateKeyTimeHisto;
private final MutableFastCounter slowDuplicateKeyCalls;
+ private final MetricHistogram preIndexUpdateTimeHisto;
+ private final MetricHistogram postIndexUpdateTimeHisto;
+ private final MetricHistogram preIndexUpdateFailureTimeHisto;
+ private final MetricHistogram postIndexUpdateFailureTimeHisto;
+ private final MutableFastCounter preIndexUpdateFailures;
+ private final MutableFastCounter postIndexUpdateFailures;
+
public MetricsIndexerSourceImpl() {
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
}
@@ -62,6 +69,19 @@ public class MetricsIndexerSourceImpl extends BaseSourceImpl implements MetricsI
slowPostOpenCalls = getMetricsRegistry().newCounter(SLOW_POST_OPEN, SLOW_POST_OPEN_DESC, 0L);
duplicateKeyTimeHisto = getMetricsRegistry().newHistogram(DUPLICATE_KEY_TIME, DUPLICATE_KEY_TIME_DESC);
slowDuplicateKeyCalls = getMetricsRegistry().newCounter(SLOW_DUPLICATE_KEY, SLOW_DUPLICATE_KEY_DESC, 0L);
+
+ postIndexUpdateTimeHisto = getMetricsRegistry().newHistogram(
+ POST_INDEX_UPDATE_TIME, POST_INDEX_UPDATE_TIME_DESC);
+ preIndexUpdateTimeHisto = getMetricsRegistry().newHistogram(
+ PRE_INDEX_UPDATE_TIME, PRE_INDEX_UPDATE_TIME_DESC);
+ postIndexUpdateFailureTimeHisto = getMetricsRegistry().newHistogram(
+ POST_INDEX_UPDATE_FAILURE_TIME, POST_INDEX_UPDATE_FAILURE_TIME_DESC);
+ preIndexUpdateFailureTimeHisto = getMetricsRegistry().newHistogram(
+ PRE_INDEX_UPDATE_FAILURE_TIME, PRE_INDEX_UPDATE_FAILURE_TIME_DESC);
+ postIndexUpdateFailures = getMetricsRegistry().newCounter(
+ POST_INDEX_UPDATE_FAILURE, POST_INDEX_UPDATE_FAILURE_DESC, 0L);
+ preIndexUpdateFailures = getMetricsRegistry().newCounter(
+ PRE_INDEX_UPDATE_FAILURE, PRE_INDEX_UPDATE_FAILURE_DESC, 0L);
}
@Override
@@ -133,4 +153,34 @@ public class MetricsIndexerSourceImpl extends BaseSourceImpl implements MetricsI
public void incrementSlowDuplicateKeyCheckCalls() {
slowDuplicateKeyCalls.incr();
}
+
+ @Override
+ public void updatePreIndexUpdateTime(long t) {
+ preIndexUpdateTimeHisto.add(t);
+ }
+
+ @Override
+ public void updatePostIndexUpdateTime(long t) {
+ postIndexUpdateTimeHisto.add(t);
+ }
+
+ @Override
+ public void updatePreIndexUpdateFailureTime(long t) {
+ preIndexUpdateFailureTimeHisto.add(t);
+ }
+
+ @Override
+ public void updatePostIndexUpdateFailureTime(long t) {
+ postIndexUpdateFailureTimeHisto.add(t);
+ }
+
+ @Override
+ public void incrementPreIndexUpdateFailures() {
+ preIndexUpdateFailures.incr();
+ }
+
+ @Override
+ public void incrementPostIndexUpdateFailures() {
+ postIndexUpdateFailures.incr();
+ }
}
\ No newline at end of file