You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2017/05/26 16:52:01 UTC
phoenix git commit: PHOENIX-3248 Enable HBase server-side scan metrics to be returned to client and surfaced through metrics (Karan Mehta)
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-1.1 92d8cf271 -> b553f96a2
PHOENIX-3248 Enable HBase server-side scan metrics to be returned to client and surfaced through metrics (Karan Mehta)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b553f96a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b553f96a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b553f96a
Branch: refs/heads/4.x-HBase-1.1
Commit: b553f96a21b603d02083cd1c32fca95036a80bb5
Parents: 92d8cf2
Author: Samarth Jain <sa...@apache.org>
Authored: Fri May 26 09:51:57 2017 -0700
Committer: Samarth Jain <sa...@apache.org>
Committed: Fri May 26 09:51:57 2017 -0700
----------------------------------------------------------------------
.../DelayedTableResultIteratorFactory.java | 18 +++--
.../phoenix/monitoring/PhoenixMetricsIT.java | 35 ++++-----
.../phoenix/iterate/ChunkedResultIterator.java | 11 +--
.../DefaultTableResultIteratorFactory.java | 12 ++--
.../phoenix/iterate/ParallelIterators.java | 11 ++-
.../phoenix/iterate/ScanningResultIterator.java | 76 +++++++++++++-------
.../apache/phoenix/iterate/SerialIterators.java | 13 +++-
.../phoenix/iterate/TableResultIterator.java | 11 +--
.../iterate/TableResultIteratorFactory.java | 6 +-
.../phoenix/mapreduce/PhoenixRecordReader.java | 16 +++--
.../phoenix/monitoring/GlobalClientMetrics.java | 7 +-
.../apache/phoenix/monitoring/MetricType.java | 17 ++++-
.../phoenix/monitoring/ReadMetricQueue.java | 4 ++
.../hive/mapreduce/PhoenixRecordReader.java | 11 +--
14 files changed, 158 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
index 55bed91..5e13982 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
@@ -26,7 +26,7 @@ import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.MutationState;
-import org.apache.phoenix.monitoring.CombinableMetric;
+import org.apache.phoenix.monitoring.ScanMetricsHolder;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
@@ -39,14 +39,18 @@ public class DelayedTableResultIteratorFactory implements TableResultIteratorFac
}
@Override
- public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan,
- CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
- return new DelayedTableResultIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper);
+ public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef,
+ Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold,
+ QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
+ return new DelayedTableResultIterator(mutationState, tableRef, scan, scanMetricsHolder,
+ renewLeaseThreshold, plan, scanGrouper);
}
-
+
private class DelayedTableResultIterator extends TableResultIterator {
- public DelayedTableResultIterator (MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
- super(mutationState, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper);
+ public DelayedTableResultIterator(MutationState mutationState, TableRef tableRef, Scan scan,
+ ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold, QueryPlan plan,
+ ParallelScanGrouper scanGrouper) throws SQLException {
+ super(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
index 2838f04..69ad1ff 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -76,12 +76,14 @@ import com.google.common.collect.Sets;
public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
- private static final List<String> mutationMetricsToSkip = Lists
- .newArrayList(MetricType.MUTATION_COMMIT_TIME.name());
- private static final List<String> readMetricsToSkip = Lists.newArrayList(MetricType.TASK_QUEUE_WAIT_TIME.name(),
- MetricType.TASK_EXECUTION_TIME.name(), MetricType.TASK_END_TO_END_TIME.name());
+ private static final List<String> mutationMetricsToSkip =
+ Lists.newArrayList(MetricType.MUTATION_COMMIT_TIME.name());
+ private static final List<String> readMetricsToSkip =
+ Lists.newArrayList(MetricType.TASK_QUEUE_WAIT_TIME.name(),
+ MetricType.TASK_EXECUTION_TIME.name(), MetricType.TASK_END_TO_END_TIME.name(),
+ MetricType.COUNT_MILLS_BETWEEN_NEXTS.name());
private static final String CUSTOM_URL_STRING = "SESSION";
- private static final AtomicInteger numConnections = new AtomicInteger(0);
+ private static final AtomicInteger numConnections = new AtomicInteger(0);
@BeforeClass
public static void doSetup() throws Exception {
@@ -230,7 +232,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
resultSetBeingTested.close();
Set<String> expectedTableNames = Sets.newHashSet(tableName);
assertReadMetricValuesForSelectSql(Lists.newArrayList(numRows), Lists.newArrayList(numExpectedTasks),
- resultSetBeingTested, expectedTableNames);
+ resultSetBeingTested, expectedTableNames);
}
@Test
@@ -617,7 +619,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
assertMetricsHaveSameValues(metrics.get(index1), metrics.get(index3), mutationMetricsToSkip);
}
}
-
+
@Test
public void testOpenConnectionsCounter() throws Exception {
long numOpenConnections = GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue();
@@ -626,7 +628,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
}
assertEquals(numOpenConnections, GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue());
}
-
+
private void createTableAndInsertValues(boolean commit, int numRows, Connection conn, String tableName)
throws SQLException {
String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
@@ -684,20 +686,14 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
String tableName = entry.getKey();
expectedTableNames.remove(tableName);
Map<String, Long> metricValues = entry.getValue();
- boolean scanMetricsPresent = false;
boolean taskCounterMetricsPresent = false;
boolean taskExecutionTimeMetricsPresent = false;
boolean memoryMetricsPresent = false;
for (Entry<String, Long> pair : metricValues.entrySet()) {
String metricName = pair.getKey();
long metricValue = pair.getValue();
- long n = numRows.get(counter);
long numTask = numExpectedTasks.get(counter);
- if (metricName.equals(SCAN_BYTES.name())) {
- // we are using a SCAN_BYTES_DELTA of 1. So number of scan bytes read should be number of rows read
- assertEquals(n, metricValue);
- scanMetricsPresent = true;
- } else if (metricName.equals(TASK_EXECUTED_COUNTER.name())) {
+ if (metricName.equals(TASK_EXECUTED_COUNTER.name())) {
assertEquals(numTask, metricValue);
taskCounterMetricsPresent = true;
} else if (metricName.equals(TASK_EXECUTION_TIME.name())) {
@@ -709,7 +705,6 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
}
}
counter++;
- assertTrue(scanMetricsPresent);
assertTrue(taskCounterMetricsPresent);
assertTrue(taskExecutionTimeMetricsPresent);
assertTrue(memoryMetricsPresent);
@@ -822,7 +817,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
}
}
}
-
+
@Test
public void testGetConnectionsForSameUrlConcurrently() throws Exception {
// establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver
@@ -940,7 +935,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
exec.shutdownNow();
}
}
-
+
@Test
public void testGetConnectionsWithDifferentJDBCParamsConcurrently() throws Exception {
DriverManager.registerDriver(PhoenixDriver.INSTANCE);
@@ -977,9 +972,9 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
c.close();
} catch (Exception ignore) {}
}
- }
+ }
}
-
+
private static class GetConnectionCallable implements Callable<Connection> {
private final String url;
GetConnectionCallable(String url) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index a12d40c..8595fd4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -19,7 +19,6 @@
package org.apache.phoenix.iterate;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
-import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
import java.sql.SQLException;
import java.util.List;
@@ -30,6 +29,8 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
+import org.apache.phoenix.monitoring.ScanMetricsHolder;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.TableRef;
@@ -144,12 +145,14 @@ public class ChunkedResultIterator implements PeekingResultIterator {
}
if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Get next chunked result iterator over " + tableRef.getTable().getPhysicalName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan)));
String tableName = tableRef.getTable().getPhysicalName().getString();
+ ReadMetricQueue readMetrics = context.getReadMetricsQueue();
+ ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, tableName, scan,
+ readMetrics.isRequestMetricsEnabled());
long renewLeaseThreshold = context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
ResultIterator singleChunkResultIterator =
new SingleChunkResultIterator(new TableResultIterator(mutationState, scan,
- context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName),
- renewLeaseThreshold, plan, DefaultParallelScanGrouper.getInstance()),
- chunkSize);
+ scanMetricsHolder, renewLeaseThreshold, plan,
+ DefaultParallelScanGrouper.getInstance()), chunkSize);
resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName, plan);
}
return resultIterator;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
index b720b56..976b839 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
@@ -22,15 +22,17 @@ import java.sql.SQLException;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.execute.MutationState;
-import org.apache.phoenix.monitoring.CombinableMetric;
+import org.apache.phoenix.monitoring.ScanMetricsHolder;
import org.apache.phoenix.schema.TableRef;
public class DefaultTableResultIteratorFactory implements TableResultIteratorFactory {
- @Override
- public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan,
- CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
- return new TableResultIterator(mutationState, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper);
+ @Override
+ public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef,
+ Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold,
+ QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
+ return new TableResultIterator(mutationState, scan, scanMetricsHolder, renewLeaseThreshold,
+ plan, scanGrouper);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 8c9b689..f0360e2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -30,9 +30,9 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.job.JobManager.JobCallable;
-import org.apache.phoenix.monitoring.CombinableMetric;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.monitoring.ReadMetricQueue;
+import org.apache.phoenix.monitoring.ScanMetricsHolder;
import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.LogUtil;
@@ -97,11 +97,16 @@ public class ParallelIterators extends BaseResultIterators {
context.getOverallQueryMetrics().updateNumParallelScans(numScans);
GLOBAL_NUM_PARALLEL_SCANS.update(numScans);
final long renewLeaseThreshold = context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
+ boolean isRequestMetricsEnabled = readMetrics.isRequestMetricsEnabled();
for (final ScanLocator scanLocation : scanLocations) {
final Scan scan = scanLocation.getScan();
- final CombinableMetric scanMetrics = readMetrics.allotMetric(MetricType.SCAN_BYTES, physicalTableName);
+ final ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, physicalTableName,
+ scan, isRequestMetricsEnabled);
final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(readMetrics, physicalTableName);
- final TableResultIterator tableResultItr = context.getConnection().getTableResultIteratorFactory().newIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper);
+ final TableResultIterator tableResultItr =
+ context.getConnection().getTableResultIteratorFactory().newIterator(
+ mutationState, tableRef, scan, scanMetricsHolder, renewLeaseThreshold, plan,
+ scanGrouper);
context.getConnection().addIteratorForLeaseRenewal(tableResultItr);
Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
index 7f865ed..8ee00e9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
@@ -22,33 +22,74 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SCAN_BYTE
import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
+import java.util.Map;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.phoenix.monitoring.CombinableMetric;
-import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric;
-import org.apache.phoenix.monitoring.GlobalClientMetrics;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.monitoring.ScanMetricsHolder;
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.ServerUtil;
public class ScanningResultIterator implements ResultIterator {
private final ResultScanner scanner;
- private final CombinableMetric scanMetrics;
+ private final Scan scan;
+ private final ScanMetricsHolder scanMetricsHolder;
+ boolean scanMetricsUpdated;
+ boolean scanMetricsEnabled;
- public ScanningResultIterator(ResultScanner scanner, CombinableMetric scanMetrics) {
+ // These metric names are how HBase refers them
+ // Since HBase stores these strings as static final, we are using the same here
+ static final String RPC_CALLS_METRIC_NAME = "RPC_CALLS";
+ static final String REMOTE_RPC_CALLS_METRIC_NAME = "REMOTE_RPC_CALLS";
+ static final String MILLIS_BETWEEN_NEXTS_METRIC_NAME = "MILLIS_BETWEEN_NEXTS";
+ static final String NOT_SERVING_REGION_EXCEPTION_METRIC_NAME = "NOT_SERVING_REGION_EXCEPTION";
+ static final String BYTES_IN_RESULTS_METRIC_NAME = "BYTES_IN_RESULTS";
+ static final String BYTES_IN_REMOTE_RESULTS_METRIC_NAME = "BYTES_IN_REMOTE_RESULTS";
+ static final String REGIONS_SCANNED_METRIC_NAME = "REGIONS_SCANNED";
+ static final String RPC_RETRIES_METRIC_NAME = "RPC_RETRIES";
+ static final String REMOTE_RPC_RETRIES_METRIC_NAME = "REMOTE_RPC_RETRIES";
+ static final String COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME = "ROWS_SCANNED";
+ static final String COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME = "ROWS_FILTERED";
+ static final String GLOBAL_BYTES_IN_RESULTS_METRIC_NAME = "BYTES_IN_RESULTS";
+
+ public ScanningResultIterator(ResultScanner scanner, Scan scan, ScanMetricsHolder scanMetricsHolder) {
this.scanner = scanner;
- this.scanMetrics = scanMetrics;
+ this.scan = scan;
+ this.scanMetricsHolder = scanMetricsHolder;
+ scanMetricsUpdated = false;
+ scanMetricsEnabled = scan.isScanMetricsEnabled();
}
@Override
public void close() throws SQLException {
+ getScanMetrics();
scanner.close();
}
+ private void getScanMetrics() {
+
+ if (!scanMetricsUpdated && scanMetricsEnabled) {
+ Map<String, Long> scanMetricsMap = scan.getScanMetrics().getMetricsMap();
+ scanMetricsHolder.getCountOfRPCcalls().change(scanMetricsMap.get(RPC_CALLS_METRIC_NAME));
+ scanMetricsHolder.getCountOfRemoteRPCcalls().change(scanMetricsMap.get(REMOTE_RPC_CALLS_METRIC_NAME));
+ scanMetricsHolder.getSumOfMillisSecBetweenNexts().change(scanMetricsMap.get(MILLIS_BETWEEN_NEXTS_METRIC_NAME));
+ scanMetricsHolder.getCountOfNSRE().change(scanMetricsMap.get(NOT_SERVING_REGION_EXCEPTION_METRIC_NAME));
+ scanMetricsHolder.getCountOfBytesInResults().change(scanMetricsMap.get(BYTES_IN_RESULTS_METRIC_NAME));
+ scanMetricsHolder.getCountOfBytesInRemoteResults().change(scanMetricsMap.get(BYTES_IN_REMOTE_RESULTS_METRIC_NAME));
+ scanMetricsHolder.getCountOfRegions().change(scanMetricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+ scanMetricsHolder.getCountOfRPCRetries().change(scanMetricsMap.get(RPC_RETRIES_METRIC_NAME));
+ scanMetricsHolder.getCountOfRemoteRPCRetries().change(scanMetricsMap.get(REMOTE_RPC_RETRIES_METRIC_NAME));
+ scanMetricsHolder.getCountOfRowsScanned().change(scanMetricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+ scanMetricsHolder.getCountOfRowsFiltered().change(scanMetricsMap.get(COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME));
+
+ GLOBAL_SCAN_BYTES.update(scanMetricsMap.get(GLOBAL_BYTES_IN_RESULTS_METRIC_NAME));
+ scanMetricsUpdated = true;
+ }
+
+ }
+
@Override
public Tuple next() throws SQLException {
try {
@@ -57,7 +98,6 @@ public class ScanningResultIterator implements ResultIterator {
close(); // Free up resources early
return null;
}
- calculateScanSize(result);
// TODO: use ResultTuple.setResult(result)?
// Need to create a new one if holding on to it (i.e. OrderedResultIterator)
return new ResultTuple(result);
@@ -75,22 +115,6 @@ public class ScanningResultIterator implements ResultIterator {
return "ScanningResultIterator [scanner=" + scanner + "]";
}
- private void calculateScanSize(Result result) {
- if (GlobalClientMetrics.isMetricsEnabled() || scanMetrics != NoOpRequestMetric.INSTANCE) {
- if (result != null) {
- Cell[] cells = result.rawCells();
- long scanResultSize = 0;
- for (Cell cell : cells) {
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- scanResultSize += kv.heapSize();
- }
- scanMetrics.change(scanResultSize);
- GLOBAL_SCAN_BYTES.update(scanResultSize);
- }
- }
- }
-
-
public ResultScanner getScanner() {
return scanner;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index d8f7f40..eb0c949 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -17,8 +17,6 @@
*/
package org.apache.phoenix.iterate;
-import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
-
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
@@ -33,6 +31,8 @@ import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.job.JobManager.JobCallable;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
+import org.apache.phoenix.monitoring.ScanMetricsHolder;
import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.query.QueryConstants;
@@ -168,12 +168,19 @@ public class SerialIterators extends BaseResultIterators {
if (index >= scans.size()) {
return EMPTY_ITERATOR;
}
+ ReadMetricQueue readMetrics = context.getReadMetricsQueue();
+ boolean isRequestMetricsEnabled = readMetrics.isRequestMetricsEnabled();
while (index < scans.size()) {
Scan currentScan = scans.get(index++);
if (remainingOffset != null) {
currentScan.setAttribute(BaseScannerRegionObserver.SCAN_OFFSET, PInteger.INSTANCE.toBytes(remainingOffset));
}
- TableResultIterator itr = new TableResultIterator(mutationState, currentScan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), renewLeaseThreshold, plan, scanGrouper);
+ ScanMetricsHolder scanMetricsHolder =
+ ScanMetricsHolder.getInstance(readMetrics, tableName, currentScan,
+ isRequestMetricsEnabled);
+ TableResultIterator itr =
+ new TableResultIterator(mutationState, currentScan, scanMetricsHolder,
+ renewLeaseThreshold, plan, scanGrouper);
PeekingResultIterator peekingItr = iteratorFactory.newIterator(context, itr, currentScan, tableName, plan);
Tuple tuple;
if ((tuple = peekingItr.peek()) == null) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index c6fcc1d..f854996 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.monitoring.CombinableMetric;
+import org.apache.phoenix.monitoring.ScanMetricsHolder;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
import org.apache.phoenix.schema.tuple.Tuple;
@@ -64,7 +65,7 @@ import com.google.common.annotations.VisibleForTesting;
public class TableResultIterator implements ResultIterator {
private final Scan scan;
private final HTableInterface htable;
- private final CombinableMetric scanMetrics;
+ private final ScanMetricsHolder scanMetricsHolder;
private static final ResultIterator UNINITIALIZED_SCANNER = ResultIterator.EMPTY_ITERATOR;
private final long renewLeaseThreshold;
private final QueryPlan plan;
@@ -85,7 +86,7 @@ public class TableResultIterator implements ResultIterator {
@VisibleForTesting // Exposed for testing. DON'T USE ANYWHERE ELSE!
TableResultIterator() {
- this.scanMetrics = null;
+ this.scanMetricsHolder = null;
this.renewLeaseThreshold = 0;
this.htable = null;
this.scan = null;
@@ -97,10 +98,10 @@ public class TableResultIterator implements ResultIterator {
RENEWED, NOT_RENEWED, CLOSED, UNINITIALIZED, THRESHOLD_NOT_REACHED, LOCK_NOT_ACQUIRED, NOT_SUPPORTED
};
- public TableResultIterator(MutationState mutationState, Scan scan, CombinableMetric scanMetrics,
+ public TableResultIterator(MutationState mutationState, Scan scan, ScanMetricsHolder scanMetricsHolder,
long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
this.scan = scan;
- this.scanMetrics = scanMetrics;
+ this.scanMetricsHolder = scanMetricsHolder;
this.plan = plan;
PTable table = plan.getTableRef().getTable();
htable = mutationState.getHTable(table);
@@ -186,7 +187,7 @@ public class TableResultIterator implements ResultIterator {
if (delegate == UNINITIALIZED_SCANNER) {
try {
this.scanIterator =
- new ScanningResultIterator(htable.getScanner(scan), scanMetrics);
+ new ScanningResultIterator(htable.getScanner(scan), scan, scanMetricsHolder);
} catch (IOException e) {
Closeables.closeQuietly(htable);
throw ServerUtil.parseServerException(e);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
index 8d7b54d..c23e342 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
@@ -23,8 +23,12 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.monitoring.CombinableMetric;
+import org.apache.phoenix.monitoring.ScanMetricsHolder;
import org.apache.phoenix.schema.TableRef;
public interface TableResultIteratorFactory {
- public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException;
+ public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef,
+ Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold,
+ QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException;
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
index 17d9b6a..d4d6734 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -17,8 +17,6 @@
*/
package org.apache.phoenix.mapreduce;
-import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
-
import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
@@ -47,13 +45,13 @@ import org.apache.phoenix.iterate.SequenceResultIterator;
import org.apache.phoenix.iterate.TableResultIterator;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.monitoring.ReadMetricQueue;
+import org.apache.phoenix.monitoring.ScanMetricsHolder;
+import org.apache.phoenix.query.ConnectionQueryServices;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
-import org.apache.phoenix.query.ConnectionQueryServices;
-
/**
* {@link RecordReader} implementation that iterates over the the records.
*/
@@ -119,10 +117,18 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
services.clearTableRegionCache(tableNameBytes);
long renewScannerLeaseThreshold = queryPlan.getContext().getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
+ boolean isRequestMetricsEnabled = readMetrics.isRequestMetricsEnabled();
for (Scan scan : scans) {
// For MR, skip the region boundary check exception if we encounter a split. ref: PHOENIX-2599
scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true));
- final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext().getConnection().getMutationState(), scan, readMetrics.allotMetric(SCAN_BYTES, tableName), renewScannerLeaseThreshold, queryPlan, MapReduceParallelScanGrouper.getInstance());
+ ScanMetricsHolder scanMetricsHolder =
+ ScanMetricsHolder.getInstance(readMetrics, tableName, scan,
+ isRequestMetricsEnabled);
+ final TableResultIterator tableResultIterator =
+ new TableResultIterator(
+ queryPlan.getContext().getConnection().getMutationState(), scan,
+ scanMetricsHolder, renewScannerLeaseThreshold, queryPlan,
+ MapReduceParallelScanGrouper.getInstance());
PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
iterators.add(peekingResultIterator);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
index b5f9422..6ba677a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
@@ -18,8 +18,6 @@
package org.apache.phoenix.monitoring;
import static org.apache.phoenix.monitoring.MetricType.HCONNECTIONS_COUNTER;
-import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER;
-import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
import static org.apache.phoenix.monitoring.MetricType.MEMORY_WAIT_TIME;
import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_SIZE;
@@ -27,10 +25,11 @@ import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
import static org.apache.phoenix.monitoring.MetricType.MUTATION_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.NUM_PARALLEL_SCANS;
+import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.QUERY_SERVICES_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.QUERY_TIME;
import static org.apache.phoenix.monitoring.MetricType.QUERY_TIMEOUT_COUNTER;
-import static org.apache.phoenix.monitoring.MetricType.TASK_REJECTED_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
import static org.apache.phoenix.monitoring.MetricType.SELECT_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.SPOOL_FILE_COUNTER;
@@ -41,6 +40,8 @@ import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME;
import static org.apache.phoenix.monitoring.MetricType.TASK_QUEUE_WAIT_TIME;
import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.TASK_REJECTED_COUNTER;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index 7b21de5..a18d4ce 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -46,8 +46,19 @@ public enum MetricType {
HCONNECTIONS_COUNTER("Number of HConnections created by phoenix driver"),
PHOENIX_CONNECTIONS_THROTTLED_COUNTER("Number of client Phoenix connections prevented from opening " +
"because there are already too many to that target cluster."),
- PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER("Number of requests for Phoenix connections, whether successful or not.");
-
+ PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER("Number of requests for Phoenix connections, whether successful or not."),
+ COUNT_RPC_CALLS("Number of RPC calls"),
+ COUNT_REMOTE_RPC_CALLS("Number of remote RPC calls"),
+ COUNT_MILLS_BETWEEN_NEXTS("Sum of milliseconds between sequential next calls"),
+ COUNT_NOT_SERVING_REGION_EXCEPTION("Number of NotServingRegionException caught"),
+ COUNT_BYTES_REGION_SERVER_RESULTS("Number of bytes in Result objects from region servers"),
+ COUNT_BYTES_IN_REMOTE_RESULTS("Number of bytes in Result objects from remote region servers"),
+ COUNT_SCANNED_REGIONS("Number of regions scanned"),
+ COUNT_RPC_RETRIES("Number of RPC retries"),
+ COUNT_REMOTE_RPC_RETRIES("Number of remote RPC retries"),
+ COUNT_ROWS_SCANNED("Number of rows scanned"),
+ COUNT_ROWS_FILTERED("Number of rows filtered");
+
private final String description;
private MetricType(String description) {
@@ -56,6 +67,6 @@ public enum MetricType {
public String description() {
return description;
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
index e6c6be2..0e985ed 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
@@ -177,4 +177,8 @@ public class ReadMetricQueue {
return q;
}
+ public boolean isRequestMetricsEnabled() {
+ return isRequestMetricsEnabled;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
index ca27686..c10c4d1 100644
--- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
@@ -17,8 +17,6 @@
*/
package org.apache.phoenix.hive.mapreduce;
-import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
-
import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
@@ -49,6 +47,7 @@ import org.apache.phoenix.iterate.SequenceResultIterator;
import org.apache.phoenix.iterate.TableResultIterator;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.monitoring.ReadMetricQueue;
+import org.apache.phoenix.monitoring.ScanMetricsHolder;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
@@ -113,12 +112,14 @@ public class PhoenixRecordReader<T extends DBWritable> implements
String tableName = queryPlan.getTableRef().getTable().getPhysicalName().getString();
long renewScannerLeaseThreshold = queryPlan.getContext().getConnection()
.getQueryServices().getRenewLeaseThresholdMilliSeconds();
+ boolean isRequestMetricsEnabled = readMetrics.isRequestMetricsEnabled();
for (Scan scan : scans) {
scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes
.toBytes(true));
- final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan
- .getContext().getConnection().getMutationState(), scan,
- readMetrics.allotMetric(SCAN_BYTES, tableName), renewScannerLeaseThreshold, queryPlan, MapReduceParallelScanGrouper.getInstance() );
+ ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, tableName, scan, isRequestMetricsEnabled);
+ final TableResultIterator tableResultIterator = new TableResultIterator(
+ queryPlan.getContext().getConnection().getMutationState(), scan, scanMetricsHolder,
+ renewScannerLeaseThreshold, queryPlan, MapReduceParallelScanGrouper.getInstance());
PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap
(tableResultIterator);