You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2015/06/27 01:29:03 UTC
[1/4] phoenix git commit: PHOENIX-1819 Build a framework to capture
and report phoenix client side request level metrics
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-0.98 006aec5a3 -> 7e29d57bd
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index 7004f3c..18f914e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -27,6 +27,7 @@ import java.io.Reader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.ArrayList;
@@ -34,6 +35,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
@@ -61,8 +63,9 @@ import org.apache.phoenix.expression.OrderByExpression;
import org.apache.phoenix.expression.RowKeyColumnExpression;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
-import org.apache.phoenix.monitoring.Metric;
-import org.apache.phoenix.monitoring.PhoenixMetrics;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.monitoring.GlobalClientMetrics;
+import org.apache.phoenix.monitoring.GlobalMetric;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.AmbiguousColumnException;
import org.apache.phoenix.schema.ColumnNotFoundException;
@@ -142,7 +145,7 @@ public class PhoenixRuntime {
public static final String ANNOTATION_ATTRIB_PREFIX = "phoenix.annotation.";
/**
- * Use this connection property to explicity enable or disable auto-commit on a new connection.
+ * Use this connection property to explicitly enable or disable auto-commit on a new connection.
*/
public static final String AUTO_COMMIT_ATTRIB = "AutoCommit";
@@ -152,6 +155,11 @@ public class PhoenixRuntime {
* upserting data into them, and getting the uncommitted state through {@link #getUncommittedData(Connection)}
*/
public final static String CONNECTIONLESS = "none";
+
+ /**
+ * Use this connection property to explicitly enable or disable request level metric collection.
+ */
+ public static final String REQUEST_METRIC_ATTRIB = "RequestMetric";
private static final String HEADER_IN_LINE = "in-line";
private static final String SQL_FILE_EXT = ".sql";
@@ -980,9 +988,162 @@ public class PhoenixRuntime {
}
/**
- * Exposes the various internal phoenix metrics.
+ * Exposes the various internal phoenix metrics collected at the client JVM level.
+ */
+ public static Collection<GlobalMetric> getGlobalPhoenixClientMetrics() {
+ return GlobalClientMetrics.getMetrics();
+ }
+
+ /**
+ *
+ * @return whether or not the global client metrics are being collected
*/
- public static Collection<Metric> getInternalPhoenixMetrics() {
- return PhoenixMetrics.getMetrics();
+ public static boolean areGlobalClientMetricsBeingCollected() {
+ return GlobalClientMetrics.isMetricsEnabled();
}
-}
+
+ /**
+ * Method to expose the metrics associated with performing reads using the passed result set. A typical pattern is:
+ *
+ * <pre>
+ * {@code
+ * Map<String, Map<String, Long>> overAllQueryMetrics = null;
+ * Map<String, Map<String, Long>> requestReadMetrics = null;
+ * try (ResultSet rs = stmt.executeQuery()) {
+ * while(rs.next()) {
+ * .....
+ * }
+ * overAllQueryMetrics = PhoenixRuntime.getOverAllReadRequestMetrics(rs);
+ * requestReadMetrics = PhoenixRuntime.getRequestReadMetrics(rs);
+ * PhoenixRuntime.resetMetrics(rs);
+ * }
+ * </pre>
+ *
+ * @param rs
+ * result set to get the metrics for
+ * @return a map of (table name) -> (map of (metric name) -> (metric value))
+ * @throws SQLException
+ */
+ public static Map<String, Map<String, Long>> getRequestReadMetrics(ResultSet rs) throws SQLException {
+ PhoenixResultSet resultSet = rs.unwrap(PhoenixResultSet.class);
+ return resultSet.getReadMetrics();
+ }
+
+ /**
+ * Method to expose the overall metrics associated with executing a query via phoenix. A typical pattern of
+ * accessing request level read metrics and overall read query metrics is:
+ *
+ * <pre>
+ * {@code
+ * Map<String, Map<String, Long>> overAllQueryMetrics = null;
+ * Map<String, Map<String, Long>> requestReadMetrics = null;
+ * try (ResultSet rs = stmt.executeQuery()) {
+ * while(rs.next()) {
+ * .....
+ * }
+ * overAllQueryMetrics = PhoenixRuntime.getOverAllReadRequestMetrics(rs);
+ * requestReadMetrics = PhoenixRuntime.getRequestReadMetrics(rs);
+ * PhoenixRuntime.resetMetrics(rs);
+ * }
+ * </pre>
+ *
+ * @param rs
+ * result set to get the metrics for
+ * @return a map of metric name -> metric value
+ * @throws SQLException
+ */
+ public static Map<String, Long> getOverAllReadRequestMetrics(ResultSet rs) throws SQLException {
+ PhoenixResultSet resultSet = rs.unwrap(PhoenixResultSet.class);
+ return resultSet.getOverAllRequestReadMetrics();
+ }
+
+ /**
+ * Method to expose the metrics associated with sending over mutations to HBase. These metrics are updated when
+ * commit is called on the passed connection. Mutation metrics are accumulated for the connection till
+ * {@link #resetMetrics(Connection)} is called or the connection is closed. Example usage:
+ *
+ * <pre>
+ * {@code
+ * Map<String, Map<String, Long>> mutationWriteMetrics = null;
+ * Map<String, Map<String, Long>> mutationReadMetrics = null;
+ * try (Connection conn = DriverManager.getConnection(url)) {
+ * conn.createStatement.executeUpdate(dml1);
+ * ....
+ * conn.createStatement.executeUpdate(dml2);
+ * ...
+ * conn.createStatement.executeUpdate(dml3);
+ * ...
+ * conn.commit();
+ * mutationWriteMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+ * mutationReadMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(conn);
+ * PhoenixRuntime.resetMetrics(rs);
+ * }
+ * </pre>
+ *
+ * @param conn
+ * connection to get the metrics for
+ * @return a map of (table name) -> (map of (metric name) -> (metric value))
+ * @throws SQLException
+ */
+ public static Map<String, Map<String, Long>> getWriteMetricsForMutationsSinceLastReset(Connection conn) throws SQLException {
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+ return pConn.getMutationMetrics();
+ }
+
+ /**
+ * Method to expose the read metrics associated with executing a dml statement. These metrics are updated when
+ * commit is called on the passed connection. Read metrics are accumulated till {@link #resetMetrics(Connection)} is
+ * called or the connection is closed. Example usage:
+ *
+ * <pre>
+ * {@code
+ * Map<String, Map<String, Long>> mutationWriteMetrics = null;
+ * Map<String, Map<String, Long>> mutationReadMetrics = null;
+ * try (Connection conn = DriverManager.getConnection(url)) {
+ * conn.createStatement.executeUpdate(dml1);
+ * ....
+ * conn.createStatement.executeUpdate(dml2);
+ * ...
+ * conn.createStatement.executeUpdate(dml3);
+ * ...
+ * conn.commit();
+ * mutationWriteMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+ * mutationReadMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(conn);
+ * PhoenixRuntime.resetMetrics(rs);
+ * }
+ * </pre>
+ * @param conn
+ * connection to get the metrics for
+ * @return a map of (table name) -> (map of (metric name) -> (metric value))
+ * @throws SQLException
+ */
+ public static Map<String, Map<String, Long>> getReadMetricsForMutationsSinceLastReset(Connection conn) throws SQLException {
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+ return pConn.getReadMetrics();
+ }
+
+ /**
+ * Reset the read metrics collected in the result set.
+ *
+ * @see {@link #getRequestReadMetrics(ResultSet)} {@link #getOverAllReadRequestMetrics(ResultSet)}
+ * @param rs
+ * @throws SQLException
+ */
+ public static void resetMetrics(ResultSet rs) throws SQLException {
+ PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class);
+ prs.resetMetrics();
+ }
+
+ /**
+ * Reset the mutation and reads-for-mutations metrics collected in the connection.
+ *
+ * @see {@link #getReadMetricsForMutationsSinceLastReset(Connection)} {@link #getWriteMetricsForMutationsSinceLastReset(Connection)}
+ * @param conn
+ * @throws SQLException
+ */
+ public static void resetMetrics(Connection conn) throws SQLException {
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+ pConn.clearMetrics();
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
index ab6a4a7..5ae1a56 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.memory.DelegatingMemoryManager;
import org.apache.phoenix.memory.GlobalMemoryManager;
import org.apache.phoenix.memory.MemoryManager;
+import org.apache.phoenix.monitoring.MemoryMetricsHolder;
+import org.apache.phoenix.monitoring.SpoolingMetricsHolder;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
@@ -52,7 +54,7 @@ public class SpoolingResultIteratorTest {
};
MemoryManager memoryManager = new DelegatingMemoryManager(new GlobalMemoryManager(threshold, 0));
- ResultIterator scanner = new SpoolingResultIterator(iterator, memoryManager, threshold, maxSizeSpool,"/tmp");
+ ResultIterator scanner = new SpoolingResultIterator(SpoolingMetricsHolder.NO_OP_INSTANCE, MemoryMetricsHolder.NO_OP_INSTANCE, iterator, memoryManager, threshold, maxSizeSpool,"/tmp");
AssertResults.assertResults(scanner, expectedResults);
}
[4/4] phoenix git commit: PHOENIX-1819 Build a framework to capture
and report phoenix client side request level metrics
Posted by sa...@apache.org.
PHOENIX-1819 Build a framework to capture and report phoenix client side request level metrics
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7e29d57b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7e29d57b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7e29d57b
Branch: refs/heads/4.x-HBase-0.98
Commit: 7e29d57bda0eb66b1dbd102060253e1b1468f052
Parents: 006aec5
Author: Samarth <sa...@salesforce.com>
Authored: Fri Jun 26 16:28:50 2015 -0700
Committer: Samarth <sa...@salesforce.com>
Committed: Fri Jun 26 16:28:50 2015 -0700
----------------------------------------------------------------------
.../phoenix/end2end/PhoenixMetricsIT.java | 147 ----
.../apache/phoenix/execute/PartialCommitIT.java | 1 +
.../phoenix/monitoring/PhoenixMetricsIT.java | 815 +++++++++++++++++++
.../apache/phoenix/cache/ServerCacheClient.java | 7 +
.../apache/phoenix/compile/DeleteCompiler.java | 50 +-
.../MutatingParallelIteratorFactory.java | 51 +-
.../phoenix/compile/StatementContext.java | 49 +-
.../apache/phoenix/compile/UpsertCompiler.java | 80 +-
.../apache/phoenix/execute/AggregatePlan.java | 8 +-
.../apache/phoenix/execute/HashJoinPlan.java | 7 +
.../apache/phoenix/execute/MutationState.java | 290 ++++---
.../org/apache/phoenix/execute/UnionPlan.java | 8 +-
.../phoenix/iterate/BaseResultIterators.java | 15 +-
.../phoenix/iterate/ChunkedResultIterator.java | 21 +-
.../iterate/ParallelIteratorFactory.java | 4 +-
.../phoenix/iterate/ParallelIterators.java | 25 +-
.../iterate/RoundRobinResultIterator.java | 4 +-
.../phoenix/iterate/ScanningResultIterator.java | 38 +-
.../apache/phoenix/iterate/SerialIterators.java | 23 +-
.../phoenix/iterate/SpoolingResultIterator.java | 49 +-
.../phoenix/iterate/TableResultIterator.java | 17 +-
.../phoenix/iterate/UnionResultIterators.java | 70 +-
.../apache/phoenix/jdbc/PhoenixConnection.java | 28 +-
.../phoenix/jdbc/PhoenixDatabaseMetaData.java | 21 +-
.../apache/phoenix/jdbc/PhoenixResultSet.java | 48 +-
.../apache/phoenix/jdbc/PhoenixStatement.java | 20 +-
.../java/org/apache/phoenix/job/JobManager.java | 60 +-
.../phoenix/mapreduce/CsvBulkLoadTool.java | 10 +-
.../phoenix/mapreduce/PhoenixRecordReader.java | 12 +-
.../phoenix/memory/GlobalMemoryManager.java | 5 -
.../apache/phoenix/monitoring/AtomicMetric.java | 70 ++
.../phoenix/monitoring/CombinableMetric.java | 77 ++
.../monitoring/CombinableMetricImpl.java | 77 ++
.../org/apache/phoenix/monitoring/Counter.java | 85 --
.../phoenix/monitoring/GlobalClientMetrics.java | 117 +++
.../apache/phoenix/monitoring/GlobalMetric.java | 37 +
.../phoenix/monitoring/GlobalMetricImpl.java | 74 ++
.../phoenix/monitoring/MemoryMetricsHolder.java | 43 +
.../org/apache/phoenix/monitoring/Metric.java | 45 +-
.../apache/phoenix/monitoring/MetricType.java | 55 ++
.../phoenix/monitoring/MetricsStopWatch.java | 59 ++
.../phoenix/monitoring/MutationMetricQueue.java | 131 +++
.../phoenix/monitoring/NonAtomicMetric.java | 71 ++
.../phoenix/monitoring/OverAllQueryMetrics.java | 121 +++
.../phoenix/monitoring/PhoenixMetrics.java | 118 ---
.../phoenix/monitoring/ReadMetricQueue.java | 180 ++++
.../phoenix/monitoring/SizeStatistic.java | 78 --
.../monitoring/SpoolingMetricsHolder.java | 43 +
.../monitoring/TaskExecutionMetricsHolder.java | 68 ++
.../phoenix/query/BaseQueryServicesImpl.java | 2 +-
.../org/apache/phoenix/query/QueryServices.java | 3 +-
.../phoenix/query/QueryServicesOptions.java | 25 +-
.../phoenix/trace/PhoenixMetricsSink.java | 36 +-
.../java/org/apache/phoenix/util/JDBCUtil.java | 5 +
.../org/apache/phoenix/util/PhoenixRuntime.java | 175 +++-
.../iterate/SpoolingResultIteratorTest.java | 4 +-
56 files changed, 2933 insertions(+), 849 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixMetricsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixMetricsIT.java
deleted file mode 100644
index edb4042..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixMetricsIT.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end;
-
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.FAILED_QUERY;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.MUTATION_COUNT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.NUM_SPOOL_FILE;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.QUERY_COUNT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.QUERY_TIMEOUT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.REJECTED_TASK_COUNT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_BATCH_SIZE;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_BYTES;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_COMMIT_TIME;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.PARALLEL_SCANS;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.QUERY_TIME;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.SCAN_BYTES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-
-import org.apache.phoenix.monitoring.Metric;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.junit.Test;
-
-public class PhoenixMetricsIT extends BaseHBaseManagedTimeIT {
-
- @Test
- public void testResetPhoenixMetrics() {
- resetMetrics();
- for (Metric m : PhoenixRuntime.getInternalPhoenixMetrics()) {
- assertEquals(0, m.getTotalSum());
- assertEquals(0, m.getNumberOfSamples());
- }
- }
-
- @Test
- public void testPhoenixMetricsForQueries() throws Exception {
- createTableAndInsertValues("T", true);
- resetMetrics(); // we want to count metrics related only to the below query
- Connection conn = DriverManager.getConnection(getUrl());
- String query = "SELECT * FROM T";
- ResultSet rs = conn.createStatement().executeQuery(query);
- while (rs.next()) {
- rs.getString(1);
- rs.getString(2);
- }
- assertEquals(1, PARALLEL_SCANS.getMetric().getTotalSum());
- assertEquals(1, QUERY_COUNT.getMetric().getTotalSum());
- assertEquals(0, REJECTED_TASK_COUNT.getMetric().getTotalSum());
- assertEquals(0, QUERY_TIMEOUT.getMetric().getTotalSum());
- assertEquals(0, FAILED_QUERY.getMetric().getTotalSum());
- assertEquals(0, NUM_SPOOL_FILE.getMetric().getTotalSum());
- assertEquals(0, MUTATION_BATCH_SIZE.getMetric().getTotalSum());
- assertEquals(0, MUTATION_BYTES.getMetric().getTotalSum());
- assertEquals(0, MUTATION_COMMIT_TIME.getMetric().getTotalSum());
-
- assertTrue(SCAN_BYTES.getMetric().getTotalSum() > 0);
- assertTrue(QUERY_TIME.getMetric().getTotalSum() > 0);
- }
-
- @Test
- public void testPhoenixMetricsForMutations() throws Exception {
- createTableAndInsertValues("T", true);
- assertEquals(10, MUTATION_BATCH_SIZE.getMetric().getTotalSum());
- assertEquals(10, MUTATION_COUNT.getMetric().getTotalSum());
- assertTrue(MUTATION_BYTES.getMetric().getTotalSum() > 0);
- assertTrue(MUTATION_COMMIT_TIME.getMetric().getTotalSum() > 0);
- assertEquals(0, PARALLEL_SCANS.getMetric().getTotalSum());
- assertEquals(0, QUERY_COUNT.getMetric().getTotalSum());
- assertEquals(0, REJECTED_TASK_COUNT.getMetric().getTotalSum());
- assertEquals(0, QUERY_TIMEOUT.getMetric().getTotalSum());
- assertEquals(0, FAILED_QUERY.getMetric().getTotalSum());
- assertEquals(0, NUM_SPOOL_FILE.getMetric().getTotalSum());
- }
-
-
- @Test
- public void testPhoenixMetricsForUpsertSelect() throws Exception {
- createTableAndInsertValues("T", true);
- resetMetrics();
- String ddl = "CREATE TABLE T2 (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
- Connection conn = DriverManager.getConnection(getUrl());
- conn.createStatement().execute(ddl);
- resetMetrics();
- String dml = "UPSERT INTO T2 (K, V) SELECT K, V FROM T";
- conn.createStatement().executeUpdate(dml);
- conn.commit();
- assertEquals(10, MUTATION_BATCH_SIZE.getMetric().getTotalSum());
- assertEquals(1, MUTATION_COUNT.getMetric().getTotalSum());
- assertEquals(1, PARALLEL_SCANS.getMetric().getTotalSum());
- assertEquals(0, QUERY_TIME.getMetric().getTotalSum());
- assertTrue(SCAN_BYTES.getMetric().getTotalSum() > 0);
- assertTrue(MUTATION_BYTES.getMetric().getTotalSum() > 0);
- assertTrue(MUTATION_COMMIT_TIME.getMetric().getTotalSum() > 0);
- assertEquals(0, QUERY_COUNT.getMetric().getTotalSum());
- assertEquals(0, REJECTED_TASK_COUNT.getMetric().getTotalSum());
- assertEquals(0, QUERY_TIMEOUT.getMetric().getTotalSum());
- assertEquals(0, FAILED_QUERY.getMetric().getTotalSum());
- assertEquals(0, NUM_SPOOL_FILE.getMetric().getTotalSum());
- }
-
- private static void resetMetrics() {
- for (Metric m : PhoenixRuntime.getInternalPhoenixMetrics()) {
- m.reset();
- }
- }
-
- private static void createTableAndInsertValues(String tableName, boolean resetMetricsAfterTableCreate) throws Exception {
- String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
- Connection conn = DriverManager.getConnection(getUrl());
- conn.createStatement().execute(ddl);
- if (resetMetricsAfterTableCreate) {
- resetMetrics();
- }
- // executing 10 upserts/mutations.
- String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
- PreparedStatement stmt = conn.prepareStatement(dml);
- for (int i = 1; i <= 10; i++) {
- stmt.setString(1, "key" + i);
- stmt.setString(2, "value" + i);
- stmt.executeUpdate();
- }
- conn.commit();
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
index c8696e2..e0f0a3c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@ -260,6 +260,7 @@ public class PartialCommitIT {
PhoenixConnection phxCon = new PhoenixConnection(con.unwrap(PhoenixConnection.class));
final Map<TableRef,Map<ImmutableBytesPtr,MutationState.RowMutationState>> mutations = Maps.newTreeMap(new TableRefComparator());
return new PhoenixConnection(phxCon) {
+ @Override
protected MutationState newMutationState(int maxSize) {
return new MutationState(maxSize, this, mutations);
};
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
new file mode 100644
index 0000000..d9ca8e8
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -0,0 +1,815 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_SIZE;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_NUM_PARALLEL_SCANS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_REJECTED_TASK_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SCAN_BYTES;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SELECT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SPOOL_FILE_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_END_TO_END_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_EXECUTION_TIME;
+import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME;
+import static org.apache.phoenix.util.PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class PhoenixMetricsIT extends BaseOwnClusterHBaseManagedTimeIT {
+
+ private static final List<String> mutationMetricsToSkip = Lists
+ .newArrayList(MetricType.MUTATION_COMMIT_TIME.name());
+ private static final List<String> readMetricsToSkip = Lists.newArrayList(MetricType.TASK_QUEUE_WAIT_TIME.name(),
+ MetricType.TASK_EXECUTION_TIME.name(), MetricType.TASK_END_TO_END_TIME.name());
+
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+ // Enable request metric collection at the driver level
+ props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Test
+ public void testResetGlobalPhoenixMetrics() {
+ resetGlobalMetrics();
+ for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) {
+ assertEquals(0, m.getTotalSum());
+ assertEquals(0, m.getNumberOfSamples());
+ }
+ }
+
+ @Test
+ public void testGlobalPhoenixMetricsForQueries() throws Exception {
+ createTableAndInsertValues("T", true);
+ resetGlobalMetrics(); // we want to count metrics related only to the below query
+ Connection conn = DriverManager.getConnection(getUrl());
+ String query = "SELECT * FROM T";
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ while (rs.next()) {
+ rs.getString(1);
+ rs.getString(2);
+ }
+ assertEquals(1, GLOBAL_NUM_PARALLEL_SCANS.getMetric().getTotalSum());
+ assertEquals(1, GLOBAL_SELECT_SQL_COUNTER.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_REJECTED_TASK_COUNTER.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_QUERY_TIMEOUT_COUNTER.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_FAILED_QUERY_COUNTER.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_SPOOL_FILE_COUNTER.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_MUTATION_BATCH_SIZE.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_MUTATION_BYTES.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_MUTATION_COMMIT_TIME.getMetric().getTotalSum());
+
+ assertTrue(GLOBAL_SCAN_BYTES.getMetric().getTotalSum() > 0);
+ assertTrue(GLOBAL_QUERY_TIME.getMetric().getTotalSum() > 0);
+ assertTrue(GLOBAL_TASK_END_TO_END_TIME.getMetric().getTotalSum() > 0);
+ assertTrue(GLOBAL_TASK_EXECUTION_TIME.getMetric().getTotalSum() > 0);
+ }
+
+ @Test
+ public void testGlobalPhoenixMetricsForMutations() throws Exception {
+ createTableAndInsertValues("T", true);
+ assertEquals(10, GLOBAL_MUTATION_BATCH_SIZE.getMetric().getTotalSum());
+ assertEquals(10, GLOBAL_MUTATION_SQL_COUNTER.getMetric().getTotalSum());
+ assertTrue(GLOBAL_MUTATION_BYTES.getMetric().getTotalSum() > 0);
+ assertTrue(GLOBAL_MUTATION_COMMIT_TIME.getMetric().getTotalSum() > 0);
+ assertEquals(0, GLOBAL_NUM_PARALLEL_SCANS.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_SELECT_SQL_COUNTER.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_REJECTED_TASK_COUNTER.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_QUERY_TIMEOUT_COUNTER.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_FAILED_QUERY_COUNTER.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_SPOOL_FILE_COUNTER.getMetric().getTotalSum());
+ }
+
+ @Test
+ public void testGlobalPhoenixMetricsForUpsertSelect() throws Exception {
+ createTableAndInsertValues("T", true);
+ resetGlobalMetrics();
+ String ddl = "CREATE TABLE T2 (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute(ddl);
+ resetGlobalMetrics();
+ String dml = "UPSERT INTO T2 (K, V) SELECT K, V FROM T";
+ conn.createStatement().executeUpdate(dml);
+ conn.commit();
+ assertEquals(10, GLOBAL_MUTATION_BATCH_SIZE.getMetric().getTotalSum());
+ assertEquals(1, GLOBAL_MUTATION_SQL_COUNTER.getMetric().getTotalSum());
+ assertEquals(1, GLOBAL_NUM_PARALLEL_SCANS.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_QUERY_TIME.getMetric().getTotalSum());
+ assertTrue(GLOBAL_SCAN_BYTES.getMetric().getTotalSum() > 0);
+ assertTrue(GLOBAL_MUTATION_BYTES.getMetric().getTotalSum() > 0);
+ assertTrue(GLOBAL_MUTATION_COMMIT_TIME.getMetric().getTotalSum() > 0);
+ assertEquals(0, GLOBAL_SELECT_SQL_COUNTER.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_REJECTED_TASK_COUNTER.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_QUERY_TIMEOUT_COUNTER.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_FAILED_QUERY_COUNTER.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_SPOOL_FILE_COUNTER.getMetric().getTotalSum());
+ }
+
+ private static void resetGlobalMetrics() {
+ for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) {
+ m.reset();
+ }
+ }
+
+ private static void createTableAndInsertValues(String tableName, boolean resetGlobalMetricsAfterTableCreate)
+ throws Exception {
+ String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute(ddl);
+ if (resetGlobalMetricsAfterTableCreate) {
+ resetGlobalMetrics();
+ }
+ // executing 10 upserts/mutations.
+ String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+ PreparedStatement stmt = conn.prepareStatement(dml);
+ for (int i = 1; i <= 10; i++) {
+ stmt.setString(1, "key" + i);
+ stmt.setString(2, "value" + i);
+ stmt.executeUpdate();
+ }
+ conn.commit();
+ }
+
+ @Test
+ public void testOverallQueryMetricsForSelect() throws Exception {
+ String tableName = "SCANMETRICS";
+ String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = 6";
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute(ddl);
+ }
+
+ @Test
+ public void testReadMetricsForSelect() throws Exception {
+ String tableName = "READMETRICSFORSELECT";
+ long numSaltBuckets = 6;
+ String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+ + numSaltBuckets;
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute(ddl);
+
+ long numRows = 1000;
+ long numExpectedTasks = numSaltBuckets;
+ insertRowsInTable(tableName, numRows);
+
+ String query = "SELECT * FROM " + tableName;
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery(query);
+ PhoenixResultSet resultSetBeingTested = rs.unwrap(PhoenixResultSet.class);
+ changeInternalStateForTesting(resultSetBeingTested);
+ while (resultSetBeingTested.next()) {}
+ resultSetBeingTested.close();
+ Set<String> expectedTableNames = Sets.newHashSet(tableName);
+ assertReadMetricValuesForSelectSql(Lists.newArrayList(numRows), Lists.newArrayList(numExpectedTasks),
+ resultSetBeingTested, expectedTableNames);
+ }
+
+ @Test
+ public void testMetricsForUpsert() throws Exception {
+ String tableName = "UPSERTMETRICS";
+ String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = 6";
+ Connection ddlConn = DriverManager.getConnection(getUrl());
+ ddlConn.createStatement().execute(ddl);
+ ddlConn.close();
+
+ int numRows = 10;
+ Connection conn = insertRowsInTable(tableName, numRows);
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+ Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+ for (Entry<String, Map<String, Long>> entry : mutationMetrics.entrySet()) {
+ String t = entry.getKey();
+ assertEquals("Table names didn't match!", tableName, t);
+ Map<String, Long> p = entry.getValue();
+ assertEquals("There should have been three metrics", 3, p.size());
+ boolean mutationBatchSizePresent = false;
+ boolean mutationCommitTimePresent = false;
+ boolean mutationBytesPresent = false;
+ for (Entry<String, Long> metric : p.entrySet()) {
+ String metricName = metric.getKey();
+ long metricValue = metric.getValue();
+ if (metricName.equals(MetricType.MUTATION_BATCH_SIZE.name())) {
+ assertEquals("Mutation batch sizes didn't match!", numRows, metricValue);
+ mutationBatchSizePresent = true;
+ } else if (metricName.equals(MetricType.MUTATION_COMMIT_TIME.name())) {
+ assertTrue("Mutation commit time should be greater than zero", metricValue > 0);
+ mutationCommitTimePresent = true;
+ } else if (metricName.equals(MetricType.MUTATION_BYTES.name())) {
+ assertTrue("Mutation bytes size should be greater than zero", metricValue > 0);
+ mutationBytesPresent = true;
+ }
+ }
+ assertTrue(mutationBatchSizePresent);
+ assertTrue(mutationCommitTimePresent);
+ assertTrue(mutationBytesPresent);
+ }
+ Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+ assertEquals("Read metrics should be empty", 0, readMetrics.size());
+ }
+
+ @Test
+ public void testMetricsForUpsertSelect() throws Exception {
+ String tableName1 = "UPSERTFROM";
+ long table1SaltBuckets = 6;
+ String ddl = "CREATE TABLE " + tableName1 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+ + table1SaltBuckets;
+ Connection ddlConn = DriverManager.getConnection(getUrl());
+ ddlConn.createStatement().execute(ddl);
+ ddlConn.close();
+ int numRows = 10;
+ insertRowsInTable(tableName1, numRows);
+
+ String tableName2 = "UPSERTTO";
+ ddl = "CREATE TABLE " + tableName2 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = 10";
+ ddlConn = DriverManager.getConnection(getUrl());
+ ddlConn.createStatement().execute(ddl);
+ ddlConn.close();
+
+ Connection conn = DriverManager.getConnection(getUrl());
+ String upsertSelect = "UPSERT INTO " + tableName2 + " SELECT * FROM " + tableName1;
+ conn.createStatement().executeUpdate(upsertSelect);
+ conn.commit();
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+
+ Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+ assertMutationMetrics(tableName2, numRows, mutationMetrics);
+ Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+ assertReadMetricsForMutatingSql(tableName1, table1SaltBuckets, readMetrics);
+ }
+
+ @Test
+ public void testMetricsForDelete() throws Exception {
+ String tableName = "DELETEMETRICS";
+ long tableSaltBuckets = 6;
+ String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+ + tableSaltBuckets;
+ Connection ddlConn = DriverManager.getConnection(getUrl());
+ ddlConn.createStatement().execute(ddl);
+ ddlConn.close();
+ int numRows = 10;
+ insertRowsInTable(tableName, numRows);
+ Connection conn = DriverManager.getConnection(getUrl());
+ String delete = "DELETE FROM " + tableName;
+ conn.createStatement().execute(delete);
+ conn.commit();
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+ Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+ assertMutationMetrics(tableName, numRows, mutationMetrics);
+
+ Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+ assertReadMetricsForMutatingSql(tableName, tableSaltBuckets, readMetrics);
+ }
+
+ @Test
+ public void testNoMetricsCollectedForConnection() throws Exception {
+ String tableName = "NOMETRICS";
+ long tableSaltBuckets = 6;
+ String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+ + tableSaltBuckets;
+ Connection ddlConn = DriverManager.getConnection(getUrl());
+ ddlConn.createStatement().execute(ddl);
+ ddlConn.close();
+ int numRows = 10;
+ insertRowsInTable(tableName, numRows);
+ Properties props = new Properties();
+ props.setProperty(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, "false");
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
+ while (rs.next()) {}
+ rs.close();
+ Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getRequestReadMetrics(rs);
+ assertTrue("No read metrics should have been generated", readMetrics.size() == 0);
+ conn.createStatement().executeUpdate("UPSERT INTO " + tableName + " VALUES ('KEY', 'VALUE')");
+ conn.commit();
+ Map<String, Map<String, Long>> writeMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+ assertTrue("No write metrics should have been generated", writeMetrics.size() == 0);
+ }
+
+ @Test
+ public void testMetricsForUpsertWithAutoCommit() throws Exception {
+ String tableName = "VERIFYUPSERTAUTOCOMMIT";
+ long tableSaltBuckets = 6;
+ String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+ + tableSaltBuckets;
+ try (Connection ddlConn = DriverManager.getConnection(getUrl())) {
+ ddlConn.createStatement().execute(ddl);
+ }
+
+ String upsert = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+ int numRows = 10;
+ Map<String, Map<String, Long>> mutationMetricsForAutoCommitOff = null;
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.setAutoCommit(false);
+ upsertRows(upsert, numRows, conn);
+ conn.commit();
+ mutationMetricsForAutoCommitOff = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+ }
+
+ // Insert rows now with auto-commit on
+ Map<String, Map<String, Long>> mutationMetricsAutoCommitOn = null;
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.setAutoCommit(true);
+ upsertRows(upsert, numRows, conn);
+ mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+ }
+ // Verify that the mutation metrics are same for both cases
+ assertMetricsAreSame(mutationMetricsForAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
+ }
+
+ private void upsertRows(String upsert, int numRows, Connection conn) throws SQLException {
+ PreparedStatement stmt = conn.prepareStatement(upsert);
+ for (int i = 1; i <= numRows; i++) {
+ stmt.setString(1, "key" + i);
+ stmt.setString(2, "value" + i);
+ stmt.executeUpdate();
+ }
+ }
+
+ @Test
+ public void testMetricsForDeleteWithAutoCommit() throws Exception {
+ String tableName = "VERIFYDELETEAUTOCOMMIT";
+ long tableSaltBuckets = 6;
+ String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+ + tableSaltBuckets;
+ try (Connection ddlConn = DriverManager.getConnection(getUrl())) {
+ ddlConn.createStatement().execute(ddl);
+ }
+
+ String upsert = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+ int numRows = 10;
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.setAutoCommit(false);
+ upsertRows(upsert, numRows, conn);
+ conn.commit();
+ }
+
+ String delete = "DELETE FROM " + tableName;
+ // Delete rows now with auto-commit off
+ Map<String, Map<String, Long>> deleteMetricsWithAutoCommitOff = null;
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.setAutoCommit(false);
+ conn.createStatement().executeUpdate(delete);
+ deleteMetricsWithAutoCommitOff = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+ }
+
+ // Upsert the rows back
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.setAutoCommit(false);
+ upsertRows(upsert, numRows, conn);
+ conn.commit();
+ }
+
+ // Now delete rows with auto-commit on
+ Map<String, Map<String, Long>> deleteMetricsWithAutoCommitOn = null;
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.setAutoCommit(true);
+ conn.createStatement().executeUpdate(delete);
+ deleteMetricsWithAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+ }
+
+ // Verify that the mutation metrics are same for both cases.
+ assertMetricsAreSame(deleteMetricsWithAutoCommitOff, deleteMetricsWithAutoCommitOn, mutationMetricsToSkip);
+ }
+
+ @Test
+ public void testMetricsForUpsertSelectWithAutoCommit() throws Exception {
+ String tableName1 = "UPSERTFROMAUTOCOMMIT";
+ long table1SaltBuckets = 6;
+ String ddl = "CREATE TABLE " + tableName1 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+ + table1SaltBuckets;
+ Connection ddlConn = DriverManager.getConnection(getUrl());
+ ddlConn.createStatement().execute(ddl);
+ ddlConn.close();
+ int numRows = 10;
+ insertRowsInTable(tableName1, numRows);
+
+ String tableName2 = "UPSERTTOAUTCOMMIT";
+ ddl = "CREATE TABLE " + tableName2 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = 10";
+ ddlConn = DriverManager.getConnection(getUrl());
+ ddlConn.createStatement().execute(ddl);
+ ddlConn.close();
+
+ String upsertSelect = "UPSERT INTO " + tableName2 + " SELECT * FROM " + tableName1;
+
+ Map<String, Map<String, Long>> mutationMetricsAutoCommitOff = null;
+ Map<String, Map<String, Long>> readMetricsAutoCommitOff = null;
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.setAutoCommit(false);
+ conn.createStatement().executeUpdate(upsertSelect);
+ conn.commit();
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+ mutationMetricsAutoCommitOff = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+ readMetricsAutoCommitOff = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+ }
+
+ Map<String, Map<String, Long>> mutationMetricsAutoCommitOn = null;
+ Map<String, Map<String, Long>> readMetricsAutoCommitOn = null;
+
+ int autoCommitBatchSize = numRows + 1; // batchsize = 11 is less than numRows and is not a divisor of batchsize
+ Properties props = new Properties();
+ props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(autoCommitBatchSize));
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ conn.createStatement().executeUpdate(upsertSelect);
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+ mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+ readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+ }
+ assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
+ assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOn, readMetricsToSkip);
+
+ autoCommitBatchSize = numRows - 1; // batchsize = 9 is less than numRows and is not a divisor of batchsize
+ props = new Properties();
+ props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(autoCommitBatchSize));
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ conn.createStatement().executeUpdate(upsertSelect);
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+ mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+ readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+ }
+ assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
+ assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOn, readMetricsToSkip);
+
+ autoCommitBatchSize = numRows;
+ props = new Properties();
+ props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(autoCommitBatchSize));
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ conn.createStatement().executeUpdate(upsertSelect);
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+ mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+ readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+ }
+ assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
+ assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOn, readMetricsToSkip);
+
+ autoCommitBatchSize = 2; // multiple batches of equal size
+ props = new Properties();
+ props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(autoCommitBatchSize));
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ conn.createStatement().executeUpdate(upsertSelect);
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+ mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+ readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+ }
+ assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
+ assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOff, readMetricsToSkip);
+ }
+
+ @Test
+ public void testMutationMetricsWhenUpsertingToMultipleTables() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String table1 = "TABLE1";
+ createTableAndInsertValues(true, 10, conn, table1);
+ String table2 = "TABLE2";
+ createTableAndInsertValues(true, 10, conn, table2);
+ String table3 = "TABLE3";
+ createTableAndInsertValues(true, 10, conn, table3);
+ Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+ assertTrue("Mutation metrics not present for " + table1, mutationMetrics.get(table1) != null);
+ assertTrue("Mutation metrics not present for " + table2, mutationMetrics.get(table2) != null);
+ assertTrue("Mutation metrics not present for " + table3, mutationMetrics.get(table3) != null);
+ assertMetricsHaveSameValues(mutationMetrics.get(table1), mutationMetrics.get(table2), mutationMetricsToSkip);
+ assertMetricsHaveSameValues(mutationMetrics.get(table1), mutationMetrics.get(table3), mutationMetricsToSkip);
+ }
+ }
+
+ @Test
+ public void testClosingConnectionClearsMetrics() throws Exception {
+ Connection conn = null;
+ try {
+ conn = DriverManager.getConnection(getUrl());
+ createTableAndInsertValues(true, 10, conn, "clearmetrics");
+ assertTrue("Mutation metrics not present", PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn).size() > 0);
+ } finally {
+ if (conn != null) {
+ conn.close();
+ assertTrue("Closing connection didn't clear metrics",
+ PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn).size() == 0);
+ }
+ }
+ }
+
+ @Test
+ public void testMetricsForUpsertingIntoImmutableTableWithIndices() throws Exception {
+ String dataTable = "IMMTABLEWITHINDICES";
+ String tableDdl = "CREATE TABLE "
+ + dataTable
+ + " (K1 VARCHAR NOT NULL, K2 VARCHAR NOT NULL, V1 INTEGER, V2 INTEGER, V3 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(K1, K2)) IMMUTABLE_ROWS = true";
+ String index1 = "I1";
+ String index1Ddl = "CREATE INDEX " + index1 + " ON " + dataTable + " (V1) include (V2)";
+ String index2 = "I2";
+ String index2Ddl = "CREATE INDEX " + index2 + " ON " + dataTable + " (V2) include (V3)";
+ String index3 = "I3";
+ String index3Ddl = "CREATE INDEX " + index3 + " ON " + dataTable + " (V3) include (V1)";
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement().execute(tableDdl);
+ conn.createStatement().execute(index1Ddl);
+ conn.createStatement().execute(index2Ddl);
+ conn.createStatement().execute(index3Ddl);
+ }
+ String upsert = "UPSERT INTO " + dataTable + " VALUES (?, ?, ?, ?, ?)";
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ /*
+ * Upsert data into table. Because the table is immutable, mutations for updating the indices on it are
+ * handled by the client itself. So mutation metrics should include mutations for the indices as well as the
+ * data table.
+ */
+ PreparedStatement stmt = conn.prepareStatement(upsert);
+ for (int i = 1; i < 10; i++) {
+ stmt.setString(1, "key1" + i);
+ stmt.setString(2, "key2" + i);
+ stmt.setInt(3, i);
+ stmt.setInt(4, i);
+ stmt.setInt(5, i);
+ stmt.executeUpdate();
+ }
+ conn.commit();
+ Map<String, Map<String, Long>> metrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+ assertTrue(metrics.get(dataTable).size() > 0);
+ assertTrue(metrics.get(index1).size() > 0);
+ assertTrue(metrics.get(index2).size() > 0);
+ assertMetricsHaveSameValues(metrics.get(index1), metrics.get(index2), mutationMetricsToSkip);
+ assertTrue(metrics.get(index3).size() > 0);
+ assertMetricsHaveSameValues(metrics.get(index1), metrics.get(index3), mutationMetricsToSkip);
+ }
+ }
+
+ @Test
+ public void testMetricsForUpsertSelectSameTable() throws Exception {
+ String tableName = "UPSERTSAME";
+ long table1SaltBuckets = 6;
+ String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+ + table1SaltBuckets;
+ Connection ddlConn = DriverManager.getConnection(getUrl());
+ ddlConn.createStatement().execute(ddl);
+ ddlConn.close();
+ int numRows = 10;
+ insertRowsInTable(tableName, numRows);
+
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.setAutoCommit(false);
+ String upsertSelect = "UPSERT INTO " + tableName + " SELECT * FROM " + tableName;
+ conn.createStatement().executeUpdate(upsertSelect);
+ conn.commit();
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+
+ Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+ // Because auto-commit is off, upsert select into the same table will run on the client.
+ // So we should have client side read and write metrics available.
+ assertMutationMetrics(tableName, numRows, mutationMetrics);
+ Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+ assertReadMetricsForMutatingSql(tableName, table1SaltBuckets, readMetrics);
+ PhoenixRuntime.resetMetrics(pConn);
+ // With autocommit on, still, this upsert select runs on the client side.
+ conn.setAutoCommit(true);
+ conn.createStatement().executeUpdate(upsertSelect);
+ Map<String, Map<String, Long>> autoCommitMutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+ Map<String, Map<String, Long>> autoCommitReadMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+ assertMetricsAreSame(mutationMetrics, autoCommitMutationMetrics, mutationMetricsToSkip);
+ assertMetricsAreSame(readMetrics, autoCommitReadMetrics, readMetricsToSkip);
+ }
+
+ private void createTableAndInsertValues(boolean commit, int numRows, Connection conn, String tableName)
+ throws SQLException {
+ String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
+ conn.createStatement().execute(ddl);
+ // executing 10 upserts/mutations.
+ String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+ PreparedStatement stmt = conn.prepareStatement(dml);
+ for (int i = 1; i <= numRows; i++) {
+ stmt.setString(1, "key" + i);
+ stmt.setString(2, "value" + i);
+ stmt.executeUpdate();
+ }
+ if (commit) {
+ conn.commit();
+ }
+ }
+
+ private void assertMetricsAreSame(Map<String, Map<String, Long>> metric1, Map<String, Map<String, Long>> metric2,
+ List<String> metricsToSkip) {
+ assertTrue("The two metrics have different or unequal number of table names ",
+ metric1.keySet().equals(metric2.keySet()));
+ for (Entry<String, Map<String, Long>> entry : metric1.entrySet()) {
+ Map<String, Long> metricNameValueMap1 = entry.getValue();
+ Map<String, Long> metricNameValueMap2 = metric2.get(entry.getKey());
+ assertMetricsHaveSameValues(metricNameValueMap1, metricNameValueMap2, metricsToSkip);
+ }
+ }
+
+ private void assertMetricsHaveSameValues(Map<String, Long> metricNameValueMap1,
+ Map<String, Long> metricNameValueMap2, List<String> metricsToSkip) {
+ assertTrue("The two metrics have different or unequal number of metric names ", metricNameValueMap1.keySet()
+ .equals(metricNameValueMap2.keySet()));
+ for (Entry<String, Long> entry : metricNameValueMap1.entrySet()) {
+ String metricName = entry.getKey();
+ if (!metricsToSkip.contains(metricName)) {
+ assertEquals("Unequal values for metric " + metricName, entry.getValue(),
+ metricNameValueMap2.get(metricName));
+ }
+ }
+ }
+
+ private void changeInternalStateForTesting(PhoenixResultSet rs) {
+ // get and set the internal state for testing purposes.
+ ReadMetricQueue testMetricsQueue = new TestReadMetricsQueue(true);
+ StatementContext ctx = (StatementContext)Whitebox.getInternalState(rs, "context");
+ Whitebox.setInternalState(ctx, "readMetricsQueue", testMetricsQueue);
+ Whitebox.setInternalState(rs, "readMetricsQueue", testMetricsQueue);
+ }
+
+ private void assertReadMetricValuesForSelectSql(ArrayList<Long> numRows, ArrayList<Long> numExpectedTasks,
+ PhoenixResultSet resultSetBeingTested, Set<String> expectedTableNames) throws SQLException {
+ Map<String, Map<String, Long>> metrics = PhoenixRuntime.getRequestReadMetrics(resultSetBeingTested);
+ int counter = 0;
+ for (Entry<String, Map<String, Long>> entry : metrics.entrySet()) {
+ String tableName = entry.getKey();
+ expectedTableNames.remove(tableName);
+ Map<String, Long> metricValues = entry.getValue();
+ boolean scanMetricsPresent = false;
+ boolean taskCounterMetricsPresent = false;
+ boolean taskExecutionTimeMetricsPresent = false;
+ boolean memoryMetricsPresent = false;
+ for (Entry<String, Long> pair : metricValues.entrySet()) {
+ String metricName = pair.getKey();
+ long metricValue = pair.getValue();
+ long n = numRows.get(counter);
+ long numTask = numExpectedTasks.get(counter);
+ if (metricName.equals(SCAN_BYTES.name())) {
+ // we are using a SCAN_BYTES_DELTA of 1. So number of scan bytes read should be number of rows read
+ assertEquals(n, metricValue);
+ scanMetricsPresent = true;
+ } else if (metricName.equals(TASK_EXECUTED_COUNTER.name())) {
+ assertEquals(numTask, metricValue);
+ taskCounterMetricsPresent = true;
+ } else if (metricName.equals(TASK_EXECUTION_TIME.name())) {
+ assertEquals(numTask * TASK_EXECUTION_TIME_DELTA, metricValue);
+ taskExecutionTimeMetricsPresent = true;
+ } else if (metricName.equals(MEMORY_CHUNK_BYTES.name())) {
+ assertEquals(numTask * MEMORY_CHUNK_BYTES_DELTA, metricValue);
+ memoryMetricsPresent = true;
+ }
+ }
+ counter++;
+ assertTrue(scanMetricsPresent);
+ assertTrue(taskCounterMetricsPresent);
+ assertTrue(taskExecutionTimeMetricsPresent);
+ assertTrue(memoryMetricsPresent);
+ }
+ PhoenixRuntime.resetMetrics(resultSetBeingTested);
+ assertTrue("Metrics not found tables " + Joiner.on(",").join(expectedTableNames),
+ expectedTableNames.size() == 0);
+ }
+
+ private Connection insertRowsInTable(String tableName, long numRows) throws SQLException {
+ String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+ Connection conn = DriverManager.getConnection(getUrl());
+ PreparedStatement stmt = conn.prepareStatement(dml);
+ for (int i = 1; i <= numRows; i++) {
+ stmt.setString(1, "key" + i);
+ stmt.setString(2, "value" + i);
+ stmt.executeUpdate();
+ }
+ conn.commit();
+ return conn;
+ }
+
+ // number of records read should be number of bytes at the end
+ public static final long SCAN_BYTES_DELTA = 1;
+
+ // total task execution time should be numTasks * TASK_EXECUTION_TIME_DELTA
+ public static final long TASK_EXECUTION_TIME_DELTA = 10;
+
+ // total task execution time should be numTasks * TASK_EXECUTION_TIME_DELTA
+ public static final long MEMORY_CHUNK_BYTES_DELTA = 100;
+
+ private class TestReadMetricsQueue extends ReadMetricQueue {
+
+ public TestReadMetricsQueue(boolean isRequestMetricsEnabled) {
+ super(isRequestMetricsEnabled);
+ }
+
+ @Override
+ public CombinableMetric getMetric(MetricType type) {
+ switch (type) {
+ case SCAN_BYTES:
+ return new CombinableMetricImpl(type) {
+
+ @Override
+ public void change(long delta) {
+ super.change(SCAN_BYTES_DELTA);
+ }
+ };
+ case TASK_EXECUTION_TIME:
+ return new CombinableMetricImpl(type) {
+
+ @Override
+ public void change(long delta) {
+ super.change(TASK_EXECUTION_TIME_DELTA);
+ }
+ };
+ case MEMORY_CHUNK_BYTES:
+ return new CombinableMetricImpl(type) {
+
+ @Override
+ public void change(long delta) {
+ super.change(MEMORY_CHUNK_BYTES_DELTA);
+ }
+ };
+ }
+ return super.getMetric(type);
+ }
+ }
+
+ private void assertReadMetricsForMutatingSql(String tableName, long tableSaltBuckets,
+ Map<String, Map<String, Long>> readMetrics) {
+ assertTrue("No read metrics present when there should have been!", readMetrics.size() > 0);
+ int numTables = 0;
+ for (Entry<String, Map<String, Long>> entry : readMetrics.entrySet()) {
+ String t = entry.getKey();
+ assertEquals("Table name didn't match for read metrics", tableName, t);
+ numTables++;
+ Map<String, Long> p = entry.getValue();
+ assertTrue("No read metrics present when there should have been", p.size() > 0);
+ for (Entry<String, Long> metric : p.entrySet()) {
+ String metricName = metric.getKey();
+ long metricValue = metric.getValue();
+ if (metricName.equals(TASK_EXECUTED_COUNTER.name())) {
+ assertEquals(tableSaltBuckets, metricValue);
+ } else if (metricName.equals(SCAN_BYTES.name())) {
+ assertTrue("Scan bytes read should be greater than zero", metricValue > 0);
+ }
+ }
+ }
+ assertEquals("There should have been read metrics only for one table: " + tableName, 1, numTables);
+ }
+
+ private void assertMutationMetrics(String tableName, int numRows, Map<String, Map<String, Long>> mutationMetrics) {
+ assertTrue("No mutation metrics present when there should have been", mutationMetrics.size() > 0);
+ for (Entry<String, Map<String, Long>> entry : mutationMetrics.entrySet()) {
+ String t = entry.getKey();
+ assertEquals("Table name didn't match for mutation metrics", tableName, t);
+ Map<String, Long> p = entry.getValue();
+ assertEquals("There should have been three metrics", 3, p.size());
+ for (Entry<String, Long> metric : p.entrySet()) {
+ String metricName = metric.getKey();
+ long metricValue = metric.getValue();
+ if (metricName.equals(MetricType.MUTATION_BATCH_SIZE.name())) {
+ assertEquals("Mutation batch sizes didn't match!", numRows, metricValue);
+ } else if (metricName.equals(MetricType.MUTATION_COMMIT_TIME.name())) {
+ assertTrue("Mutation commit time should be greater than zero", metricValue > 0);
+ } else if (metricName.equals(MetricType.MUTATION_BYTES.name())) {
+ assertTrue("Mutation bytes size should be greater than zero", metricValue > 0);
+ }
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index 9718709..9ad9ef5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.cache;
+import static org.apache.phoenix.monitoring.TaskExecutionMetricsHolder.NO_OP_INSTANCE;
import static org.apache.phoenix.util.LogUtil.addCustomAnnotations;
import java.io.Closeable;
@@ -57,6 +58,7 @@ import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ServerCachin
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.job.JobManager.JobCallable;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
@@ -226,6 +228,11 @@ public class ServerCacheClient {
public Object getJobId() {
return ServerCacheClient.this;
}
+
+ @Override
+ public TaskExecutionMetricsHolder getTaskExecutionMetric() {
+ return NO_OP_INSTANCE;
+ }
}));
} else {
if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("NOT adding cache entry to be sent for " + entry + " since one already exists for that entry", connection));}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 575f0f3..a28f614 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -94,8 +94,9 @@ public class DeleteCompiler {
this.statement = statement;
}
- private static MutationState deleteRows(PhoenixStatement statement, TableRef targetTableRef, TableRef indexTableRef, ResultIterator iterator, RowProjector projector, TableRef sourceTableRef) throws SQLException {
+ private static MutationState deleteRows(StatementContext childContext, TableRef targetTableRef, TableRef indexTableRef, ResultIterator iterator, RowProjector projector, TableRef sourceTableRef) throws SQLException {
PTable table = targetTableRef.getTable();
+ PhoenixStatement statement = childContext.getStatement();
PhoenixConnection connection = statement.getConnection();
PName tenantId = connection.getTenantId();
byte[] tenantIdBytes = null;
@@ -114,19 +115,18 @@ public class DeleteCompiler {
if (indexTableRef != null) {
indexMutations = Maps.newHashMapWithExpectedSize(batchSize);
}
- try {
- List<PColumn> pkColumns = table.getPKColumns();
- boolean isMultiTenant = table.isMultiTenant() && tenantIdBytes != null;
- boolean isSharedViewIndex = table.getViewIndexId() != null;
- int offset = (table.getBucketNum() == null ? 0 : 1);
- byte[][] values = new byte[pkColumns.size()][];
- if (isMultiTenant) {
- values[offset++] = tenantIdBytes;
- }
- if (isSharedViewIndex) {
- values[offset++] = MetaDataUtil.getViewIndexIdDataType().toBytes(table.getViewIndexId());
- }
- PhoenixResultSet rs = new PhoenixResultSet(iterator, projector, statement);
+ List<PColumn> pkColumns = table.getPKColumns();
+ boolean isMultiTenant = table.isMultiTenant() && tenantIdBytes != null;
+ boolean isSharedViewIndex = table.getViewIndexId() != null;
+ int offset = (table.getBucketNum() == null ? 0 : 1);
+ byte[][] values = new byte[pkColumns.size()][];
+ if (isMultiTenant) {
+ values[offset++] = tenantIdBytes;
+ }
+ if (isSharedViewIndex) {
+ values[offset++] = MetaDataUtil.getViewIndexIdDataType().toBytes(table.getViewIndexId());
+ }
+ try (PhoenixResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
int rowCount = 0;
while (rs.next()) {
ImmutableBytesPtr ptr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map
@@ -183,8 +183,6 @@ public class DeleteCompiler {
state.join(indexState);
}
return state;
- } finally {
- iterator.close();
}
}
@@ -199,9 +197,16 @@ public class DeleteCompiler {
}
@Override
- protected MutationState mutate(StatementContext context, ResultIterator iterator, PhoenixConnection connection) throws SQLException {
+ protected MutationState mutate(StatementContext parentContext, ResultIterator iterator, PhoenixConnection connection) throws SQLException {
PhoenixStatement statement = new PhoenixStatement(connection);
- return deleteRows(statement, targetTableRef, indexTableRef, iterator, projector, sourceTableRef);
+ /*
+ * We don't want to collect any read metrics within the child context. This is because any read metrics that
+ * need to be captured are already getting collected in the parent statement context enclosed in the result
+ * iterator being used for reading rows out.
+ */
+ StatementContext ctx = new StatementContext(statement, false);
+ MutationState state = deleteRows(ctx, targetTableRef, indexTableRef, iterator, projector, sourceTableRef);
+ return state;
}
public void setTargetTableRef(TableRef tableRef) {
@@ -559,9 +564,14 @@ public class DeleteCompiler {
}
// Return total number of rows that have been delete. In the case of auto commit being off
// the mutations will all be in the mutation state of the current connection.
- return new MutationState(maxSize, connection, totalRowCount);
+ MutationState state = new MutationState(maxSize, connection, totalRowCount);
+
+ // set the read metrics accumulated in the parent context so that it can be published when the mutations are committed.
+ state.setReadMetricQueue(plan.getContext().getReadMetricsQueue());
+
+ return state;
} else {
- return deleteRows(statement, tableRef, deleteFromImmutableIndexToo ? plan.getTableRef() : null, iterator, plan.getProjector(), plan.getTableRef());
+ return deleteRows(plan.getContext(), tableRef, deleteFromImmutableIndexToo ? plan.getTableRef() : null, iterator, plan.getProjector(), plan.getTableRef());
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
index bcac17d..630760c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
@@ -35,9 +35,9 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.KeyValueUtil;
/**
@@ -53,21 +53,34 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato
/**
* Method that does the actual mutation work
*/
- abstract protected MutationState mutate(StatementContext context, ResultIterator iterator, PhoenixConnection connection) throws SQLException;
+ abstract protected MutationState mutate(StatementContext parentContext, ResultIterator iterator, PhoenixConnection connection) throws SQLException;
@Override
- public PeekingResultIterator newIterator(StatementContext context, ResultIterator iterator, Scan scan) throws SQLException {
- final PhoenixConnection connection = new PhoenixConnection(this.connection);
- MutationState state = mutate(context, iterator, connection);
+ public PeekingResultIterator newIterator(final StatementContext parentContext, ResultIterator iterator, Scan scan, String tableName) throws SQLException {
+ final PhoenixConnection clonedConnection = new PhoenixConnection(this.connection);
+
+ MutationState state = mutate(parentContext, iterator, clonedConnection);
+
long totalRowCount = state.getUpdateCount();
- if (connection.getAutoCommit()) {
- connection.getMutationState().join(state);
- connection.commit();
- ConnectionQueryServices services = connection.getQueryServices();
- int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
- state = new MutationState(maxSize, connection, totalRowCount);
+ if (clonedConnection.getAutoCommit()) {
+ clonedConnection.getMutationState().join(state);
+ clonedConnection.commit();
+ ConnectionQueryServices services = clonedConnection.getQueryServices();
+ int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+ /*
+ * Everything that was mutated as part of the clonedConnection has been committed. However, we want to
+ * report the mutation work done using this clonedConnection as part of the overall mutation work of the
+ * parent connection. So we need to set those metrics in the empty mutation state so that they could be
+ * combined with the parent connection's mutation metrics (as part of combining mutation state) in the
+ * close() method of the iterator being returned. Don't combine the read metrics in parent context yet
+ * though because they are possibly being concurrently modified by other threads at this stage. Instead we
+ * will get hold of the read metrics when all the mutating iterators are done.
+ */
+ state = MutationState.emptyMutationState(maxSize, clonedConnection);
+ state.getMutationMetricQueue().combineMetricQueues(clonedConnection.getMutationState().getMutationMetricQueue());
}
final MutationState finalState = state;
+
byte[] value = PLong.INSTANCE.toBytes(totalRowCount);
KeyValue keyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length);
final Tuple tuple = new SingleKeyValueTuple(keyValue);
@@ -90,13 +103,17 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato
@Override
public void close() throws SQLException {
try {
- // Join the child mutation states in close, since this is called in a single threaded manner
- // after the parallel results have been processed.
- if (!connection.getAutoCommit()) {
- MutatingParallelIteratorFactory.this.connection.getMutationState().join(finalState);
- }
+ /*
+ * Join the child mutation states in close, since this is called in a single threaded manner
+ * after the parallel results have been processed.
+ * If auto-commit is on for the cloned child connection, then the finalState here is an empty mutation
+ * state (with no mutations). However, it still has the metrics for mutation work done by the
+ * mutating-iterator. Joining the mutation state makes sure those metrics are passed over
+ * to the parent connection.
+ */
+ MutatingParallelIteratorFactory.this.connection.getMutationState().join(finalState);
} finally {
- connection.close();
+ clonedConnection.close();
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index d726488..52bb7f2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.monitoring.OverAllQueryMetrics;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
@@ -41,6 +43,7 @@ import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.NumberUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
import com.google.common.collect.Maps;
@@ -80,10 +83,19 @@ public class StatementContext {
private TimeRange scanTimeRange = null;
private Map<SelectStatement, Object> subqueryResults;
-
+ private final ReadMetricQueue readMetricsQueue;
+ private final OverAllQueryMetrics overAllQueryMetrics;
+
public StatementContext(PhoenixStatement statement) {
this(statement, new Scan());
}
+
+ /**
+ * Constructor that lets you override whether or not to collect request level metrics.
+ */
+ public StatementContext(PhoenixStatement statement, boolean collectRequestLevelMetrics) {
+ this(statement, FromCompiler.EMPTY_TABLE_RESOLVER, new Scan(), new SequenceManager(statement), collectRequestLevelMetrics);
+ }
public StatementContext(PhoenixStatement statement, Scan scan) {
this(statement, FromCompiler.EMPTY_TABLE_RESOLVER, new Scan(), new SequenceManager(statement));
@@ -94,6 +106,10 @@ public class StatementContext {
}
public StatementContext(PhoenixStatement statement, ColumnResolver resolver, Scan scan, SequenceManager seqManager) {
+ this(statement, resolver, scan, seqManager, statement.getConnection().isRequestLevelMetricsEnabled());
+ }
+
+ public StatementContext(PhoenixStatement statement, ColumnResolver resolver, Scan scan, SequenceManager seqManager, boolean isRequestMetricsEnabled) {
this.statement = statement;
this.resolver = resolver;
this.scan = scan;
@@ -102,20 +118,24 @@ public class StatementContext {
this.aggregates = new AggregationManager();
this.expressions = new ExpressionManager();
PhoenixConnection connection = statement.getConnection();
- this.dateFormat = connection.getQueryServices().getProps().get(QueryServices.DATE_FORMAT_ATTRIB, DateUtil.DEFAULT_DATE_FORMAT);
+ ReadOnlyProps props = connection.getQueryServices().getProps();
+ this.dateFormat = props.get(QueryServices.DATE_FORMAT_ATTRIB, DateUtil.DEFAULT_DATE_FORMAT);
this.dateFormatter = DateUtil.getDateFormatter(dateFormat);
- this.timeFormat = connection.getQueryServices().getProps().get(QueryServices.TIME_FORMAT_ATTRIB, DateUtil.DEFAULT_TIME_FORMAT);
+ this.timeFormat = props.get(QueryServices.TIME_FORMAT_ATTRIB, DateUtil.DEFAULT_TIME_FORMAT);
this.timeFormatter = DateUtil.getTimeFormatter(timeFormat);
- this.timestampFormat = connection.getQueryServices().getProps().get(QueryServices.TIMESTAMP_FORMAT_ATTRIB, DateUtil.DEFAULT_TIMESTAMP_FORMAT);
+ this.timestampFormat = props.get(QueryServices.TIMESTAMP_FORMAT_ATTRIB, DateUtil.DEFAULT_TIMESTAMP_FORMAT);
this.timestampFormatter = DateUtil.getTimestampFormatter(timestampFormat);
- this.dateFormatTimeZone = TimeZone.getTimeZone(
- connection.getQueryServices().getProps().get(QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB, DateUtil.DEFAULT_TIME_ZONE_ID));
- this.numberFormat = connection.getQueryServices().getProps().get(QueryServices.NUMBER_FORMAT_ATTRIB, NumberUtil.DEFAULT_NUMBER_FORMAT);
+ this.dateFormatTimeZone = TimeZone.getTimeZone(props.get(QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB,
+ DateUtil.DEFAULT_TIME_ZONE_ID));
+ this.numberFormat = props.get(QueryServices.NUMBER_FORMAT_ATTRIB, NumberUtil.DEFAULT_NUMBER_FORMAT);
this.tempPtr = new ImmutableBytesWritable();
this.currentTable = resolver != null && !resolver.getTables().isEmpty() ? resolver.getTables().get(0) : null;
- this.whereConditionColumns = new ArrayList<Pair<byte[],byte[]>>();
- this.dataColumns = this.currentTable == null ? Collections.<PColumn, Integer>emptyMap() : Maps.<PColumn, Integer>newLinkedHashMap();
- this.subqueryResults = Maps.<SelectStatement, Object>newHashMap();
+ this.whereConditionColumns = new ArrayList<Pair<byte[], byte[]>>();
+ this.dataColumns = this.currentTable == null ? Collections.<PColumn, Integer> emptyMap() : Maps
+ .<PColumn, Integer> newLinkedHashMap();
+ this.subqueryResults = Maps.<SelectStatement, Object> newHashMap();
+ this.readMetricsQueue = new ReadMetricQueue(isRequestMetricsEnabled);
+ this.overAllQueryMetrics = new OverAllQueryMetrics(isRequestMetricsEnabled);
}
/**
@@ -285,4 +305,13 @@ public class StatementContext {
public void setSubqueryResult(SelectStatement select, Object result) {
subqueryResults.put(select, result);
}
+
+ public ReadMetricQueue getReadMetricsQueue() {
+ return readMetricsQueue;
+ }
+
+ public OverAllQueryMetrics getOverallQueryMetrics() {
+ return overAllQueryMetrics;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 2b35d4f..7b39a28 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -118,43 +118,40 @@ public class UpsertCompiler {
mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter()));
}
- private static MutationState upsertSelect(PhoenixStatement statement,
- TableRef tableRef, RowProjector projector, ResultIterator iterator, int[] columnIndexes,
- int[] pkSlotIndexes) throws SQLException {
- try {
- PhoenixConnection connection = statement.getConnection();
- ConnectionQueryServices services = connection.getQueryServices();
- int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
- int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
- boolean isAutoCommit = connection.getAutoCommit();
- byte[][] values = new byte[columnIndexes.length][];
- int rowCount = 0;
- Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
- PTable table = tableRef.getTable();
- ResultSet rs = new PhoenixResultSet(iterator, projector, statement);
+ private static MutationState upsertSelect(StatementContext childContext, TableRef tableRef, RowProjector projector,
+ ResultIterator iterator, int[] columnIndexes, int[] pkSlotIndexes) throws SQLException {
+ PhoenixStatement statement = childContext.getStatement();
+ PhoenixConnection connection = statement.getConnection();
+ ConnectionQueryServices services = connection.getQueryServices();
+ int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,
+ QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+ int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
+ boolean isAutoCommit = connection.getAutoCommit();
+ byte[][] values = new byte[columnIndexes.length][];
+ int rowCount = 0;
+ Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
+ PTable table = tableRef.getTable();
+ try (ResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
while (rs.next()) {
for (int i = 0; i < values.length; i++) {
PColumn column = table.getColumns().get(columnIndexes[i]);
- byte[] bytes = rs.getBytes(i+1);
+ byte[] bytes = rs.getBytes(i + 1);
ptr.set(bytes == null ? ByteUtil.EMPTY_BYTE_ARRAY : bytes);
- Object value = rs.getObject(i+1);
- int rsPrecision = rs.getMetaData().getPrecision(i+1);
+ Object value = rs.getObject(i + 1);
+ int rsPrecision = rs.getMetaData().getPrecision(i + 1);
Integer precision = rsPrecision == 0 ? null : rsPrecision;
- int rsScale = rs.getMetaData().getScale(i+1);
+ int rsScale = rs.getMetaData().getScale(i + 1);
Integer scale = rsScale == 0 ? null : rsScale;
// We are guaranteed that the two column will have compatible types,
// as we checked that before.
- if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(),
- precision, scale,
- column.getMaxLength(),column.getScale())) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY)
- .setColumnName(column.getName().getString())
- .setMessage("value=" + column.getDataType().toStringLiteral(ptr, null)).build().buildException();
- }
- column.getDataType().coerceBytes(ptr, value, column.getDataType(),
- precision, scale, SortOrder.getDefault(),
- column.getMaxLength(), column.getScale(), column.getSortOrder());
+ if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(), precision, scale,
+ column.getMaxLength(), column.getScale())) { throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
+ .setMessage("value=" + column.getDataType().toStringLiteral(ptr, null)).build()
+ .buildException(); }
+ column.getDataType().coerceBytes(ptr, value, column.getDataType(), precision, scale,
+ SortOrder.getDefault(), column.getMaxLength(), column.getScale(), column.getSortOrder());
values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
}
setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement);
@@ -169,8 +166,6 @@ public class UpsertCompiler {
}
// If auto commit is true, this last batch will be committed upon return
return new MutationState(tableRef, mutation, rowCount / batchSize * batchSize, maxSize, connection);
- } finally {
- iterator.close();
}
}
@@ -186,14 +181,21 @@ public class UpsertCompiler {
}
@Override
- protected MutationState mutate(StatementContext context, ResultIterator iterator, PhoenixConnection connection) throws SQLException {
- if (context.getSequenceManager().getSequenceCount() > 0) {
+ protected MutationState mutate(StatementContext parentContext, ResultIterator iterator, PhoenixConnection connection) throws SQLException {
+ if (parentContext.getSequenceManager().getSequenceCount() > 0) {
throw new IllegalStateException("Cannot pipeline upsert when sequence is referenced");
}
PhoenixStatement statement = new PhoenixStatement(connection);
+ /*
+ * We don't want to collect any read metrics within the child context. This is because any read metrics that
+ * need to be captured are already getting collected in the parent statement context enclosed in the result
+ * iterator being used for reading rows out.
+ */
+ StatementContext childContext = new StatementContext(statement, false);
// Clone the row projector as it's not thread safe and would be used simultaneously by
// multiple threads otherwise.
- return upsertSelect(statement, tableRef, projector.cloneIfNecessary(), iterator, columnIndexes, pkSlotIndexes);
+ MutationState state = upsertSelect(childContext, tableRef, projector.cloneIfNecessary(), iterator, columnIndexes, pkSlotIndexes);
+ return state;
}
public void setRowProjector(RowProjector projector) {
@@ -669,7 +671,7 @@ public class UpsertCompiler {
public MutationState execute() throws SQLException {
ResultIterator iterator = queryPlan.iterator();
if (parallelIteratorFactory == null) {
- return upsertSelect(statement, tableRef, projector, iterator, columnIndexes, pkSlotIndexes);
+ return upsertSelect(new StatementContext(statement), tableRef, projector, iterator, columnIndexes, pkSlotIndexes);
}
try {
parallelIteratorFactory.setRowProjector(projector);
@@ -677,13 +679,21 @@ public class UpsertCompiler {
parallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
Tuple tuple;
long totalRowCount = 0;
+ StatementContext context = queryPlan.getContext();
while ((tuple=iterator.next()) != null) {// Runs query
Cell kv = tuple.getValue(0);
totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
}
// Return total number of rows that have been updated. In the case of auto commit being off
// the mutations will all be in the mutation state of the current connection.
- return new MutationState(maxSize, statement.getConnection(), totalRowCount);
+ MutationState mutationState = new MutationState(maxSize, statement.getConnection(), totalRowCount);
+ /*
+ * All the metrics collected for measuring the reads done by the parallel mutating iterators
+ * is included in the ReadMetricHolder of the statement context. Include these metrics in the
+ * returned mutation state so they can be published on commit.
+ */
+ mutationState.setReadMetricQueue(context.getReadMetricsQueue());
+ return mutationState;
} finally {
iterator.close();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index ba137f8..00e843d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -102,7 +102,7 @@ public class AggregatePlan extends BaseQueryPlan {
this.services = services;
}
@Override
- public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException {
+ public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName) throws SQLException {
Expression expression = RowKeyExpression.INSTANCE;
OrderByExpression orderByExpression = new OrderByExpression(expression, false, true);
int threshold = services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
@@ -119,9 +119,9 @@ public class AggregatePlan extends BaseQueryPlan {
this.outerFactory = outerFactory;
}
@Override
- public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException {
- PeekingResultIterator iterator = innerFactory.newIterator(context, scanner, scan);
- return outerFactory.newIterator(context, iterator, scan);
+ public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName) throws SQLException {
+ PeekingResultIterator iterator = innerFactory.newIterator(context, scanner, scan, tableName);
+ return outerFactory.newIterator(context, iterator, scan, tableName);
}
}
[3/4] phoenix git commit: PHOENIX-1819 Build a framework to capture
and report phoenix client side request level metrics
Posted by sa...@apache.org.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 857a952..57fa25a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.execute;
+import static org.apache.phoenix.monitoring.TaskExecutionMetricsHolder.NO_OP_INSTANCE;
import static org.apache.phoenix.util.LogUtil.addCustomAnnotations;
import java.sql.SQLException;
@@ -54,6 +55,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.job.JobManager.JobCallable;
import org.apache.phoenix.join.HashCacheClient;
import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.SQLParser;
@@ -140,6 +142,11 @@ public class HashJoinPlan extends DelegateQueryPlan {
public Object getJobId() {
return HashJoinPlan.this;
}
+
+ @Override
+ public TaskExecutionMetricsHolder getTaskExecutionMetric() {
+ return NO_OP_INSTANCE;
+ }
}));
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 9ffa135..0de7aa3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -17,6 +17,10 @@
*/
package org.apache.phoenix.execute;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_SIZE;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_COMMIT_TIME;
+
import java.io.IOException;
import java.sql.SQLException;
import java.util.Arrays;
@@ -39,7 +43,11 @@ import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.IndexMetaDataCacheClient;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.monitoring.PhoenixMetrics;
+import org.apache.phoenix.monitoring.GlobalClientMetrics;
+import org.apache.phoenix.monitoring.MutationMetricQueue;
+import org.apache.phoenix.monitoring.MutationMetricQueue.MutationMetric;
+import org.apache.phoenix.monitoring.MutationMetricQueue.NoOpMutationMetricsQueue;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.IllegalDataException;
import org.apache.phoenix.schema.MetaDataClient;
@@ -65,9 +73,6 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.sun.istack.NotNull;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_BYTES;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_BATCH_SIZE;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_COMMIT_TIME;
/**
*
@@ -85,11 +90,17 @@ public class MutationState implements SQLCloseable {
private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations;
private long sizeOffset;
private int numRows = 0;
+ private final MutationMetricQueue mutationMetricQueue;
+ private ReadMetricQueue readMetricQueue;
- MutationState(long maxSize, PhoenixConnection connection, Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations) {
+ MutationState(long maxSize, PhoenixConnection connection,
+ Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations) {
this.maxSize = maxSize;
this.connection = connection;
this.mutations = mutations;
+ boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled();
+ this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue()
+ : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
}
public MutationState(long maxSize, PhoenixConnection connection) {
@@ -108,6 +119,12 @@ public class MutationState implements SQLCloseable {
throwIfTooBig();
}
+ public static MutationState emptyMutationState(long maxSize, PhoenixConnection connection) {
+ MutationState state = new MutationState(maxSize, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap());
+ state.sizeOffset = 0;
+ return state;
+ }
+
private void throwIfTooBig() {
if (numRows > maxSize) {
// TODO: throw SQLException ?
@@ -120,17 +137,18 @@ public class MutationState implements SQLCloseable {
}
/**
- * Combine a newer mutation with this one, where in the event of overlaps,
- * the newer one will take precedence.
- * @param newMutation the newer mutation
+ * Combine a newer mutation with this one, where in the event of overlaps, the newer one will take precedence.
+ * Combine any metrics collected for the newer mutation.
+ *
+ * @param newMutationState the newer mutation state
*/
- public void join(MutationState newMutation) {
- if (this == newMutation) { // Doesn't make sense
+ public void join(MutationState newMutationState) {
+ if (this == newMutationState) { // Doesn't make sense
return;
}
- this.sizeOffset += newMutation.sizeOffset;
+ this.sizeOffset += newMutationState.sizeOffset;
// Merge newMutation with this one, keeping state from newMutation for any overlaps
- for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : newMutation.mutations.entrySet()) {
+ for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : newMutationState.mutations.entrySet()) {
// Replace existing entries for the table with new entries
TableRef tableRef = entry.getKey();
PTable table = tableRef.getTable();
@@ -168,6 +186,12 @@ public class MutationState implements SQLCloseable {
}
}
}
+ mutationMetricQueue.combineMetricQueues(newMutationState.mutationMetricQueue);
+ if (readMetricQueue == null) {
+ readMetricQueue = newMutationState.readMetricQueue;
+ } else if (readMetricQueue != null && newMutationState.readMetricQueue != null) {
+ readMetricQueue.combineReadMetrics(newMutationState.readMetricQueue);
+ }
throwIfTooBig();
}
@@ -332,18 +356,15 @@ public class MutationState implements SQLCloseable {
return timeStamps;
}
- private static void logMutationSize(HTableInterface htable, List<Mutation> mutations, PhoenixConnection connection) {
+ private static long calculateMutationSize(List<Mutation> mutations) {
long byteSize = 0;
- int keyValueCount = 0;
- if (PhoenixMetrics.isMetricsEnabled() || logger.isDebugEnabled()) {
+ if (GlobalClientMetrics.isMetricsEnabled()) {
for (Mutation mutation : mutations) {
byteSize += mutation.heapSize();
}
- MUTATION_BYTES.update(byteSize);
- if (logger.isDebugEnabled()) {
- logger.debug(LogUtil.addCustomAnnotations("Sending " + mutations.size() + " mutations for " + Bytes.toString(htable.getTableName()) + " with " + keyValueCount + " key values of total size " + byteSize + " bytes", connection));
- }
}
+ GLOBAL_MUTATION_BYTES.update(byteSize);
+ return byteSize;
}
@SuppressWarnings("deprecation")
@@ -352,126 +373,134 @@ public class MutationState implements SQLCloseable {
byte[] tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
long[] serverTimeStamps = validate();
Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator();
-
// add tracing for this operation
- TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables");
- Span span = trace.getSpan();
- while (iterator.hasNext()) {
- Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry = iterator.next();
- Map<ImmutableBytesPtr,RowMutationState> valuesMap = entry.getValue();
- TableRef tableRef = entry.getKey();
- PTable table = tableRef.getTable();
- table.getIndexMaintainers(tempPtr, connection);
- boolean hasIndexMaintainers = tempPtr.getLength() > 0;
- boolean isDataTable = true;
- long serverTimestamp = serverTimeStamps[i++];
- Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false);
- while (mutationsIterator.hasNext()) {
- Pair<byte[],List<Mutation>> pair = mutationsIterator.next();
- byte[] htableName = pair.getFirst();
- List<Mutation> mutations = pair.getSecond();
-
- //create a span per target table
- //TODO maybe we can be smarter about the table name to string here?
- Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName));
+ try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) {
+ Span span = trace.getSpan();
+ while (iterator.hasNext()) {
+ Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry = iterator.next();
+ // at this point we are going through mutations for each table
- int retryCount = 0;
- boolean shouldRetry = false;
- do {
- ServerCache cache = null;
- if (hasIndexMaintainers && isDataTable) {
- byte[] attribValue = null;
- byte[] uuidValue;
- if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, tempPtr.getLength())) {
- IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
- cache = client.addIndexMetadataCache(mutations, tempPtr);
- child.addTimelineAnnotation("Updated index metadata cache");
- uuidValue = cache.getId();
- // If we haven't retried yet, retry for this case only, as it's possible that
- // a split will occur after we send the index metadata cache to all known
- // region servers.
- shouldRetry = true;
- } else {
- attribValue = ByteUtil.copyKeyBytesIfNecessary(tempPtr);
- uuidValue = ServerCacheClient.generateId();
- }
- // Either set the UUID to be able to access the index metadata from the cache
- // or set the index metadata directly on the Mutation
- for (Mutation mutation : mutations) {
- if (tenantId != null) {
- mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
- }
- mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
- if (attribValue != null) {
- mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
- }
- }
- }
-
- SQLException sqlE = null;
- HTableInterface hTable = connection.getQueryServices().getTable(htableName);
- try {
- logMutationSize(hTable, mutations, connection);
- MUTATION_BATCH_SIZE.update(mutations.size());
- long startTime = System.currentTimeMillis();
- child.addTimelineAnnotation("Attempt " + retryCount);
- hTable.batch(mutations);
- child.stop();
- long duration = System.currentTimeMillis() - startTime;
- MUTATION_COMMIT_TIME.update(duration);
- shouldRetry = false;
- if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Total time for batch call of " + mutations.size() + " mutations into " + table.getName().getString() + ": " + duration + " ms", connection));
- } catch (Exception e) {
- SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
- if (inferredE != null) {
- if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
- // Swallow this exception once, as it's possible that we split after sending the index metadata
- // and one of the region servers doesn't have it. This will cause it to have it the next go around.
- // If it fails again, we don't retry.
- String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE;
- logger.warn(LogUtil.addCustomAnnotations(msg, connection));
- connection.getQueryServices().clearTableRegionCache(htableName);
+ Map<ImmutableBytesPtr,RowMutationState> valuesMap = entry.getValue();
+ // above is mutations for a table where the first part is the row key and the second part is column values.
- // add a new child span as this one failed
- child.addTimelineAnnotation(msg);
- child.stop();
- child = Tracing.child(span,"Failed batch, attempting retry");
+ TableRef tableRef = entry.getKey();
+ PTable table = tableRef.getTable();
+ table.getIndexMaintainers(tempPtr, connection);
+ boolean hasIndexMaintainers = tempPtr.getLength() > 0;
+ boolean isDataTable = true;
+ long serverTimestamp = serverTimeStamps[i++];
+ Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false);
+ // above returns an iterator of pair where the first
+ while (mutationsIterator.hasNext()) {
+ Pair<byte[],List<Mutation>> pair = mutationsIterator.next();
+ byte[] htableName = pair.getFirst();
+ List<Mutation> mutations = pair.getSecond();
- continue;
+ //create a span per target table
+ //TODO maybe we can be smarter about the table name to string here?
+ Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName));
+
+ int retryCount = 0;
+ boolean shouldRetry = false;
+ do {
+ ServerCache cache = null;
+ if (hasIndexMaintainers && isDataTable) {
+ byte[] attribValue = null;
+ byte[] uuidValue;
+ if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, tempPtr.getLength())) {
+ IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
+ cache = client.addIndexMetadataCache(mutations, tempPtr);
+ child.addTimelineAnnotation("Updated index metadata cache");
+ uuidValue = cache.getId();
+ // If we haven't retried yet, retry for this case only, as it's possible that
+ // a split will occur after we send the index metadata cache to all known
+ // region servers.
+ shouldRetry = true;
+ } else {
+ attribValue = ByteUtil.copyKeyBytesIfNecessary(tempPtr);
+ uuidValue = ServerCacheClient.generateId();
+ }
+ // Either set the UUID to be able to access the index metadata from the cache
+ // or set the index metadata directly on the Mutation
+ for (Mutation mutation : mutations) {
+ if (tenantId != null) {
+ mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+ }
+ mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+ if (attribValue != null) {
+ mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+ }
}
- e = inferredE;
}
- sqlE = new CommitException(e, getUncommittedSattementIndexes());
- } finally {
+
+ SQLException sqlE = null;
+ HTableInterface hTable = connection.getQueryServices().getTable(htableName);
try {
- hTable.close();
- } catch (IOException e) {
- if (sqlE != null) {
- sqlE.setNextException(ServerUtil.parseServerException(e));
- } else {
- sqlE = ServerUtil.parseServerException(e);
+ long numMutations = mutations.size();
+ GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);
+
+ long startTime = System.currentTimeMillis();
+ child.addTimelineAnnotation("Attempt " + retryCount);
+ hTable.batch(mutations);
+ child.stop();
+ shouldRetry = false;
+ long mutationCommitTime = System.currentTimeMillis() - startTime;
+ GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
+
+ long mutationSizeBytes = calculateMutationSize(mutations);
+ MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime);
+ mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric);
+ } catch (Exception e) {
+ SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
+ if (inferredE != null) {
+ if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
+ // Swallow this exception once, as it's possible that we split after sending the index metadata
+ // and one of the region servers doesn't have it. This will cause it to have it the next go around.
+ // If it fails again, we don't retry.
+ String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE;
+ logger.warn(LogUtil.addCustomAnnotations(msg, connection));
+ connection.getQueryServices().clearTableRegionCache(htableName);
+
+ // add a new child span as this one failed
+ child.addTimelineAnnotation(msg);
+ child.stop();
+ child = Tracing.child(span,"Failed batch, attempting retry");
+
+ continue;
+ }
+ e = inferredE;
}
+ sqlE = new CommitException(e, getUncommittedStatementIndexes());
} finally {
try {
- if (cache != null) {
- cache.close();
+ hTable.close();
+ } catch (IOException e) {
+ if (sqlE != null) {
+ sqlE.setNextException(ServerUtil.parseServerException(e));
+ } else {
+ sqlE = ServerUtil.parseServerException(e);
}
} finally {
- if (sqlE != null) {
- throw sqlE;
+ try {
+ if (cache != null) {
+ cache.close();
+ }
+ } finally {
+ if (sqlE != null) {
+ throw sqlE;
+ }
}
}
}
- }
- } while (shouldRetry && retryCount++ < 1);
- isDataTable = false;
- }
- if (tableRef.getTable().getType() != PTableType.INDEX) {
- numRows -= entry.getValue().size();
+ } while (shouldRetry && retryCount++ < 1);
+ isDataTable = false;
+ }
+ if (tableRef.getTable().getType() != PTableType.INDEX) {
+ numRows -= entry.getValue().size();
+ }
+ iterator.remove(); // Remove batches as we process them
}
- iterator.remove(); // Remove batches as we process them
}
- trace.close();
assert(numRows==0);
assert(this.mutations.isEmpty());
}
@@ -481,7 +510,7 @@ public class MutationState implements SQLCloseable {
numRows = 0;
}
- private int[] getUncommittedSattementIndexes() {
+ private int[] getUncommittedStatementIndexes() {
int[] result = new int[0];
for (Map<ImmutableBytesPtr, RowMutationState> rowMutations : mutations.values()) {
for (RowMutationState rowMutationState : rowMutations.values()) {
@@ -533,12 +562,23 @@ public class MutationState implements SQLCloseable {
int[] getStatementIndexes() {
return statementIndexes;
}
-
+
void join(RowMutationState newRow) {
getColumnValues().putAll(newRow.getColumnValues());
statementIndexes = joinSortedIntArrays(statementIndexes, newRow.getStatementIndexes());
}
-
+ }
+
+ public ReadMetricQueue getReadMetricQueue() {
+ return readMetricQueue;
+ }
+ public void setReadMetricQueue(ReadMetricQueue readMetricQueue) {
+ this.readMetricQueue = readMetricQueue;
}
+
+ public MutationMetricQueue getMutationMetricQueue() {
+ return mutationMetricQueue;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index 031b58b..2bed3a0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -49,7 +49,7 @@ public class UnionPlan implements QueryPlan {
private final FilterableStatement statement;
private final ParameterMetaData paramMetaData;
private final OrderBy orderBy;
- private final StatementContext context;
+ private final StatementContext parentContext;
private final Integer limit;
private final GroupBy groupBy;
private final RowProjector projector;
@@ -59,7 +59,7 @@ public class UnionPlan implements QueryPlan {
public UnionPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector,
Integer limit, OrderBy orderBy, GroupBy groupBy, List<QueryPlan> plans, ParameterMetaData paramMetaData) throws SQLException {
- this.context = context;
+ this.parentContext = context;
this.statement = statement;
this.tableRef = table;
this.projector = projector;
@@ -128,7 +128,7 @@ public class UnionPlan implements QueryPlan {
}
public final ResultIterator iterator(final List<? extends SQLCloseable> dependencies) throws SQLException {
- this.iterators = new UnionResultIterators(plans);
+ this.iterators = new UnionResultIterators(plans, parentContext);
ResultIterator scanner;
boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
@@ -175,7 +175,7 @@ public class UnionPlan implements QueryPlan {
@Override
public StatementContext getContext() {
- return context;
+ return parentContext;
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 6a3847b..43731cb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -18,8 +18,8 @@
package org.apache.phoenix.iterate;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.FAILED_QUERY;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.QUERY_TIMEOUT;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER;
import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
import java.sql.SQLException;
@@ -540,12 +540,13 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
} catch (ExecutionException e) {
try { // Rethrow as SQLException
throw ServerUtil.parseServerException(e);
- } catch (StaleRegionBoundaryCacheException e2) {
+ } catch (StaleRegionBoundaryCacheException e2) {
// Catch only to try to recover from region boundary cache being out of date
List<List<Pair<Scan,Future<PeekingResultIterator>>>> newFutures = Lists.newArrayListWithExpectedSize(2);
if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries
services.clearTableRegionCache(physicalTableName);
clearedCache = true;
+ context.getOverallQueryMetrics().cacheRefreshedDueToSplits();
}
// Resubmit just this portion of work again
Scan oldScan = scanPair.getFirst();
@@ -582,7 +583,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
success = true;
return iterators;
} catch (TimeoutException e) {
- QUERY_TIMEOUT.increment();
+ context.getOverallQueryMetrics().queryTimedOut();
+ GLOBAL_QUERY_TIMEOUT_COUNTER.increment();
// thrown when a thread times out waiting for the future.get() call to return
toThrow = new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT)
.setMessage(". Query couldn't be completed in the alloted time: " + queryTimeOut + " ms")
@@ -616,7 +618,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
}
} finally {
if (toThrow != null) {
- FAILED_QUERY.increment();
+ GLOBAL_FAILED_QUERY_COUNTER.increment();
+ context.getOverallQueryMetrics().queryFailed();
throw toThrow;
}
}
@@ -639,7 +642,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
if (futurePair != null) {
Future<PeekingResultIterator> future = futurePair.getSecond();
if (future != null) {
- cancelledWork |= future.cancel(false);
+ future.cancel(false);
}
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index e1ee8db..f272e55 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -19,6 +19,7 @@
package org.apache.phoenix.iterate;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.STARTKEY_OFFSET;
+import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
import java.sql.SQLException;
import java.util.List;
@@ -66,18 +67,17 @@ public class ChunkedResultIterator implements PeekingResultIterator {
}
@Override
- public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException {
- scanner.close(); //close the iterator since we don't need it anymore.
+ public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName) throws SQLException {
if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("ChunkedResultIteratorFactory.newIterator over " + tableRef.getTable().getName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan)));
return new ChunkedResultIterator(delegateFactory, context, tableRef, scan,
context.getConnection().getQueryServices().getProps().getLong(
QueryServices.SCAN_RESULT_CHUNK_SIZE,
- QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE));
+ QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE), scanner);
}
}
- public ChunkedResultIterator(ParallelIteratorFactory delegateIteratorFactory,
- StatementContext context, TableRef tableRef, Scan scan, long chunkSize) throws SQLException {
+ private ChunkedResultIterator(ParallelIteratorFactory delegateIteratorFactory,
+ StatementContext context, TableRef tableRef, Scan scan, long chunkSize, ResultIterator scanner) throws SQLException {
this.delegateIteratorFactory = delegateIteratorFactory;
this.context = context;
this.tableRef = tableRef;
@@ -87,9 +87,9 @@ public class ChunkedResultIterator implements PeekingResultIterator {
// to get parallel scans kicked off in separate threads. If we delay this,
// we'll get serialized behavior (see PHOENIX-
if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Get first chunked result iterator over " + tableRef.getTable().getName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan)));
- ResultIterator singleChunkResultIterator = new SingleChunkResultIterator(
- new TableResultIterator(context, tableRef, scan), chunkSize);
- resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan);
+ ResultIterator singleChunkResultIterator = new SingleChunkResultIterator(scanner, chunkSize);
+ String tableName = tableRef.getTable().getPhysicalName().getString();
+ resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName);
}
@Override
@@ -118,9 +118,10 @@ public class ChunkedResultIterator implements PeekingResultIterator {
scan = ScanUtil.newScan(scan);
scan.setStartRow(ByteUtil.copyKeyBytesIfNecessary(lastKey));
if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Get next chunked result iterator over " + tableRef.getTable().getName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan)));
+ String tableName = tableRef.getTable().getPhysicalName().getString();
ResultIterator singleChunkResultIterator = new SingleChunkResultIterator(
- new TableResultIterator(context, tableRef, scan), chunkSize);
- resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan);
+ new TableResultIterator(context, tableRef, scan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName)), chunkSize);
+ resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName);
}
return resultIterator;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
index df8f658..f25e373 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
@@ -25,10 +25,10 @@ import org.apache.phoenix.compile.StatementContext;
public interface ParallelIteratorFactory {
public static ParallelIteratorFactory NOOP_FACTORY = new ParallelIteratorFactory() {
@Override
- public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan)
+ public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName)
throws SQLException {
return LookAheadResultIterator.wrap(scanner);
}
};
- PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException;
+ PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName) throws SQLException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index be10c20..2dfbfe3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -17,7 +17,7 @@
*/
package org.apache.phoenix.iterate;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.PARALLEL_SCANS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_NUM_PARALLEL_SCANS;
import java.sql.SQLException;
import java.util.Collections;
@@ -30,6 +30,10 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.job.JobManager.JobCallable;
+import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.monitoring.CombinableMetric;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
+import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.ScanUtil;
@@ -79,19 +83,25 @@ public class ParallelIterators extends BaseResultIterators {
// Shuffle so that we start execution across many machines
// before we fill up the thread pool
Collections.shuffle(scanLocations);
- PARALLEL_SCANS.update(scanLocations.size());
+ ReadMetricQueue readMetrics = context.getReadMetricsQueue();
+ final String physicalTableName = tableRef.getTable().getPhysicalName().getString();
+ int numScans = scanLocations.size();
+ context.getOverallQueryMetrics().updateNumParallelScans(numScans);
+ GLOBAL_NUM_PARALLEL_SCANS.update(numScans);
for (ScanLocator scanLocation : scanLocations) {
final Scan scan = scanLocation.getScan();
+ final CombinableMetric scanMetrics = readMetrics.allotMetric(MetricType.SCAN_BYTES, physicalTableName);
+ final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(readMetrics, physicalTableName);
Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
-
+
@Override
public PeekingResultIterator call() throws Exception {
long startTime = System.currentTimeMillis();
- ResultIterator scanner = new TableResultIterator(context, tableRef, scan);
+ ResultIterator scanner = new TableResultIterator(context, tableRef, scan, scanMetrics);
if (logger.isDebugEnabled()) {
logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan)));
}
- PeekingResultIterator iterator = iteratorFactory.newIterator(context, scanner, scan);
+ PeekingResultIterator iterator = iteratorFactory.newIterator(context, scanner, scan, physicalTableName);
// Fill the scanner's cache. This helps reduce latency since we are parallelizing the I/O needed.
iterator.peek();
@@ -109,6 +119,11 @@ public class ParallelIterators extends BaseResultIterators {
public Object getJobId() {
return ParallelIterators.this;
}
+
+ @Override
+ public TaskExecutionMetricsHolder getTaskExecutionMetric() {
+ return taskMetrics;
+ }
}, "Parallel scanner for table: " + tableRef.getTable().getName().getString()));
// Add our future in the right place so that we can concatenate the
// results of the inner futures versus merge sorting across all of them.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java
index 4a9ad3e..92ac570 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java
@@ -18,7 +18,7 @@
package org.apache.phoenix.iterate;
import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.FAILED_QUERY;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -268,7 +268,7 @@ public class RoundRobinResultIterator implements ResultIterator {
}
} finally {
if (toThrow != null) {
- FAILED_QUERY.increment();
+ GLOBAL_FAILED_QUERY_COUNTER.increment();
throw toThrow;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
index fd65d0c..b722794 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
@@ -17,7 +17,7 @@
*/
package org.apache.phoenix.iterate;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.SCAN_BYTES;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SCAN_BYTES;
import java.io.IOException;
import java.sql.SQLException;
@@ -28,15 +28,20 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.phoenix.monitoring.PhoenixMetrics;
+import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric;
+import org.apache.phoenix.monitoring.GlobalClientMetrics;
+import org.apache.phoenix.monitoring.CombinableMetric;
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.ServerUtil;
public class ScanningResultIterator implements ResultIterator {
private final ResultScanner scanner;
- public ScanningResultIterator(ResultScanner scanner) {
+ private final CombinableMetric scanMetrics;
+
+ public ScanningResultIterator(ResultScanner scanner, CombinableMetric scanMetrics) {
this.scanner = scanner;
+ this.scanMetrics = scanMetrics;
}
@Override
@@ -66,17 +71,18 @@ public class ScanningResultIterator implements ResultIterator {
return "ScanningResultIterator [scanner=" + scanner + "]";
}
- private static void calculateScanSize(Result result) {
- if (PhoenixMetrics.isMetricsEnabled()) {
- if (result != null) {
- Cell[] cells = result.rawCells();
- long scanResultSize = 0;
- for (Cell cell : cells) {
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- scanResultSize += kv.heapSize();
- }
- SCAN_BYTES.update(scanResultSize);
- }
- }
- }
+ private void calculateScanSize(Result result) {
+ if (GlobalClientMetrics.isMetricsEnabled() || scanMetrics != NoOpRequestMetric.INSTANCE) {
+ if (result != null) {
+ Cell[] cells = result.rawCells();
+ long scanResultSize = 0;
+ for (Cell cell : cells) {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+ scanResultSize += kv.heapSize();
+ }
+ scanMetrics.change(scanResultSize);
+ GLOBAL_SCAN_BYTES.update(scanResultSize);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index 6b3b5e3..516d73e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -17,6 +17,8 @@
*/
package org.apache.phoenix.iterate;
+import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
+
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
@@ -29,11 +31,9 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.iterate.TableResultIterator.ScannerCreation;
import org.apache.phoenix.job.JobManager.JobCallable;
+import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
import org.apache.phoenix.trace.util.Tracing;
-import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.ScanUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -48,7 +48,6 @@ import com.google.common.collect.Lists;
* @since 0.1
*/
public class SerialIterators extends BaseResultIterators {
- private static final Logger logger = LoggerFactory.getLogger(SerialIterators.class);
private static final String NAME = "SERIAL";
private final ParallelIteratorFactory iteratorFactory;
@@ -74,18 +73,15 @@ public class SerialIterators extends BaseResultIterators {
Scan lastScan = scans.get(scans.size()-1);
final Scan overallScan = ScanUtil.newScan(firstScan);
overallScan.setStopRow(lastScan.getStopRow());
+ final String tableName = tableRef.getTable().getPhysicalName().getString();
+ final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(context.getReadMetricsQueue(), tableName);
Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
-
@Override
public PeekingResultIterator call() throws Exception {
List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(scans.size());
for (final Scan scan : scans) {
- long startTime = System.currentTimeMillis();
- ResultIterator scanner = new TableResultIterator(context, tableRef, scan, ScannerCreation.DELAYED);
- if (logger.isDebugEnabled()) {
- logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan)));
- }
- concatIterators.add(iteratorFactory.newIterator(context, scanner, scan));
+ ResultIterator scanner = new TableResultIterator(context, tableRef, scan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), ScannerCreation.DELAYED);
+ concatIterators.add(iteratorFactory.newIterator(context, scanner, scan, tableName));
}
PeekingResultIterator concatIterator = ConcatResultIterator.newIterator(concatIterators);
allIterators.add(concatIterator);
@@ -101,6 +97,11 @@ public class SerialIterators extends BaseResultIterators {
public Object getJobId() {
return SerialIterators.this;
}
+
+ @Override
+ public TaskExecutionMetricsHolder getTaskExecutionMetric() {
+ return taskMetrics;
+ }
}, "Serial scanner for table: " + tableRef.getTable().getName().getString()));
// Add our singleton Future which will execute serially
nestedFutures.add(Collections.singletonList(new Pair<Scan,Future<PeekingResultIterator>>(overallScan,future)));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
index f49bce5..540b410 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
@@ -17,6 +17,11 @@
*/
package org.apache.phoenix.iterate;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MEMORY_CHUNK_BYTES;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MEMORY_WAIT_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SPOOL_FILE_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SPOOL_FILE_SIZE;
+
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@@ -34,6 +39,9 @@ import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.memory.MemoryManager;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.monitoring.MemoryMetricsHolder;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
+import org.apache.phoenix.monitoring.SpoolingMetricsHolder;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.tuple.ResultTuple;
@@ -42,8 +50,6 @@ import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.ResultUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.TupleUtil;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.NUM_SPOOL_FILE;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.SPOOL_FILE_SIZE;
@@ -56,7 +62,10 @@ import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.SPOOL_FILE
* @since 0.1
*/
public class SpoolingResultIterator implements PeekingResultIterator {
+
private final PeekingResultIterator spoolFrom;
+ private final SpoolingMetricsHolder spoolMetrics;
+ private final MemoryMetricsHolder memoryMetrics;
public static class SpoolingResultIteratorFactory implements ParallelIteratorFactory {
private final QueryServices services;
@@ -65,14 +74,16 @@ public class SpoolingResultIterator implements PeekingResultIterator {
this.services = services;
}
@Override
- public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException {
- return new SpoolingResultIterator(scanner, services);
+ public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName) throws SQLException {
+ ReadMetricQueue readRequestMetric = context.getReadMetricsQueue();
+ SpoolingMetricsHolder spoolMetrics = new SpoolingMetricsHolder(readRequestMetric, physicalTableName);
+ MemoryMetricsHolder memoryMetrics = new MemoryMetricsHolder(readRequestMetric, physicalTableName);
+ return new SpoolingResultIterator(spoolMetrics, memoryMetrics, scanner, services);
}
-
}
- public SpoolingResultIterator(ResultIterator scanner, QueryServices services) throws SQLException {
- this (scanner, services.getMemoryManager(),
+ private SpoolingResultIterator(SpoolingMetricsHolder spoolMetrics, MemoryMetricsHolder memoryMetrics, ResultIterator scanner, QueryServices services) throws SQLException {
+ this (spoolMetrics, memoryMetrics, scanner, services.getMemoryManager(),
services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES),
services.getProps().getLong(QueryServices.MAX_SPOOL_TO_DISK_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SPOOL_TO_DISK_BYTES),
services.getProps().get(QueryServices.SPOOL_DIRECTORY, QueryServicesOptions.DEFAULT_SPOOL_DIRECTORY));
@@ -87,9 +98,15 @@ public class SpoolingResultIterator implements PeekingResultIterator {
* the memory manager) is exceeded.
* @throws SQLException
*/
- SpoolingResultIterator(ResultIterator scanner, MemoryManager mm, final int thresholdBytes, final long maxSpoolToDisk, final String spoolDirectory) throws SQLException {
+ SpoolingResultIterator(SpoolingMetricsHolder sMetrics, MemoryMetricsHolder mMetrics, ResultIterator scanner, MemoryManager mm, final int thresholdBytes, final long maxSpoolToDisk, final String spoolDirectory) throws SQLException {
+ this.spoolMetrics = sMetrics;
+ this.memoryMetrics = mMetrics;
boolean success = false;
+ long startTime = System.currentTimeMillis();
final MemoryChunk chunk = mm.allocate(0, thresholdBytes);
+ long waitTime = System.currentTimeMillis() - startTime;
+ GLOBAL_MEMORY_WAIT_TIME.update(waitTime);
+ memoryMetrics.getMemoryWaitTimeMetric().change(waitTime);
DeferredFileOutputStream spoolTo = null;
try {
// Can't be bigger than int, since it's the max of the above allocation
@@ -97,8 +114,11 @@ public class SpoolingResultIterator implements PeekingResultIterator {
spoolTo = new DeferredFileOutputStream(size, "ResultSpooler",".bin", new File(spoolDirectory)) {
@Override
protected void thresholdReached() throws IOException {
- super.thresholdReached();
- chunk.close();
+ try {
+ super.thresholdReached();
+ } finally {
+ chunk.close();
+ }
}
};
DataOutputStream out = new DataOutputStream(spoolTo);
@@ -116,9 +136,14 @@ public class SpoolingResultIterator implements PeekingResultIterator {
byte[] data = spoolTo.getData();
chunk.resize(data.length);
spoolFrom = new InMemoryResultIterator(data, chunk);
+ GLOBAL_MEMORY_CHUNK_BYTES.update(data.length);
+ memoryMetrics.getMemoryChunkSizeMetric().change(data.length);
} else {
- NUM_SPOOL_FILE.increment();
- SPOOL_FILE_SIZE.update(spoolTo.getFile().length());
+ long sizeOfSpoolFile = spoolTo.getFile().length();
+ GLOBAL_SPOOL_FILE_SIZE.update(sizeOfSpoolFile);
+ GLOBAL_SPOOL_FILE_COUNTER.increment();
+ spoolMetrics.getNumSpoolFileMetric().increment();
+ spoolMetrics.getSpoolFileSizeMetric().change(sizeOfSpoolFile);
spoolFrom = new OnDiskResultIterator(spoolTo.getFile());
}
success = true;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index ea13dfd..6f040d1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.monitoring.CombinableMetric;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.Closeables;
@@ -44,9 +45,10 @@ public class TableResultIterator extends ExplainTable implements ResultIterator
private final Scan scan;
private final HTableInterface htable;
private volatile ResultIterator delegate;
-
- public TableResultIterator(StatementContext context, TableRef tableRef) throws SQLException {
- this(context, tableRef, context.getScan());
+ private final CombinableMetric scanMetrics;
+
+ public TableResultIterator(StatementContext context, TableRef tableRef, CombinableMetric scanMetrics) throws SQLException {
+ this(context, tableRef, context.getScan(), scanMetrics);
}
/*
@@ -62,7 +64,7 @@ public class TableResultIterator extends ExplainTable implements ResultIterator
delegate = this.delegate;
if (delegate == null) {
try {
- this.delegate = delegate = isClosing ? ResultIterator.EMPTY_ITERATOR : new ScanningResultIterator(htable.getScanner(scan));
+ this.delegate = delegate = isClosing ? ResultIterator.EMPTY_ITERATOR : new ScanningResultIterator(htable.getScanner(scan), scanMetrics);
} catch (IOException e) {
Closeables.closeQuietly(htable);
throw ServerUtil.parseServerException(e);
@@ -73,13 +75,14 @@ public class TableResultIterator extends ExplainTable implements ResultIterator
return delegate;
}
- public TableResultIterator(StatementContext context, TableRef tableRef, Scan scan) throws SQLException {
- this(context, tableRef, scan, ScannerCreation.IMMEDIATE);
+ public TableResultIterator(StatementContext context, TableRef tableRef, Scan scan, CombinableMetric scanMetrics) throws SQLException {
+ this(context, tableRef, scan, scanMetrics, ScannerCreation.IMMEDIATE);
}
- public TableResultIterator(StatementContext context, TableRef tableRef, Scan scan, ScannerCreation creationMode) throws SQLException {
+ public TableResultIterator(StatementContext context, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, ScannerCreation creationMode) throws SQLException {
super(context, tableRef);
this.scan = scan;
+ this.scanMetrics = scanMetrics;
htable = context.getConnection().getQueryServices().getTable(tableRef.getTable().getPhysicalName().getBytes());
if (creationMode == ScannerCreation.IMMEDIATE) {
getDelegate(false);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/iterate/UnionResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/UnionResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/UnionResultIterators.java
index b7c8b21..2296982 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/UnionResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/UnionResultIterators.java
@@ -22,6 +22,9 @@ import java.util.List;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.monitoring.OverAllQueryMetrics;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.util.ServerUtil;
@@ -39,14 +42,22 @@ public class UnionResultIterators implements ResultIterators {
private final List<List<Scan>> scans;
private final List<PeekingResultIterator> iterators;
private final List<QueryPlan> plans;
-
- public UnionResultIterators(List<QueryPlan> plans) throws SQLException {
+ private final List<ReadMetricQueue> readMetricsList;
+ private final List<OverAllQueryMetrics> overAllQueryMetricsList;
+ private boolean closed;
+ private final StatementContext parentStmtCtx;
+ public UnionResultIterators(List<QueryPlan> plans, StatementContext parentStmtCtx) throws SQLException {
+ this.parentStmtCtx = parentStmtCtx;
this.plans = plans;
int nPlans = plans.size();
iterators = Lists.newArrayListWithExpectedSize(nPlans);
splits = Lists.newArrayListWithExpectedSize(nPlans * 30);
scans = Lists.newArrayListWithExpectedSize(nPlans * 10);
+ readMetricsList = Lists.newArrayListWithCapacity(nPlans);
+ overAllQueryMetricsList = Lists.newArrayListWithCapacity(nPlans);
for (QueryPlan plan : this.plans) {
+ readMetricsList.add(plan.getContext().getReadMetricsQueue());
+ overAllQueryMetricsList.add(plan.getContext().getOverallQueryMetrics());
iterators.add(LookAheadResultIterator.wrap(plan.iterator()));
splits.addAll(plan.getSplits());
scans.addAll(plan.getScans());
@@ -59,32 +70,47 @@ public class UnionResultIterators implements ResultIterators {
}
@Override
- public void close() throws SQLException {
- SQLException toThrow = null;
- try {
- if (iterators != null) {
- for (int index=0; index < iterators.size(); index++) {
- PeekingResultIterator iterator = iterators.get(index);
- try {
- iterator.close();
- } catch (Exception e) {
- if (toThrow == null) {
- toThrow = ServerUtil.parseServerException(e);
- } else {
- toThrow.setNextException(ServerUtil.parseServerException(e));
+ public void close() throws SQLException {
+ if (!closed) {
+ closed = true;
+ SQLException toThrow = null;
+ try {
+ if (iterators != null) {
+ for (int index=0; index < iterators.size(); index++) {
+ PeekingResultIterator iterator = iterators.get(index);
+ try {
+ iterator.close();
+ } catch (Exception e) {
+ if (toThrow == null) {
+ toThrow = ServerUtil.parseServerException(e);
+ } else {
+ toThrow.setNextException(ServerUtil.parseServerException(e));
+ }
}
}
}
- }
- } catch (Exception e) {
- toThrow = ServerUtil.parseServerException(e);
- } finally {
- if (toThrow != null) {
- throw toThrow;
+ } catch (Exception e) {
+ toThrow = ServerUtil.parseServerException(e);
+ } finally {
+ setMetricsInParentContext();
+ if (toThrow != null) {
+ throw toThrow;
+ }
}
}
}
-
+
+ private void setMetricsInParentContext() {
+ ReadMetricQueue parentCtxReadMetrics = parentStmtCtx.getReadMetricsQueue();
+ for (ReadMetricQueue readMetrics : readMetricsList) {
+ parentCtxReadMetrics.combineReadMetrics(readMetrics);
+ }
+ OverAllQueryMetrics parentCtxQueryMetrics = parentStmtCtx.getOverallQueryMetrics();
+ for (OverAllQueryMetrics metric : overAllQueryMetricsList) {
+ parentCtxQueryMetrics.combine(metric);
+ }
+ }
+
@Override
public List<List<Scan>> getScans() {
return scans;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index e33e8ee..a6479f3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -122,7 +122,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
private final Properties info;
private List<SQLCloseable> statements = new ArrayList<SQLCloseable>();
private final Map<PDataType<?>, Format> formatters = new HashMap<>();
- private MutationState mutationState;
+ private final MutationState mutationState;
private final int mutateBatchSize;
private final Long scn;
private boolean isAutoCommit = false;
@@ -136,8 +136,9 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
private boolean isClosed = false;
private Sampler<?> sampler;
private boolean readOnly = false;
- private Map<String, String> customTracingAnnotations = emptyMap();
-
+ private Map<String, String> customTracingAnnotations = emptyMap();
+ private final boolean isRequestLevelMetricsEnabled;
+
static {
Tracing.addTraceMetricsSource();
}
@@ -232,6 +233,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
! Objects.equal(tenantId, function.getTenantId()));
}
};
+ this.isRequestLevelMetricsEnabled = JDBCUtil.isCollectingRequestLevelMetricsEnabled(url, info, this.services.getProps());
this.mutationState = newMutationState(maxSize);
this.metaData = metaData.pruneTables(pruner);
this.metaData = metaData.pruneFunctions(pruner);
@@ -433,6 +435,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
return;
}
try {
+ clearMetrics();
try {
if (traceScope != null) {
traceScope.close();
@@ -853,4 +856,23 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
public void setTraceScope(TraceScope traceScope) {
this.traceScope = traceScope;
}
+
+ public Map<String, Map<String, Long>> getMutationMetrics() {
+ return mutationState.getMutationMetricQueue().aggregate();
+ }
+
+ public Map<String, Map<String, Long>> getReadMetrics() {
+ return mutationState.getReadMetricQueue() != null ? mutationState.getReadMetricQueue().aggregate() : Collections.<String, Map<String, Long>>emptyMap();
+ }
+
+ public boolean isRequestLevelMetricsEnabled() {
+ return isRequestLevelMetricsEnabled;
+ }
+
+ public void clearMetrics() {
+ mutationState.getMutationMetricQueue().clearMetrics();
+ if (mutationState.getReadMetricQueue() != null) {
+ mutationState.getReadMetricQueue().clearMetrics();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index d1b3b27..2dd8af4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.compile.ColumnProjector;
import org.apache.phoenix.compile.ExpressionProjector;
import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -311,7 +312,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
public static final int CLIENT_KEY_VALUE_BUILDER_THRESHOLD = VersionUtil.encodeVersion("0", "94", "14");
PhoenixDatabaseMetaData(PhoenixConnection connection) throws SQLException {
- this.emptyResultSet = new PhoenixResultSet(ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, new PhoenixStatement(connection));
+ this.emptyResultSet = new PhoenixResultSet(ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, new StatementContext(new PhoenixStatement(connection), false));
this.connection = connection;
}
@@ -509,11 +510,10 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
public PhoenixStatement newStatement(PhoenixConnection connection) {
return new PhoenixStatement(connection) {
@Override
- protected PhoenixResultSet newResultSet(ResultIterator iterator, RowProjector projector)
- throws SQLException {
- return new PhoenixResultSet(
- new TenantColumnFilteringIterator(iterator, projector),
- projector, this);
+ protected PhoenixResultSet newResultSet(ResultIterator iterator, RowProjector projector,
+ StatementContext context) throws SQLException {
+ return new PhoenixResultSet(new TenantColumnFilteringIterator(iterator, projector),
+ projector, context);
}
};
}
@@ -523,7 +523,12 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
}
return stmt.executeQuery(buf.toString());
}
-
+
+// private ColumnResolver getColumnResolverForCatalogTable() throws SQLException {
+// TableRef tableRef = new TableRef(getTable(connection, SYSTEM_CATALOG_NAME));
+// return FromCompiler.getResolver(tableRef);
+// }
+
/**
* Filters the tenant id column out of a column metadata result set (thus, where each row is a column definition).
* The tenant id is by definition the first column of the primary key, but the primary key does not necessarily
@@ -1007,7 +1012,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
}
@Override
public ResultSet getTableTypes() throws SQLException {
- return new PhoenixResultSet(new MaterializedResultIterator(TABLE_TYPE_TUPLES), TABLE_TYPE_ROW_PROJECTOR, new PhoenixStatement(connection));
+ return new PhoenixResultSet(new MaterializedResultIterator(TABLE_TYPE_TUPLES), TABLE_TYPE_ROW_PROJECTOR, new StatementContext(new PhoenixStatement(connection), false));
}
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
index 8ee56ea..da06370 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
@@ -39,16 +39,21 @@ import java.sql.Time;
import java.sql.Timestamp;
import java.text.Format;
import java.util.Calendar;
+import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.ColumnProjector;
import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.monitoring.OverAllQueryMetrics;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PBoolean;
@@ -109,18 +114,25 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.pho
private final ResultIterator scanner;
private final RowProjector rowProjector;
private final PhoenixStatement statement;
+ private final StatementContext context;
+ private final ReadMetricQueue readMetricsQueue;
+ private final OverAllQueryMetrics overAllQueryMetrics;
private final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
private Tuple currentRow = BEFORE_FIRST;
private boolean isClosed = false;
private boolean wasNull = false;
-
- public PhoenixResultSet(ResultIterator resultIterator, RowProjector rowProjector, PhoenixStatement statement) throws SQLException {
+ private boolean firstRecordRead = false;
+
+ public PhoenixResultSet(ResultIterator resultIterator, RowProjector rowProjector, StatementContext ctx) throws SQLException {
this.rowProjector = rowProjector;
this.scanner = resultIterator;
- this.statement = statement;
+ this.context = ctx;
+ this.statement = context.getStatement();
+ this.readMetricsQueue = context.getReadMetricsQueue();
+ this.overAllQueryMetrics = context.getOverallQueryMetrics();
}
-
+
@Override
public boolean absolute(int row) throws SQLException {
throw new SQLFeatureNotSupportedException();
@@ -147,14 +159,14 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.pho
@Override
public void close() throws SQLException {
- if (isClosed) {
- return;
- }
+ if (isClosed) { return; }
try {
scanner.close();
} finally {
isClosed = true;
statement.getResultSets().remove(this);
+ overAllQueryMetrics.endQuery();
+ overAllQueryMetrics.stopResultSetWatch();
}
}
@@ -754,6 +766,10 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.pho
public boolean next() throws SQLException {
checkOpen();
try {
+ if (!firstRecordRead) {
+ firstRecordRead = true;
+ overAllQueryMetrics.startResultSetWatch();
+ }
currentRow = scanner.next();
rowProjector.reset();
} catch (RuntimeException e) {
@@ -764,6 +780,10 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.pho
}
throw e;
}
+ if (currentRow == null) {
+ overAllQueryMetrics.endQuery();
+ overAllQueryMetrics.stopResultSetWatch();
+ }
return currentRow != null;
}
@@ -1261,4 +1281,18 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.pho
public ResultIterator getUnderlyingIterator() {
return scanner;
}
+
+ public Map<String, Map<String, Long>> getReadMetrics() {
+ return readMetricsQueue.aggregate();
+ }
+
+ public Map<String, Long> getOverAllRequestReadMetrics() {
+ return overAllQueryMetrics.publish();
+ }
+
+ public void resetMetrics() {
+ readMetricsQueue.clearMetrics();
+ overAllQueryMetrics.reset();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 51fb027..9b67c22 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -17,9 +17,9 @@
*/
package org.apache.phoenix.jdbc;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.MUTATION_COUNT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.QUERY_COUNT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.QUERY_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SELECT_SQL_COUNTER;
import java.io.IOException;
import java.io.Reader;
@@ -213,8 +213,8 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
return resultSets;
}
- protected PhoenixResultSet newResultSet(ResultIterator iterator, RowProjector projector) throws SQLException {
- return new PhoenixResultSet(iterator, projector, this);
+ protected PhoenixResultSet newResultSet(ResultIterator iterator, RowProjector projector, StatementContext context) throws SQLException {
+ return new PhoenixResultSet(iterator, projector, context);
}
protected boolean execute(final CompilableStatement stmt) throws SQLException {
@@ -232,7 +232,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
}
protected PhoenixResultSet executeQuery(final CompilableStatement stmt) throws SQLException {
- QUERY_COUNT.increment();
+ GLOBAL_SELECT_SQL_COUNTER.increment();
try {
return CallRunner.run(
new CallRunner.CallableThrowable<PhoenixResultSet, SQLException>() {
@@ -250,7 +250,9 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
String explainPlan = QueryUtil.getExplainPlan(resultIterator);
logger.debug(LogUtil.addCustomAnnotations("Explain plan: " + explainPlan, connection));
}
- PhoenixResultSet rs = newResultSet(resultIterator, plan.getProjector());
+ StatementContext context = plan.getContext();
+ context.getOverallQueryMetrics().startQuery();
+ PhoenixResultSet rs = newResultSet(resultIterator, plan.getProjector(), context);
resultSets.add(rs);
setLastQueryPlan(plan);
setLastResultSet(rs);
@@ -269,7 +271,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
// Regardless of whether the query was successfully handled or not,
// update the time spent so far. If needed, we can separate out the
// success times and failure times.
- QUERY_TIME.update(System.currentTimeMillis() - startTime);
+ GLOBAL_QUERY_TIME.update(System.currentTimeMillis() - startTime);
}
}
}, PhoenixContextExecutor.inContext());
@@ -285,7 +287,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
SQLExceptionCode.READ_ONLY_CONNECTION).
build().buildException();
}
- MUTATION_COUNT.increment();
+ GLOBAL_MUTATION_SQL_COUNTER.increment();
try {
return CallRunner
.run(
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java b/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java
index 31ef742..7406e46 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java
@@ -17,11 +17,11 @@
*/
package org.apache.phoenix.job;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.REJECTED_TASK_COUNT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.TASK_COUNT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.TASK_END_TO_END_TIME;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.TASK_EXECUTION_TIME;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.TASK_QUEUE_WAIT_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_REJECTED_TASK_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_END_TO_END_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_EXECUTED_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_EXECUTION_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_QUEUE_WAIT_TIME;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
@@ -36,6 +36,10 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nullable;
+
+import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
*
@@ -63,6 +67,7 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> {
public static interface JobRunnable<T> extends Runnable {
public Object getJobId();
+ public TaskExecutionMetricsHolder getTaskExecutionMetric();
}
public static ThreadPoolExecutor createThreadPoolExec(int keepAliveMs, int size, int queueSize, boolean useInstrumentedThreadPool) {
@@ -117,13 +122,17 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> {
*/
static class JobFutureTask<T> extends FutureTask<T> {
private final Object jobId;
+ @Nullable
+ private final TaskExecutionMetricsHolder taskMetric;
public JobFutureTask(Runnable r, T t) {
super(r, t);
if(r instanceof JobRunnable){
this.jobId = ((JobRunnable)r).getJobId();
+ this.taskMetric = ((JobRunnable)r).getTaskExecutionMetric();
} else {
this.jobId = this;
+ this.taskMetric = null;
}
}
@@ -132,8 +141,10 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> {
// FIXME: this fails when executor used by hbase
if (c instanceof JobCallable) {
this.jobId = ((JobCallable<T>) c).getJobId();
+ this.taskMetric = ((JobCallable<T>) c).getTaskExecutionMetric();
} else {
this.jobId = this;
+ this.taskMetric = null;
}
}
@@ -187,6 +198,7 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> {
*/
public static interface JobCallable<T> extends Callable<T> {
public Object getJobId();
+ public TaskExecutionMetricsHolder getTaskExecutionMetric();
}
@@ -224,27 +236,40 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> {
private final RejectedExecutionHandler rejectedExecHandler = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- REJECTED_TASK_COUNT.increment();
+ TaskExecutionMetricsHolder metrics = getRequestMetric(r);
+ if (metrics != null) {
+ metrics.getNumRejectedTasks().increment();
+ }
+ GLOBAL_REJECTED_TASK_COUNTER.increment();
throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + executor.toString());
}
};
- public InstrumentedThreadPoolExecutor(String threadPoolName, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
- BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
+ public InstrumentedThreadPoolExecutor(String threadPoolName, int corePoolSize, int maximumPoolSize,
+ long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
setRejectedExecutionHandler(rejectedExecHandler);
}
@Override
public void execute(Runnable task) {
- TASK_COUNT.increment();
+ TaskExecutionMetricsHolder metrics = getRequestMetric(task);
+ if (metrics != null) {
+ metrics.getNumTasks().increment();
+ }
+ GLOBAL_TASK_EXECUTED_COUNTER.increment();
super.execute(task);
}
@Override
protected void beforeExecute(Thread worker, Runnable task) {
InstrumentedJobFutureTask instrumentedTask = (InstrumentedJobFutureTask)task;
- TASK_QUEUE_WAIT_TIME.update(System.currentTimeMillis() - instrumentedTask.getTaskSubmissionTime());
+ long queueWaitTime = System.currentTimeMillis() - instrumentedTask.getTaskSubmissionTime();
+ GLOBAL_TASK_QUEUE_WAIT_TIME.update(queueWaitTime);
+ TaskExecutionMetricsHolder metrics = getRequestMetric(task);
+ if (metrics != null) {
+ metrics.getTaskQueueWaitTime().change(queueWaitTime);
+ }
super.beforeExecute(worker, instrumentedTask);
}
@@ -254,10 +279,21 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> {
try {
super.afterExecute(instrumentedTask, t);
} finally {
- TASK_EXECUTION_TIME.update(System.currentTimeMillis() - instrumentedTask.getTaskExecutionStartTime());
- TASK_END_TO_END_TIME.update(System.currentTimeMillis() - instrumentedTask.getTaskSubmissionTime());
+ long taskExecutionTime = System.currentTimeMillis() - instrumentedTask.getTaskExecutionStartTime();
+ long endToEndTaskTime = System.currentTimeMillis() - instrumentedTask.getTaskSubmissionTime();
+ TaskExecutionMetricsHolder metrics = getRequestMetric(task);
+ if (metrics != null) {
+ metrics.getTaskExecutionTime().change(taskExecutionTime);
+ metrics.getTaskEndToEndTime().change(endToEndTaskTime);
+ }
+ GLOBAL_TASK_EXECUTION_TIME.update(taskExecutionTime);
+ GLOBAL_TASK_END_TO_END_TIME.update(endToEndTaskTime);
}
}
+
+ private static TaskExecutionMetricsHolder getRequestMetric(Runnable task) {
+ return ((JobFutureTask)task).taskMetric;
+ }
}
}
[2/4] phoenix git commit: PHOENIX-1819 Build a framework to capture
and report phoenix client side request level metrics
Posted by sa...@apache.org.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
index 5270277..bb4054b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
@@ -57,6 +57,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.job.JobManager;
+import org.apache.phoenix.monitoring.GlobalClientMetrics;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
@@ -255,12 +256,9 @@ public class CsvBulkLoadTool extends Configured implements Tool {
}
List<Future<Boolean>> runningJobs = new ArrayList<Future<Boolean>>();
- boolean useInstrumentedPool = conn
- .unwrap(PhoenixConnection.class)
- .getQueryServices()
- .getProps()
- .getBoolean(QueryServices.METRICS_ENABLED,
- QueryServicesOptions.DEFAULT_IS_METRICS_ENABLED);
+ boolean useInstrumentedPool = GlobalClientMetrics.isMetricsEnabled()
+ || conn.unwrap(PhoenixConnection.class).isRequestLevelMetricsEnabled();
+
ExecutorService executor =
JobManager.createThreadPoolExec(Integer.MAX_VALUE, 5, 20, useInstrumentedPool);
try{
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
index eb6dc3d..b500a25 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -17,6 +17,8 @@
*/
package org.apache.phoenix.mapreduce;
+import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
+
import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
@@ -32,6 +34,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.iterate.ConcatResultIterator;
import org.apache.phoenix.iterate.LookAheadResultIterator;
import org.apache.phoenix.iterate.PeekingResultIterator;
@@ -40,6 +43,7 @@ import org.apache.phoenix.iterate.RoundRobinResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
import org.apache.phoenix.iterate.TableResultIterator;
import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
@@ -100,8 +104,12 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
final List<Scan> scans = pSplit.getScans();
try {
List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size());
+ StatementContext ctx = queryPlan.getContext();
+ ReadMetricQueue readMetrics = ctx.getReadMetricsQueue();
+ String tableName = queryPlan.getTableRef().getTable().getPhysicalName().getString();
for (Scan scan : scans) {
- final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(), scan);
+ final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(),
+ queryPlan.getTableRef(), scan, readMetrics.allotMetric(SCAN_BYTES, tableName));
PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
iterators.add(peekingResultIterator);
}
@@ -112,7 +120,7 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
this.resultIterator = iterator;
// Clone the row projector as it's not thread safe and would be used simultaneously by
// multiple threads otherwise.
- this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector().cloneIfNecessary(),queryPlan.getContext().getStatement());
+ this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector().cloneIfNecessary(), queryPlan.getContext());
} catch (SQLException e) {
LOG.error(String.format(" Error [%s] initializing PhoenixRecordReader. ",e.getMessage()));
Throwables.propagate(e);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java b/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
index 02c1dea..79b49c5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
@@ -17,9 +17,6 @@
*/
package org.apache.phoenix.memory;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MEMORY_MANAGER_BYTES;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MEMORY_WAIT_TIME;
-
import org.apache.http.annotation.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,8 +89,6 @@ public class GlobalMemoryManager implements MemoryManager {
}
usedMemoryBytes += nBytes;
}
- MEMORY_WAIT_TIME.update(System.currentTimeMillis() - startTimeMs);
- MEMORY_MANAGER_BYTES.update(nBytes);
return nBytes;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java
new file mode 100644
index 0000000..796e8ba
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Version of {@link Metric} that can be used when the metric is being concurrently accessed or modified by multiple
+ * threads.
+ */
+public class AtomicMetric implements Metric {
+
+ private final MetricType type;
+ private final AtomicLong value = new AtomicLong();
+
+ public AtomicMetric(MetricType type) {
+ this.type = type;
+ }
+
+ @Override
+ public String getName() {
+ return type.name();
+ }
+
+ @Override
+ public String getDescription() {
+ return type.description();
+ }
+
+ @Override
+ public long getValue() {
+ return value.get();
+ }
+
+ @Override
+ public void change(long delta) {
+ value.addAndGet(delta);
+ }
+
+ @Override
+ public void increment() {
+ value.incrementAndGet();
+ }
+
+ @Override
+ public String getCurrentMetricState() {
+ return getName() + ": " + value.get();
+ }
+
+ @Override
+ public void reset() {
+ value.set(0);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java
new file mode 100644
index 0000000..7ebb0c1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+
+
+/**
+ * Interface for representing a metric that could be published and possibly combined with a metric of the same
+ * type.
+ */
+public interface CombinableMetric extends Metric {
+
+ String getPublishString();
+
+ CombinableMetric combine(CombinableMetric metric);
+
+ public class NoOpRequestMetric implements CombinableMetric {
+
+ public static NoOpRequestMetric INSTANCE = new NoOpRequestMetric();
+ private static final String EMPTY_STRING = "";
+
+ @Override
+ public String getName() {
+ return EMPTY_STRING;
+ }
+
+ @Override
+ public String getDescription() {
+ return EMPTY_STRING;
+ }
+
+ @Override
+ public long getValue() {
+ return 0;
+ }
+
+ @Override
+ public void change(long delta) {}
+
+ @Override
+ public void increment() {}
+
+ @Override
+ public String getCurrentMetricState() {
+ return EMPTY_STRING;
+ }
+
+ @Override
+ public void reset() {}
+
+ @Override
+ public String getPublishString() {
+ return EMPTY_STRING;
+ }
+
+ @Override
+ public CombinableMetric combine(CombinableMetric metric) {
+ return INSTANCE;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java
new file mode 100644
index 0000000..fa6f7d3
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class CombinableMetricImpl implements CombinableMetric {
+
+ private final Metric metric;
+
+ public CombinableMetricImpl(MetricType type) {
+ metric = new NonAtomicMetric(type);
+ }
+
+ @Override
+ public String getName() {
+ return metric.getName();
+ }
+
+ @Override
+ public String getDescription() {
+ return metric.getDescription();
+ }
+
+ @Override
+ public long getValue() {
+ return metric.getValue();
+ }
+
+ @Override
+ public void change(long delta) {
+ metric.change(delta);
+ }
+
+ @Override
+ public void increment() {
+ metric.increment();
+ }
+
+ @Override
+ public String getCurrentMetricState() {
+ return metric.getCurrentMetricState();
+ }
+
+ @Override
+ public void reset() {
+ metric.reset();
+ }
+
+ @Override
+ public String getPublishString() {
+ return getCurrentMetricState();
+ }
+
+ @Override
+ public CombinableMetric combine(CombinableMetric metric) {
+ checkArgument(this.getClass().equals(metric.getClass()));
+ this.metric.change(metric.getValue());
+ return this;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Counter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Counter.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Counter.java
deleted file mode 100644
index 141294d..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Counter.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.monitoring;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.annotation.concurrent.ThreadSafe;
-
-/**
- * Incrementing only counter that keeps track of the
- * number of occurrences of something.
- *
- */
-@ThreadSafe
-class Counter implements Metric {
-
- private final AtomicLong counter;
- private final String name;
- private final String description;
-
- public Counter(String name, String description) {
- this.name = name;
- this.description = description;
- this.counter = new AtomicLong(0);
- }
-
- public long increment() {
- return counter.incrementAndGet();
- }
-
- public long getCurrentCount() {
- return counter.get();
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public String getDescription() {
- return description;
- }
-
- @Override
- public void reset() {
- counter.set(0);
- }
-
- @Override
- public String toString() {
- return "Name: " + name + ", Current count: " + counter.get();
- }
-
- @Override
- public String getCurrentMetricState() {
- return toString();
- }
-
- @Override
- public long getNumberOfSamples() {
- return getCurrentCount();
- }
-
- @Override
- public long getTotalSum() {
- return getCurrentCount();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
new file mode 100644
index 0000000..a8f3bb4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.MEMORY_WAIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.NUM_PARALLEL_SCANS;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_TIME;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_TIMEOUT_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.TASK_REJECTED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SPOOL_FILE_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SPOOL_FILE_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.TASK_END_TO_END_TIME;
+import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME;
+import static org.apache.phoenix.monitoring.MetricType.TASK_QUEUE_WAIT_TIME;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.phoenix.query.QueryServicesOptions;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Central place where we keep track of all the global client phoenix metrics. These metrics are different from
+ * {@link ReadMetricQueue} or {@link MutationMetricQueue} as they are collected at the client JVM level as opposed
+ * to the above two which are collected for every phoenix request.
+ */
+
+public enum GlobalClientMetrics {
+
+ GLOBAL_MUTATION_BATCH_SIZE(MUTATION_BATCH_SIZE),
+ GLOBAL_MUTATION_BYTES(MUTATION_BYTES),
+ GLOBAL_MUTATION_COMMIT_TIME(MUTATION_COMMIT_TIME),
+ GLOBAL_QUERY_TIME(QUERY_TIME),
+ GLOBAL_NUM_PARALLEL_SCANS(NUM_PARALLEL_SCANS),
+ GLOBAL_SCAN_BYTES(SCAN_BYTES),
+ GLOBAL_SPOOL_FILE_SIZE(SPOOL_FILE_SIZE),
+ GLOBAL_MEMORY_CHUNK_BYTES(MEMORY_CHUNK_BYTES),
+ GLOBAL_MEMORY_WAIT_TIME(MEMORY_WAIT_TIME),
+ GLOBAL_TASK_QUEUE_WAIT_TIME(TASK_QUEUE_WAIT_TIME),
+ GLOBAL_TASK_END_TO_END_TIME(TASK_END_TO_END_TIME),
+ GLOBAL_TASK_EXECUTION_TIME(TASK_EXECUTION_TIME),
+ GLOBAL_MUTATION_SQL_COUNTER(MUTATION_SQL_COUNTER),
+ GLOBAL_SELECT_SQL_COUNTER(SELECT_SQL_COUNTER),
+ GLOBAL_TASK_EXECUTED_COUNTER(TASK_EXECUTED_COUNTER),
+ GLOBAL_REJECTED_TASK_COUNTER(TASK_REJECTED_COUNTER),
+ GLOBAL_QUERY_TIMEOUT_COUNTER(QUERY_TIMEOUT_COUNTER),
+ GLOBAL_FAILED_QUERY_COUNTER(QUERY_FAILED_COUNTER),
+ GLOBAL_SPOOL_FILE_COUNTER(SPOOL_FILE_COUNTER);
+
+ private static final boolean isGlobalMetricsEnabled = QueryServicesOptions.withDefaults().isGlobalMetricsEnabled();
+ private GlobalMetric metric;
+
+ public void update(long value) {
+ if (isGlobalMetricsEnabled) {
+ metric.change(value);
+ }
+ }
+
+ @VisibleForTesting
+ public GlobalMetric getMetric() {
+ return metric;
+ }
+
+ @Override
+ public String toString() {
+ return metric.toString();
+ }
+
+ private GlobalClientMetrics(MetricType metricType) {
+ this.metric = new GlobalMetricImpl(metricType);
+ }
+
+ public void increment() {
+ if (isGlobalMetricsEnabled) {
+ metric.increment();
+ }
+ }
+
+ public static Collection<GlobalMetric> getMetrics() {
+ List<GlobalMetric> metrics = new ArrayList<>();
+ for (GlobalClientMetrics m : GlobalClientMetrics.values()) {
+ metrics.add(m.metric);
+ }
+ return metrics;
+ }
+
+ public static boolean isMetricsEnabled() {
+ return isGlobalMetricsEnabled;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetric.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetric.java
new file mode 100644
index 0000000..f3b562f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetric.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+/**
+ * Class that exposes the various internal phoenix metrics collected
+ * at the JVM level. Because metrics are dynamic in nature, it is not guaranteed that the
+ * state exposed will always be in sync with each other. One should use
+ * these metrics primarily for monitoring and debugging purposes.
+ */
+public interface GlobalMetric extends Metric {
+
+ /**
+ * @return Number of samples collected since the last {@link #reset()} call.
+ */
+ public long getNumberOfSamples();
+
+ /**
+ * @return Sum of the values of the metric sampled since the last {@link #reset()} call.
+ */
+ public long getTotalSum();
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java
new file mode 100644
index 0000000..26a16e1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class GlobalMetricImpl implements GlobalMetric {
+
+ private AtomicLong numberOfSamples = new AtomicLong(0);
+ private Metric metric;
+
+ public GlobalMetricImpl(MetricType type) {
+ this.metric = new AtomicMetric(type);
+ }
+
+ /**
+ * Reset the internal state. Typically called after metric information has been collected and a new phase of
+ * collection is being requested for the next interval.
+ */
+ @Override
+ public void reset() {
+ metric.reset();
+ numberOfSamples.set(0);
+ }
+
+ @Override
+ public long getNumberOfSamples() {
+ return numberOfSamples.get();
+ }
+
+ @Override
+ public long getTotalSum() {
+ return metric.getValue();
+ }
+
+ @Override
+ public void change(long delta) {
+ metric.change(delta);
+ numberOfSamples.incrementAndGet();
+ }
+
+ @Override
+ public void increment() {
+ metric.increment();
+ numberOfSamples.incrementAndGet();
+ }
+
+ @Override
+ public String getName() {
+ return metric.getName();
+ }
+
+ @Override
+ public String getDescription() {
+ return metric.getDescription();
+ }
+
+ @Override
+ public long getValue() {
+ return metric.getValue();
+ }
+
+ @Override
+ public String getCurrentMetricState() {
+ return metric.getCurrentMetricState() + ", Number of samples: " + numberOfSamples.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java
new file mode 100644
index 0000000..0e82ce4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.MEMORY_WAIT_TIME;
+
+/**
+ * Class that encapsulates the metrics regarding memory resources needed for servicing a request.
+ */
+public class MemoryMetricsHolder {
+ private final CombinableMetric memoryChunkSizeMetric;
+ private final CombinableMetric memoryWaitTimeMetric;
+ public static final MemoryMetricsHolder NO_OP_INSTANCE = new MemoryMetricsHolder(new ReadMetricQueue(false), null);
+
+ public MemoryMetricsHolder(ReadMetricQueue readMetrics, String tableName) {
+ this.memoryChunkSizeMetric = readMetrics.allotMetric(MEMORY_CHUNK_BYTES, tableName);
+ this.memoryWaitTimeMetric = readMetrics.allotMetric(MEMORY_WAIT_TIME, tableName);
+ }
+
+ public CombinableMetric getMemoryChunkSizeMetric() {
+ return memoryChunkSizeMetric;
+ }
+
+ public CombinableMetric getMemoryWaitTimeMetric() {
+ return memoryWaitTimeMetric;
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
index aef792c..1ad1c7a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
@@ -18,47 +18,46 @@
package org.apache.phoenix.monitoring;
/**
- * Interface that exposes the various internal phoenix metrics collected.
- * Because metrics are dynamic in nature, it is not guaranteed that the
- * state exposed will always be in sync with each other. One should use
- * these metrics primarily for monitoring and debugging purposes.
+ * Interface that represents phoenix-internal metric.
*/
public interface Metric {
-
/**
- *
* @return Name of the metric
*/
public String getName();
-
+
/**
- *
* @return Description of the metric
*/
public String getDescription();
-
+
/**
- * Reset the internal state. Typically called after
- * metric information has been collected and a new
- * phase of collection is being requested for the next
- * interval.
+ * @return Current value of the metric
*/
- public void reset();
-
+ public long getValue();
+
/**
+ * Change the metric by the specified amount
*
- * @return String that represents the current state of the metric.
- * Typically used to log the current state.
+ * @param delta
+ * amount by which the metric value should be changed
*/
- public String getCurrentMetricState();
-
+ public void change(long delta);
+
+ /**
+ * Change the value of metric by 1
+ */
+ public void increment();
+
/**
- * @return Number of samples collected since the last {@link #reset()} call.
+ * @return String that represents the current state of the metric. Typically used for logging or reporting purposes.
*/
- public long getNumberOfSamples();
+ public String getCurrentMetricState();
/**
- * @return Sum of the values of the metric sampled since the last {@link #reset()} call.
+ * Reset the metric
*/
- public long getTotalSum();
+ public void reset();
+
}
+
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
new file mode 100644
index 0000000..a0c2a4a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+public enum MetricType {
+
+ MUTATION_BATCH_SIZE("Batch sizes of mutations"),
+ MUTATION_BYTES("Size of mutations in bytes"),
+ MUTATION_COMMIT_TIME("Time it took to commit mutations"),
+ QUERY_TIME("Query times"),
+ NUM_PARALLEL_SCANS("Number of scans that were executed in parallel"),
+ SCAN_BYTES("Number of bytes read by scans"),
+ MEMORY_CHUNK_BYTES("Number of bytes allocated by the memory manager"),
+ MEMORY_WAIT_TIME("Number of milliseconds threads needed to wait for memory to be allocated through memory manager"),
+ MUTATION_SQL_COUNTER("Counter for number of mutation sql statements"),
+ SELECT_SQL_COUNTER("Counter for number of sql queries"),
+ TASK_QUEUE_WAIT_TIME("Time in milliseconds tasks had to wait in the queue of the thread pool executor"),
+ TASK_END_TO_END_TIME("Time in milliseconds spent by tasks from creation to completion"),
+ TASK_EXECUTION_TIME("Time in milliseconds tasks took to execute"),
+ TASK_EXECUTED_COUNTER("Counter for number of tasks submitted to the thread pool executor"),
+ TASK_REJECTED_COUNTER("Counter for number of tasks that were rejected by the thread pool executor"),
+ QUERY_TIMEOUT_COUNTER("Number of times query timed out"),
+ QUERY_FAILED_COUNTER("Number of times query failed"),
+ SPOOL_FILE_SIZE("Size of spool files created in bytes"),
+ SPOOL_FILE_COUNTER("Number of spool files created"),
+ CACHE_REFRESH_SPLITS_COUNTER("Number of times cache was refreshed because of splits"),
+ WALL_CLOCK_TIME_MS("Wall clock time elapsed for the overall query execution"),
+ RESULT_SET_TIME_MS("Wall clock time elapsed for reading all records using resultSet.next()");
+
+ private final String description;
+
+ private MetricType(String description) {
+ this.description = description;
+ }
+
+ public String description() {
+ return description;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricsStopWatch.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricsStopWatch.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricsStopWatch.java
new file mode 100644
index 0000000..bffb9ad
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricsStopWatch.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import com.google.common.base.Stopwatch;
+
+/**
+ *
+ * Stop watch that is cognizant of the fact whether or not metrics is enabled.
+ * If metrics isn't enabled it doesn't do anything. Otherwise, it delegates
+ * calls to a {@code Stopwatch}.
+ *
+ */
+final class MetricsStopWatch {
+
+ private final boolean isMetricsEnabled;
+ private final Stopwatch stopwatch;
+
+ MetricsStopWatch(boolean isMetricsEnabled) {
+ this.isMetricsEnabled = isMetricsEnabled;
+ this.stopwatch = new Stopwatch();
+ }
+
+ void start() {
+ if (isMetricsEnabled) {
+ stopwatch.start();
+ }
+ }
+
+ void stop() {
+ if (isMetricsEnabled) {
+ if (stopwatch.isRunning()) {
+ stopwatch.stop();
+ }
+ }
+ }
+
+ long getElapsedTimeInMs() {
+ if (isMetricsEnabled) {
+ return stopwatch.elapsedMillis();
+ }
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
new file mode 100644
index 0000000..e90da46
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Queue that tracks various writes/mutations related phoenix request metrics.
+ */
+public class MutationMetricQueue {
+
+ // Map of table name -> mutation metric
+ private Map<String, MutationMetric> tableMutationMetric = new HashMap<>();
+
+ public void addMetricsForTable(String tableName, MutationMetric metric) {
+ MutationMetric tableMetric = tableMutationMetric.get(tableName);
+ if (tableMetric == null) {
+ tableMutationMetric.put(tableName, metric);
+ } else {
+ tableMetric.combineMetric(metric);
+ }
+ }
+
+ public void combineMetricQueues(MutationMetricQueue other) {
+ Map<String, MutationMetric> tableMetricMap = other.tableMutationMetric;
+ for (Entry<String, MutationMetric> entry : tableMetricMap.entrySet()) {
+ addMetricsForTable(entry.getKey(), entry.getValue());
+ }
+ }
+
+ /**
+ * Publish the metrics to wherever you want them published. The internal state is cleared out after every publish.
+ * @return map of table name -> list of pair of (metric name, metric value)
+ */
+ public Map<String, Map<String, Long>> aggregate() {
+ Map<String, Map<String, Long>> publishedMetrics = new HashMap<>();
+ for (Entry<String, MutationMetric> entry : tableMutationMetric.entrySet()) {
+ String tableName = entry.getKey();
+ MutationMetric metric = entry.getValue();
+ Map<String, Long> publishedMetricsForTable = publishedMetrics.get(tableName);
+ if (publishedMetricsForTable == null) {
+ publishedMetricsForTable = new HashMap<>();
+ publishedMetrics.put(tableName, publishedMetricsForTable);
+ }
+ publishedMetricsForTable.put(metric.getNumMutations().getName(), metric.getNumMutations().getValue());
+ publishedMetricsForTable.put(metric.getMutationsSizeBytes().getName(), metric.getMutationsSizeBytes().getValue());
+ publishedMetricsForTable.put(metric.getCommitTimeForMutations().getName(), metric.getCommitTimeForMutations().getValue());
+ }
+ return publishedMetrics;
+ }
+
+ public void clearMetrics() {
+ tableMutationMetric.clear(); // help gc
+ }
+
+ /**
+ * Class that holds together the various metrics associated with mutations.
+ */
+ public static class MutationMetric {
+ private final CombinableMetric numMutations = new CombinableMetricImpl(MUTATION_BATCH_SIZE);
+ private final CombinableMetric mutationsSizeBytes = new CombinableMetricImpl(MUTATION_BYTES);
+ private final CombinableMetric totalCommitTimeForMutations = new CombinableMetricImpl(MUTATION_COMMIT_TIME);
+
+ public MutationMetric(long numMutations, long mutationsSizeBytes, long commitTimeForMutations) {
+ this.numMutations.change(numMutations);
+ this.mutationsSizeBytes.change(mutationsSizeBytes);
+ this.totalCommitTimeForMutations.change(commitTimeForMutations);
+ }
+
+ public CombinableMetric getCommitTimeForMutations() {
+ return totalCommitTimeForMutations;
+ }
+
+ public CombinableMetric getNumMutations() {
+ return numMutations;
+ }
+
+ public CombinableMetric getMutationsSizeBytes() {
+ return mutationsSizeBytes;
+ }
+
+ public void combineMetric(MutationMetric other) {
+ this.numMutations.combine(other.numMutations);
+ this.mutationsSizeBytes.combine(other.mutationsSizeBytes);
+ this.totalCommitTimeForMutations.combine(other.totalCommitTimeForMutations);
+ }
+
+ }
+
+ /**
+ * Class to represent a no-op mutation metric. Used in places where request level metric tracking for mutations is not
+ * needed or desired.
+ */
+ public static class NoOpMutationMetricsQueue extends MutationMetricQueue {
+
+ public static final NoOpMutationMetricsQueue NO_OP_MUTATION_METRICS_QUEUE = new NoOpMutationMetricsQueue();
+
+ private NoOpMutationMetricsQueue() {}
+
+ @Override
+ public void addMetricsForTable(String tableName, MutationMetric metric) {}
+
+ @Override
+ public Map<String, Map<String, Long>> aggregate() { return Collections.emptyMap(); }
+
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java
new file mode 100644
index 0000000..2d92116
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+/**
+ * Version of {@link Metric} that can be used when the metric isn't getting concurrently modified/accessed by multiple
+ * threads and the memory consistency effects of happen-before can be established. For example - phoenix client side
+ * metrics are modified/accessed by only one thread at a time. Further, the actions of threads in the phoenix client
+ * thread pool happen-before the actions of the thread that performs the aggregation of metrics. This makes
+ * {@link NonAtomicMetric} a good fit for storing Phoenix's client side request level metrics.
+ */
+class NonAtomicMetric implements Metric {
+
+ private final MetricType type;
+ private long value;
+
+ public NonAtomicMetric(MetricType type) {
+ this.type = type;
+ }
+
+ @Override
+ public String getName() {
+ return type.name();
+ }
+
+ @Override
+ public String getDescription() {
+ return type.description();
+ }
+
+ @Override
+ public long getValue() {
+ return value;
+ }
+
+ @Override
+ public void change(long delta) {
+ value += delta;
+ }
+
+ @Override
+ public void increment() {
+ value++;
+ }
+
+ @Override
+ public String getCurrentMetricState() {
+ return getName() + ": " + value;
+ }
+
+ @Override
+ public void reset() {
+ value = 0;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
new file mode 100644
index 0000000..1f71542
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static org.apache.phoenix.monitoring.MetricType.CACHE_REFRESH_SPLITS_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.NUM_PARALLEL_SCANS;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_TIMEOUT_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.RESULT_SET_TIME_MS;
+import static org.apache.phoenix.monitoring.MetricType.WALL_CLOCK_TIME_MS;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric;
+
+/**
+ * Class that represents the overall metrics associated with a query being executed by the phoenix.
+ */
+public class OverAllQueryMetrics {
+ private final MetricsStopWatch queryWatch;
+ private final MetricsStopWatch resultSetWatch;
+ private final CombinableMetric numParallelScans;
+ private final CombinableMetric wallClockTimeMS;
+ private final CombinableMetric resultSetTimeMS;
+ private final CombinableMetric queryTimedOut;
+ private final CombinableMetric queryFailed;
+ private final CombinableMetric cacheRefreshedDueToSplits;
+
+ public OverAllQueryMetrics(boolean isMetricsEnabled) {
+ queryWatch = new MetricsStopWatch(isMetricsEnabled);
+ resultSetWatch = new MetricsStopWatch(isMetricsEnabled);
+ numParallelScans = isMetricsEnabled ? new CombinableMetricImpl(NUM_PARALLEL_SCANS) : NoOpRequestMetric.INSTANCE;
+ wallClockTimeMS = isMetricsEnabled ? new CombinableMetricImpl(WALL_CLOCK_TIME_MS) : NoOpRequestMetric.INSTANCE;
+ resultSetTimeMS = isMetricsEnabled ? new CombinableMetricImpl(RESULT_SET_TIME_MS) : NoOpRequestMetric.INSTANCE;
+ queryTimedOut = isMetricsEnabled ? new CombinableMetricImpl(QUERY_TIMEOUT_COUNTER) : NoOpRequestMetric.INSTANCE;
+ queryFailed = isMetricsEnabled ? new CombinableMetricImpl(QUERY_FAILED_COUNTER) : NoOpRequestMetric.INSTANCE;
+ cacheRefreshedDueToSplits = isMetricsEnabled ? new CombinableMetricImpl(CACHE_REFRESH_SPLITS_COUNTER)
+ : NoOpRequestMetric.INSTANCE;
+ }
+
+ public void updateNumParallelScans(long numParallelScans) {
+ this.numParallelScans.change(numParallelScans);
+ }
+
+ public void queryTimedOut() {
+ queryTimedOut.increment();
+ }
+
+ public void queryFailed() {
+ queryFailed.increment();
+ }
+
+ public void cacheRefreshedDueToSplits() {
+ cacheRefreshedDueToSplits.increment();
+ }
+
+ public void startQuery() {
+ queryWatch.start();
+ }
+
+ public void endQuery() {
+ queryWatch.stop();
+ wallClockTimeMS.change(queryWatch.getElapsedTimeInMs());
+ }
+
+ public void startResultSetWatch() {
+ resultSetWatch.start();
+ }
+
+ public void stopResultSetWatch() {
+ resultSetWatch.stop();
+ resultSetTimeMS.change(resultSetWatch.getElapsedTimeInMs());
+ }
+
+ public Map<String, Long> publish() {
+ Map<String, Long> metricsForPublish = new HashMap<>();
+ metricsForPublish.put(numParallelScans.getName(), numParallelScans.getValue());
+ metricsForPublish.put(wallClockTimeMS.getName(), wallClockTimeMS.getValue());
+ metricsForPublish.put(resultSetTimeMS.getName(), resultSetTimeMS.getValue());
+ metricsForPublish.put(queryTimedOut.getName(), queryTimedOut.getValue());
+ metricsForPublish.put(queryFailed.getName(), queryFailed.getValue());
+ metricsForPublish.put(cacheRefreshedDueToSplits.getName(), cacheRefreshedDueToSplits.getValue());
+ return metricsForPublish;
+ }
+
+ public void reset() {
+ numParallelScans.reset();
+ wallClockTimeMS.reset();
+ resultSetTimeMS.reset();
+ queryTimedOut.reset();
+ queryFailed.reset();
+ cacheRefreshedDueToSplits.reset();
+ queryWatch.stop();
+ resultSetWatch.stop();
+ }
+
+ public OverAllQueryMetrics combine(OverAllQueryMetrics metric) {
+ cacheRefreshedDueToSplits.combine(metric.cacheRefreshedDueToSplits);
+ queryFailed.combine(metric.queryFailed);
+ queryTimedOut.combine(metric.queryTimedOut);
+ numParallelScans.combine(metric.numParallelScans);
+ return this;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixMetrics.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixMetrics.java
deleted file mode 100644
index 28e2f2e..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixMetrics.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.monitoring;
-
-/**
- * Central place where we keep track of all the internal
- * phoenix metrics that we track.
- *
- */
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.phoenix.query.QueryServicesOptions;
-
-public class PhoenixMetrics {
- private static final boolean isMetricsEnabled = QueryServicesOptions.withDefaults().isMetricsEnabled();
-
- public static boolean isMetricsEnabled() {
- return isMetricsEnabled;
- }
-
- public enum SizeMetric {
- MUTATION_BATCH_SIZE("CumulativeBatchSizesOfMutations", "Cumulative batch sizes of mutations"),
- MUTATION_BYTES("CumulativeMutationSize", "Cumulative size of mutations in bytes"),
- MUTATION_COMMIT_TIME("CumulativeMutationTime", "Cumulative time it took to send mutations"),
- QUERY_TIME("QueryTime", "Cumulative query times"),
- PARALLEL_SCANS("CumulativeNumberOfParallelScans", "Cumulative number of scans executed that were executed in parallel"),
- SCAN_BYTES("CumulativeScanBytesSize", "Cumulative number of bytes read by scans"),
- SPOOL_FILE_SIZE("CumulativeSpoolFilesSize", "Cumulative size of spool files created in bytes"),
- MEMORY_MANAGER_BYTES("CumulativeBytesAllocated", "Cumulative number of bytes allocated by the memory manager"),
- MEMORY_WAIT_TIME("CumulativeMemoryWaitTime", "Cumulative number of milliseconds threads needed to wait for memory to be allocated through memory manager"),
- TASK_QUEUE_WAIT_TIME("CumulativeTaskQueueWaitTime", "Cumulative time in milliseconds tasks had to wait in the queue of the thread pool executor"),
- TASK_END_TO_END_TIME("CumulativeTaskEndToEndTime", "Cumulative time in milliseconds spent by tasks from creation to completion"),
- TASK_EXECUTION_TIME("CumulativeTaskExecutionTime", "Cumulative time in milliseconds tasks took to execute");
-
- private final SizeStatistic metric;
-
- private SizeMetric(String metricName, String metricDescription) {
- metric = new SizeStatistic(metricName, metricDescription);
- }
-
- public void update(long value) {
- if (isMetricsEnabled) {
- metric.add(value);
- }
- }
-
- // exposed for testing.
- public Metric getMetric() {
- return metric;
- }
-
- @Override
- public String toString() {
- return metric.toString();
- }
- }
-
- public enum CountMetric {
- MUTATION_COUNT("NumMutationCounter", "Counter for number of mutation statements"),
- QUERY_COUNT("NumQueryCounter", "Counter for number of queries"),
- TASK_COUNT("NumberOfTasksCounter", "Counter for number of tasks submitted to the thread pool executor"),
- REJECTED_TASK_COUNT("RejectedTasksCounter", "Counter for number of tasks that were rejected by the thread pool executor"),
- QUERY_TIMEOUT("QueryTimeoutCounter", "Number of times query timed out"),
- FAILED_QUERY("QueryFailureCounter", "Number of times query failed"),
- NUM_SPOOL_FILE("NumSpoolFilesCounter", "Number of spool files created");
-
- private final Counter metric;
-
- private CountMetric(String metricName, String metricDescription) {
- metric = new Counter(metricName, metricDescription);
- }
-
- public void increment() {
- if (isMetricsEnabled) {
- metric.increment();
- }
- }
-
- // exposed for testing.
- public Metric getMetric() {
- return metric;
- }
-
- @Override
- public String toString() {
- return metric.toString();
- }
- }
-
- public static Collection<Metric> getMetrics() {
- List<Metric> metrics = new ArrayList<>();
- for (SizeMetric s : SizeMetric.values()) {
- metrics.add(s.metric);
- }
- for (CountMetric s : CountMetric.values()) {
- metrics.add(s.metric);
- }
- return metrics;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
new file mode 100644
index 0000000..e6c6be2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.annotation.Nonnull;
+
+import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Queue of all metrics associated with performing reads from the cluster.
+ */
+public class ReadMetricQueue {
+
+ private static final int MAX_QUEUE_SIZE = 20000; // TODO: should this be configurable?
+
+ private final ConcurrentMap<MetricKey, Queue<CombinableMetric>> metricsMap = new ConcurrentHashMap<>();
+
+ private final boolean isRequestMetricsEnabled;
+
+ public ReadMetricQueue(boolean isRequestMetricsEnabled) {
+ this.isRequestMetricsEnabled = isRequestMetricsEnabled;
+ }
+
+ public CombinableMetric allotMetric(MetricType type, String tableName) {
+ if (!isRequestMetricsEnabled) { return NoOpRequestMetric.INSTANCE; }
+ MetricKey key = new MetricKey(type, tableName);
+ Queue<CombinableMetric> q = getMetricQueue(key);
+ CombinableMetric metric = getMetric(type);
+ q.offer(metric);
+ return metric;
+ }
+
+ @VisibleForTesting
+ public CombinableMetric getMetric(MetricType type) {
+ CombinableMetric metric = new CombinableMetricImpl(type);
+ return metric;
+ }
+
+ /**
+ * @return map of table name -> list of pair of (metric name, metric value)
+ */
+ public Map<String, Map<String, Long>> aggregate() {
+ Map<String, Map<String, Long>> publishedMetrics = new HashMap<>();
+ for (Entry<MetricKey, Queue<CombinableMetric>> entry : metricsMap.entrySet()) {
+ String tableNameToPublish = entry.getKey().tableName;
+ Collection<CombinableMetric> metrics = entry.getValue();
+ if (metrics.size() > 0) {
+ CombinableMetric m = combine(metrics);
+ Map<String, Long> map = publishedMetrics.get(tableNameToPublish);
+ if (map == null) {
+ map = new HashMap<>();
+ publishedMetrics.put(tableNameToPublish, map);
+ }
+ map.put(m.getName(), m.getValue());
+ }
+ }
+ return publishedMetrics;
+ }
+
+ public void clearMetrics() {
+ metricsMap.clear(); // help gc
+ }
+
+ private static CombinableMetric combine(Collection<CombinableMetric> metrics) {
+ int size = metrics.size();
+ if (size == 0) { throw new IllegalArgumentException("Metrics collection needs to have at least one element"); }
+ Iterator<CombinableMetric> itr = metrics.iterator();
+ CombinableMetric combinedMetric = itr.next();
+ while (itr.hasNext()) {
+ combinedMetric = combinedMetric.combine(itr.next());
+ }
+ return combinedMetric;
+ }
+
+ /**
+ * Combine the metrics. This method should only be called in a single threaded manner when the two metric holders
+ * are not getting modified.
+ */
+ public ReadMetricQueue combineReadMetrics(ReadMetricQueue other) {
+ ConcurrentMap<MetricKey, Queue<CombinableMetric>> otherMetricsMap = other.metricsMap;
+ for (Entry<MetricKey, Queue<CombinableMetric>> entry : otherMetricsMap.entrySet()) {
+ MetricKey key = entry.getKey();
+ Queue<CombinableMetric> otherQueue = entry.getValue();
+ CombinableMetric combinedMetric = null;
+ // combine the metrics corresponding to this metric key before putting it in the queue.
+ for (CombinableMetric m : otherQueue) {
+ if (combinedMetric == null) {
+ combinedMetric = m;
+ } else {
+ combinedMetric.combine(m);
+ }
+ }
+ if (combinedMetric != null) {
+ Queue<CombinableMetric> thisQueue = getMetricQueue(key);
+ thisQueue.offer(combinedMetric);
+ }
+ }
+ return this;
+ }
+
+ /**
+ * Inner class whose instances are used as keys in the metrics map.
+ */
+ private static class MetricKey {
+ @Nonnull
+ private final MetricType type;
+
+ @Nonnull
+ private final String tableName;
+
+ MetricKey(MetricType type, String tableName) {
+ checkNotNull(type);
+ checkNotNull(tableName);
+ this.type = type;
+ this.tableName = tableName;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + tableName.hashCode();
+ result = prime * result + type.hashCode();
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (obj == null) return false;
+ if (getClass() != obj.getClass()) return false;
+ MetricKey other = (MetricKey)obj;
+ if (tableName.equals(other.tableName) && type == other.type) return true;
+ return false;
+ }
+
+ }
+
+ private Queue<CombinableMetric> getMetricQueue(MetricKey key) {
+ Queue<CombinableMetric> q = metricsMap.get(key);
+ if (q == null) {
+ q = new LinkedBlockingQueue<CombinableMetric>(MAX_QUEUE_SIZE);
+ Queue<CombinableMetric> curQ = metricsMap.putIfAbsent(key, q);
+ if (curQ != null) {
+ q = curQ;
+ }
+ }
+ return q;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SizeStatistic.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SizeStatistic.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SizeStatistic.java
deleted file mode 100644
index 9eca754..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SizeStatistic.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.monitoring;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- *
- * Statistic that keeps track of the sum of long values that
- * could be used to represent a phoenix metric. For performance
- * reasons the internal state in this metric is not strictly covariant
- * and hence should only be used for monitoring and debugging purposes.
- */
-class SizeStatistic implements Metric {
-
- private final AtomicLong total = new AtomicLong(0);
- private final AtomicLong numSamples = new AtomicLong(0);
- private final String name;
- private final String description;
-
- public SizeStatistic(String name, String description) {
- this.name = name;
- this.description = description;
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public String getDescription() {
- return description;
- }
-
- @Override
- public void reset() {
- total.set(0);
- numSamples.set(0);
- }
-
- @Override
- public String getCurrentMetricState() {
- return "Name:" + description + ", Total: " + total.get() + ", Number of samples: " + numSamples.get();
- }
-
- @Override
- public long getNumberOfSamples() {
- return numSamples.get();
- }
-
- @Override
- public long getTotalSum() {
- return total.get();
- }
-
- public long add(long value) {
- // there is a race condition here but what the heck.
- numSamples.incrementAndGet();
- return total.addAndGet(value);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java
new file mode 100644
index 0000000..4373887
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+
+/**
+ * Class that encapsulates the various metrics associated with the spooling done by phoenix as part of servicing a
+ * request.
+ */
+public class SpoolingMetricsHolder {
+
+ private final CombinableMetric spoolFileSizeMetric;
+ private final CombinableMetric numSpoolFileMetric;
+ public static final SpoolingMetricsHolder NO_OP_INSTANCE = new SpoolingMetricsHolder(new ReadMetricQueue(false), "");
+
+ public SpoolingMetricsHolder(ReadMetricQueue readMetrics, String tableName) {
+ this.spoolFileSizeMetric = readMetrics.allotMetric(MetricType.SPOOL_FILE_SIZE, tableName);
+ this.numSpoolFileMetric = readMetrics.allotMetric(MetricType.SPOOL_FILE_COUNTER, tableName);
+ }
+
+ public CombinableMetric getSpoolFileSizeMetric() {
+ return spoolFileSizeMetric;
+ }
+
+ public CombinableMetric getNumSpoolFileMetric() {
+ return numSpoolFileMetric;
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java
new file mode 100644
index 0000000..98ff57c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static org.apache.phoenix.monitoring.MetricType.TASK_END_TO_END_TIME;
+import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME;
+import static org.apache.phoenix.monitoring.MetricType.TASK_QUEUE_WAIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.TASK_REJECTED_COUNTER;
+
+
+/**
+ * Class to encapsulate the various metrics associated with submitting and executing a task to the phoenix client
+ * thread pool.
+ */
+public class TaskExecutionMetricsHolder {
+
+ private final CombinableMetric taskQueueWaitTime;
+ private final CombinableMetric taskEndToEndTime;
+ private final CombinableMetric taskExecutionTime;
+ private final CombinableMetric numTasks;
+ private final CombinableMetric numRejectedTasks;
+ public static final TaskExecutionMetricsHolder NO_OP_INSTANCE = new TaskExecutionMetricsHolder(new ReadMetricQueue(false), "");
+
+ public TaskExecutionMetricsHolder(ReadMetricQueue readMetrics, String tableName) {
+ taskQueueWaitTime = readMetrics.allotMetric(TASK_QUEUE_WAIT_TIME, tableName);
+ taskEndToEndTime = readMetrics.allotMetric(TASK_END_TO_END_TIME, tableName);
+ taskExecutionTime = readMetrics.allotMetric(TASK_EXECUTION_TIME, tableName);
+ numTasks = readMetrics.allotMetric(TASK_EXECUTED_COUNTER, tableName);
+ numRejectedTasks = readMetrics.allotMetric(TASK_REJECTED_COUNTER, tableName);
+ }
+
+ public CombinableMetric getTaskQueueWaitTime() {
+ return taskQueueWaitTime;
+ }
+
+ public CombinableMetric getTaskEndToEndTime() {
+ return taskEndToEndTime;
+ }
+
+ public CombinableMetric getTaskExecutionTime() {
+ return taskExecutionTime;
+ }
+
+ public CombinableMetric getNumTasks() {
+ return numTasks;
+ }
+
+ public CombinableMetric getNumRejectedTasks() {
+ return numRejectedTasks;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
index 898a919..c16b86d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
@@ -45,7 +45,7 @@ public abstract class BaseQueryServicesImpl implements QueryServices {
options.getKeepAliveMs(),
options.getThreadPoolSize(),
options.getQueueSize(),
- options.isMetricsEnabled());
+ options.isGlobalMetricsEnabled());
this.memoryManager = new GlobalMemoryManager(
Runtime.getRuntime().maxMemory() * options.getMaxMemoryPerc() / 100,
options.getMaxMemoryWaitMs());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 3e7d084..825cc83 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -157,7 +157,7 @@ public interface QueryServices extends SQLCloseable {
public static final String DELAY_FOR_SCHEMA_UPDATE_CHECK = "phoenix.schema.change.delay";
public static final String DEFAULT_KEEP_DELETED_CELLS_ATTRIB = "phoenix.table.default.keep.deleted.cells";
public static final String DEFAULT_STORE_NULLS_ATTRIB = "phoenix.table.default.store.nulls";
- public static final String METRICS_ENABLED = "phoenix.query.metrics.enabled";
+ public static final String GLOBAL_METRICS_ENABLED = "phoenix.query.global.metrics.enabled";
// rpc queue configs
public static final String INDEX_HANDLER_COUNT_ATTRIB = "phoenix.rpc.index.handler.count";
@@ -165,6 +165,7 @@ public interface QueryServices extends SQLCloseable {
public static final String FORCE_ROW_KEY_ORDER_ATTRIB = "phoenix.query.force.rowkeyorder";
public static final String ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB = "phoenix.functions.allowUserDefinedFunctions";
+ public static final String COLLECT_REQUEST_LEVEL_METRICS = "phoenix.query.request.metrics.enabled";
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 02c695e..4e8879b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -18,15 +18,16 @@
package org.apache.phoenix.query;
import static org.apache.phoenix.query.QueryServices.ALLOW_ONLINE_TABLE_SCHEMA_UPDATE;
-import static org.apache.phoenix.query.QueryServices.ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB;
import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME;
import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS;
import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.DELAY_FOR_SCHEMA_UPDATE_CHECK;
import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
import static org.apache.phoenix.query.QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB;
import static org.apache.phoenix.query.QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.GLOBAL_METRICS_ENABLED;
import static org.apache.phoenix.query.QueryServices.GROUPBY_MAX_CACHE_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILL_FILES_ATTRIB;
@@ -43,7 +44,6 @@ import static org.apache.phoenix.query.QueryServices.MAX_SERVER_CACHE_TIME_TO_LI
import static org.apache.phoenix.query.QueryServices.MAX_SERVER_METADATA_CACHE_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MAX_SPOOL_TO_DISK_BYTES_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MAX_TENANT_MEMORY_PERC_ATTRIB;
-import static org.apache.phoenix.query.QueryServices.METRICS_ENABLED;
import static org.apache.phoenix.query.QueryServices.MIN_STATS_UPDATE_FREQ_MS_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK;
@@ -187,13 +187,14 @@ public class QueryServicesOptions {
// TODO Change this to true as part of PHOENIX-1543
public static final boolean DEFAULT_AUTO_COMMIT = false;
- public static final boolean DEFAULT_IS_METRICS_ENABLED = true;
+ public static final boolean DEFAULT_IS_GLOBAL_METRICS_ENABLED = true;
private static final String DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY = ClientRpcControllerFactory.class.getName();
public static final boolean DEFAULT_USE_BYTE_BASED_REGEX = false;
public static final boolean DEFAULT_FORCE_ROW_KEY_ORDER = false;
public static final boolean DEFAULT_ALLOW_USER_DEFINED_FUNCTIONS = false;
+ public static final boolean DEFAULT_REQUEST_LEVEL_METRICS_ENABLED = false;
private final Configuration config;
@@ -246,10 +247,11 @@ public class QueryServicesOptions {
.setIfUnset(ALLOW_ONLINE_TABLE_SCHEMA_UPDATE, DEFAULT_ALLOW_ONLINE_TABLE_SCHEMA_UPDATE)
.setIfUnset(NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_RETRIES_FOR_SCHEMA_UPDATE_CHECK)
.setIfUnset(DELAY_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_DELAY_FOR_SCHEMA_UPDATE_CHECK)
- .setIfUnset(METRICS_ENABLED, DEFAULT_IS_METRICS_ENABLED)
+ .setIfUnset(GLOBAL_METRICS_ENABLED, DEFAULT_IS_GLOBAL_METRICS_ENABLED)
.setIfUnset(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY)
.setIfUnset(USE_BYTE_BASED_REGEX_ATTRIB, DEFAULT_USE_BYTE_BASED_REGEX)
- .setIfUnset(FORCE_ROW_KEY_ORDER_ATTRIB, DEFAULT_FORCE_ROW_KEY_ORDER);
+ .setIfUnset(FORCE_ROW_KEY_ORDER_ATTRIB, DEFAULT_FORCE_ROW_KEY_ORDER)
+ .setIfUnset(COLLECT_REQUEST_LEVEL_METRICS, DEFAULT_REQUEST_LEVEL_METRICS_ENABLED)
;
// HBase sets this to 1, so we reset it to something more appropriate.
// Hopefully HBase will change this, because we can't know if a user set
@@ -445,10 +447,10 @@ public class QueryServicesOptions {
return config.getInt(GROUPBY_SPILL_FILES_ATTRIB, DEFAULT_GROUPBY_SPILL_FILES);
}
- public boolean isMetricsEnabled() {
- return config.getBoolean(METRICS_ENABLED, DEFAULT_IS_METRICS_ENABLED);
+ public boolean isGlobalMetricsEnabled() {
+ return config.getBoolean(GLOBAL_METRICS_ENABLED, DEFAULT_IS_GLOBAL_METRICS_ENABLED);
}
-
+
public boolean isUseByteBasedRegex() {
return config.getBoolean(USE_BYTE_BASED_REGEX_ATTRIB, DEFAULT_USE_BYTE_BASED_REGEX);
}
@@ -526,11 +528,7 @@ public class QueryServicesOptions {
return this;
}
- public QueryServicesOptions setMetricsEnabled(boolean flag) {
- config.setBoolean(METRICS_ENABLED, flag);
- return this;
- }
-
+
public QueryServicesOptions setUseByteBasedRegex(boolean flag) {
config.setBoolean(USE_BYTE_BASED_REGEX_ATTRIB, flag);
return this;
@@ -540,4 +538,5 @@ public class QueryServicesOptions {
config.setBoolean(FORCE_ROW_KEY_ORDER_ATTRIB, forceRowKeyOrder);
return this;
}
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
index 265fc78..159e0c9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
@@ -17,12 +17,23 @@
*/
package org.apache.phoenix.trace;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
-import org.apache.commons.configuration.Configuration;
+import static org.apache.phoenix.metrics.MetricInfo.ANNOTATION;
+import static org.apache.phoenix.metrics.MetricInfo.DESCRIPTION;
+import static org.apache.phoenix.metrics.MetricInfo.END;
+import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME;
+import static org.apache.phoenix.metrics.MetricInfo.PARENT;
+import static org.apache.phoenix.metrics.MetricInfo.SPAN;
+import static org.apache.phoenix.metrics.MetricInfo.START;
+import static org.apache.phoenix.metrics.MetricInfo.TAG;
+import static org.apache.phoenix.metrics.MetricInfo.TRACE;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,20 +42,15 @@ import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.MetricsTag;
-import org.apache.phoenix.metrics.*;
+import org.apache.phoenix.metrics.MetricInfo;
+import org.apache.phoenix.metrics.Metrics;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.QueryUtil;
-import javax.annotation.Nullable;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.*;
-
-import static org.apache.phoenix.metrics.MetricInfo.*;
-import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
/**
* Write the metrics to a phoenix table.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
index 06534d1..cbe5a1a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
@@ -154,4 +154,9 @@ public class JDBCUtil {
}
return Boolean.valueOf(autoCommit);
}
+
+ public static boolean isCollectingRequestLevelMetricsEnabled(String url, Properties overrideProps, ReadOnlyProps queryServicesProps) throws SQLException {
+ String batchSizeStr = findProperty(url, overrideProps, PhoenixRuntime.REQUEST_METRIC_ATTRIB);
+ return (batchSizeStr == null ? queryServicesProps.getBoolean(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, QueryServicesOptions.DEFAULT_REQUEST_LEVEL_METRICS_ENABLED) : Boolean.parseBoolean(batchSizeStr));
+ }
}