You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2015/06/27 01:29:06 UTC
[4/4] phoenix git commit: PHOENIX-1819 Build a framework to capture
and report phoenix client side request level metrics
PHOENIX-1819 Build a framework to capture and report phoenix client side request level metrics
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7e29d57b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7e29d57b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7e29d57b
Branch: refs/heads/4.x-HBase-0.98
Commit: 7e29d57bda0eb66b1dbd102060253e1b1468f052
Parents: 006aec5
Author: Samarth <sa...@salesforce.com>
Authored: Fri Jun 26 16:28:50 2015 -0700
Committer: Samarth <sa...@salesforce.com>
Committed: Fri Jun 26 16:28:50 2015 -0700
----------------------------------------------------------------------
.../phoenix/end2end/PhoenixMetricsIT.java | 147 ----
.../apache/phoenix/execute/PartialCommitIT.java | 1 +
.../phoenix/monitoring/PhoenixMetricsIT.java | 815 +++++++++++++++++++
.../apache/phoenix/cache/ServerCacheClient.java | 7 +
.../apache/phoenix/compile/DeleteCompiler.java | 50 +-
.../MutatingParallelIteratorFactory.java | 51 +-
.../phoenix/compile/StatementContext.java | 49 +-
.../apache/phoenix/compile/UpsertCompiler.java | 80 +-
.../apache/phoenix/execute/AggregatePlan.java | 8 +-
.../apache/phoenix/execute/HashJoinPlan.java | 7 +
.../apache/phoenix/execute/MutationState.java | 290 ++++---
.../org/apache/phoenix/execute/UnionPlan.java | 8 +-
.../phoenix/iterate/BaseResultIterators.java | 15 +-
.../phoenix/iterate/ChunkedResultIterator.java | 21 +-
.../iterate/ParallelIteratorFactory.java | 4 +-
.../phoenix/iterate/ParallelIterators.java | 25 +-
.../iterate/RoundRobinResultIterator.java | 4 +-
.../phoenix/iterate/ScanningResultIterator.java | 38 +-
.../apache/phoenix/iterate/SerialIterators.java | 23 +-
.../phoenix/iterate/SpoolingResultIterator.java | 49 +-
.../phoenix/iterate/TableResultIterator.java | 17 +-
.../phoenix/iterate/UnionResultIterators.java | 70 +-
.../apache/phoenix/jdbc/PhoenixConnection.java | 28 +-
.../phoenix/jdbc/PhoenixDatabaseMetaData.java | 21 +-
.../apache/phoenix/jdbc/PhoenixResultSet.java | 48 +-
.../apache/phoenix/jdbc/PhoenixStatement.java | 20 +-
.../java/org/apache/phoenix/job/JobManager.java | 60 +-
.../phoenix/mapreduce/CsvBulkLoadTool.java | 10 +-
.../phoenix/mapreduce/PhoenixRecordReader.java | 12 +-
.../phoenix/memory/GlobalMemoryManager.java | 5 -
.../apache/phoenix/monitoring/AtomicMetric.java | 70 ++
.../phoenix/monitoring/CombinableMetric.java | 77 ++
.../monitoring/CombinableMetricImpl.java | 77 ++
.../org/apache/phoenix/monitoring/Counter.java | 85 --
.../phoenix/monitoring/GlobalClientMetrics.java | 117 +++
.../apache/phoenix/monitoring/GlobalMetric.java | 37 +
.../phoenix/monitoring/GlobalMetricImpl.java | 74 ++
.../phoenix/monitoring/MemoryMetricsHolder.java | 43 +
.../org/apache/phoenix/monitoring/Metric.java | 45 +-
.../apache/phoenix/monitoring/MetricType.java | 55 ++
.../phoenix/monitoring/MetricsStopWatch.java | 59 ++
.../phoenix/monitoring/MutationMetricQueue.java | 131 +++
.../phoenix/monitoring/NonAtomicMetric.java | 71 ++
.../phoenix/monitoring/OverAllQueryMetrics.java | 121 +++
.../phoenix/monitoring/PhoenixMetrics.java | 118 ---
.../phoenix/monitoring/ReadMetricQueue.java | 180 ++++
.../phoenix/monitoring/SizeStatistic.java | 78 --
.../monitoring/SpoolingMetricsHolder.java | 43 +
.../monitoring/TaskExecutionMetricsHolder.java | 68 ++
.../phoenix/query/BaseQueryServicesImpl.java | 2 +-
.../org/apache/phoenix/query/QueryServices.java | 3 +-
.../phoenix/query/QueryServicesOptions.java | 25 +-
.../phoenix/trace/PhoenixMetricsSink.java | 36 +-
.../java/org/apache/phoenix/util/JDBCUtil.java | 5 +
.../org/apache/phoenix/util/PhoenixRuntime.java | 175 +++-
.../iterate/SpoolingResultIteratorTest.java | 4 +-
56 files changed, 2933 insertions(+), 849 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixMetricsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixMetricsIT.java
deleted file mode 100644
index edb4042..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixMetricsIT.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end;
-
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.FAILED_QUERY;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.MUTATION_COUNT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.NUM_SPOOL_FILE;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.QUERY_COUNT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.QUERY_TIMEOUT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.REJECTED_TASK_COUNT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_BATCH_SIZE;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_BYTES;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_COMMIT_TIME;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.PARALLEL_SCANS;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.QUERY_TIME;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.SCAN_BYTES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-
-import org.apache.phoenix.monitoring.Metric;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.junit.Test;
-
-public class PhoenixMetricsIT extends BaseHBaseManagedTimeIT {
-
- @Test
- public void testResetPhoenixMetrics() {
- resetMetrics();
- for (Metric m : PhoenixRuntime.getInternalPhoenixMetrics()) {
- assertEquals(0, m.getTotalSum());
- assertEquals(0, m.getNumberOfSamples());
- }
- }
-
- @Test
- public void testPhoenixMetricsForQueries() throws Exception {
- createTableAndInsertValues("T", true);
- resetMetrics(); // we want to count metrics related only to the below query
- Connection conn = DriverManager.getConnection(getUrl());
- String query = "SELECT * FROM T";
- ResultSet rs = conn.createStatement().executeQuery(query);
- while (rs.next()) {
- rs.getString(1);
- rs.getString(2);
- }
- assertEquals(1, PARALLEL_SCANS.getMetric().getTotalSum());
- assertEquals(1, QUERY_COUNT.getMetric().getTotalSum());
- assertEquals(0, REJECTED_TASK_COUNT.getMetric().getTotalSum());
- assertEquals(0, QUERY_TIMEOUT.getMetric().getTotalSum());
- assertEquals(0, FAILED_QUERY.getMetric().getTotalSum());
- assertEquals(0, NUM_SPOOL_FILE.getMetric().getTotalSum());
- assertEquals(0, MUTATION_BATCH_SIZE.getMetric().getTotalSum());
- assertEquals(0, MUTATION_BYTES.getMetric().getTotalSum());
- assertEquals(0, MUTATION_COMMIT_TIME.getMetric().getTotalSum());
-
- assertTrue(SCAN_BYTES.getMetric().getTotalSum() > 0);
- assertTrue(QUERY_TIME.getMetric().getTotalSum() > 0);
- }
-
- @Test
- public void testPhoenixMetricsForMutations() throws Exception {
- createTableAndInsertValues("T", true);
- assertEquals(10, MUTATION_BATCH_SIZE.getMetric().getTotalSum());
- assertEquals(10, MUTATION_COUNT.getMetric().getTotalSum());
- assertTrue(MUTATION_BYTES.getMetric().getTotalSum() > 0);
- assertTrue(MUTATION_COMMIT_TIME.getMetric().getTotalSum() > 0);
- assertEquals(0, PARALLEL_SCANS.getMetric().getTotalSum());
- assertEquals(0, QUERY_COUNT.getMetric().getTotalSum());
- assertEquals(0, REJECTED_TASK_COUNT.getMetric().getTotalSum());
- assertEquals(0, QUERY_TIMEOUT.getMetric().getTotalSum());
- assertEquals(0, FAILED_QUERY.getMetric().getTotalSum());
- assertEquals(0, NUM_SPOOL_FILE.getMetric().getTotalSum());
- }
-
-
- @Test
- public void testPhoenixMetricsForUpsertSelect() throws Exception {
- createTableAndInsertValues("T", true);
- resetMetrics();
- String ddl = "CREATE TABLE T2 (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
- Connection conn = DriverManager.getConnection(getUrl());
- conn.createStatement().execute(ddl);
- resetMetrics();
- String dml = "UPSERT INTO T2 (K, V) SELECT K, V FROM T";
- conn.createStatement().executeUpdate(dml);
- conn.commit();
- assertEquals(10, MUTATION_BATCH_SIZE.getMetric().getTotalSum());
- assertEquals(1, MUTATION_COUNT.getMetric().getTotalSum());
- assertEquals(1, PARALLEL_SCANS.getMetric().getTotalSum());
- assertEquals(0, QUERY_TIME.getMetric().getTotalSum());
- assertTrue(SCAN_BYTES.getMetric().getTotalSum() > 0);
- assertTrue(MUTATION_BYTES.getMetric().getTotalSum() > 0);
- assertTrue(MUTATION_COMMIT_TIME.getMetric().getTotalSum() > 0);
- assertEquals(0, QUERY_COUNT.getMetric().getTotalSum());
- assertEquals(0, REJECTED_TASK_COUNT.getMetric().getTotalSum());
- assertEquals(0, QUERY_TIMEOUT.getMetric().getTotalSum());
- assertEquals(0, FAILED_QUERY.getMetric().getTotalSum());
- assertEquals(0, NUM_SPOOL_FILE.getMetric().getTotalSum());
- }
-
- private static void resetMetrics() {
- for (Metric m : PhoenixRuntime.getInternalPhoenixMetrics()) {
- m.reset();
- }
- }
-
- private static void createTableAndInsertValues(String tableName, boolean resetMetricsAfterTableCreate) throws Exception {
- String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
- Connection conn = DriverManager.getConnection(getUrl());
- conn.createStatement().execute(ddl);
- if (resetMetricsAfterTableCreate) {
- resetMetrics();
- }
- // executing 10 upserts/mutations.
- String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
- PreparedStatement stmt = conn.prepareStatement(dml);
- for (int i = 1; i <= 10; i++) {
- stmt.setString(1, "key" + i);
- stmt.setString(2, "value" + i);
- stmt.executeUpdate();
- }
- conn.commit();
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
index c8696e2..e0f0a3c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@ -260,6 +260,7 @@ public class PartialCommitIT {
PhoenixConnection phxCon = new PhoenixConnection(con.unwrap(PhoenixConnection.class));
final Map<TableRef,Map<ImmutableBytesPtr,MutationState.RowMutationState>> mutations = Maps.newTreeMap(new TableRefComparator());
return new PhoenixConnection(phxCon) {
+ @Override
protected MutationState newMutationState(int maxSize) {
return new MutationState(maxSize, this, mutations);
};
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
new file mode 100644
index 0000000..d9ca8e8
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -0,0 +1,815 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_SIZE;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_NUM_PARALLEL_SCANS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_REJECTED_TASK_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SCAN_BYTES;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SELECT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SPOOL_FILE_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_END_TO_END_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_EXECUTION_TIME;
+import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME;
+import static org.apache.phoenix.util.PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class PhoenixMetricsIT extends BaseOwnClusterHBaseManagedTimeIT {
+
+ private static final List<String> mutationMetricsToSkip = Lists
+ .newArrayList(MetricType.MUTATION_COMMIT_TIME.name()); // metric names handed to assertMetricsAreSame as the ones to skip when comparing mutation metrics
+ private static final List<String> readMetricsToSkip = Lists.newArrayList(MetricType.TASK_QUEUE_WAIT_TIME.name(),
+ MetricType.TASK_EXECUTION_TIME.name(), MetricType.TASK_END_TO_END_TIME.name()); // presumably the read-metric counterpart; its use is outside this view - TODO confirm
+
+ @BeforeClass
+ public static void doSetup() throws Exception { // one-time setup: starts the test driver with request-level metric collection switched on
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+ // Enable request metric collection at the driver level
+ props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Test
+ public void testResetGlobalPhoenixMetrics() { // after a reset, every global client metric must report a zero sum and zero samples
+ resetGlobalMetrics();
+ for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) {
+ assertEquals(0, m.getTotalSum());
+ assertEquals(0, m.getNumberOfSamples());
+ }
+ }
+
+ @Test
+ public void testGlobalPhoenixMetricsForQueries() throws Exception { // checks the global metrics produced by a single SELECT over table T
+ createTableAndInsertValues("T", true);
+ resetGlobalMetrics(); // we want to count metrics related only to the below query
+ Connection conn = DriverManager.getConnection(getUrl()); // NOTE(review): conn, the statement and rs are never closed in this method
+ String query = "SELECT * FROM T";
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ while (rs.next()) { // walk every row before the metrics are inspected
+ rs.getString(1);
+ rs.getString(2);
+ }
+ assertEquals(1, GLOBAL_NUM_PARALLEL_SCANS.getMetric().getTotalSum()); // exactly one SELECT was issued after the reset
+ assertEquals(1, GLOBAL_SELECT_SQL_COUNTER.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_REJECTED_TASK_COUNTER.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_QUERY_TIMEOUT_COUNTER.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_FAILED_QUERY_COUNTER.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_SPOOL_FILE_COUNTER.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_MUTATION_BATCH_SIZE.getMetric().getTotalSum()); // no mutation ran after the reset, so mutation metrics are expected to stay at 0
+ assertEquals(0, GLOBAL_MUTATION_BYTES.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_MUTATION_COMMIT_TIME.getMetric().getTotalSum());
+
+ assertTrue(GLOBAL_SCAN_BYTES.getMetric().getTotalSum() > 0);
+ assertTrue(GLOBAL_QUERY_TIME.getMetric().getTotalSum() > 0);
+ assertTrue(GLOBAL_TASK_END_TO_END_TIME.getMetric().getTotalSum() > 0);
+ assertTrue(GLOBAL_TASK_EXECUTION_TIME.getMetric().getTotalSum() > 0);
+ }
+
+ @Test
+ public void testGlobalPhoenixMetricsForMutations() throws Exception { // checks the global metrics produced by the 10 upserts done in the helper
+ createTableAndInsertValues("T", true); // helper resets the metrics after the DDL, then commits 10 single-row upserts
+ assertEquals(10, GLOBAL_MUTATION_BATCH_SIZE.getMetric().getTotalSum());
+ assertEquals(10, GLOBAL_MUTATION_SQL_COUNTER.getMetric().getTotalSum());
+ assertTrue(GLOBAL_MUTATION_BYTES.getMetric().getTotalSum() > 0);
+ assertTrue(GLOBAL_MUTATION_COMMIT_TIME.getMetric().getTotalSum() > 0);
+ assertEquals(0, GLOBAL_NUM_PARALLEL_SCANS.getMetric().getTotalSum()); // no SELECT was run, so the read-side counters are expected to be 0
+ assertEquals(0, GLOBAL_SELECT_SQL_COUNTER.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_REJECTED_TASK_COUNTER.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_QUERY_TIMEOUT_COUNTER.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_FAILED_QUERY_COUNTER.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_SPOOL_FILE_COUNTER.getMetric().getTotalSum());
+ }
+
+ @Test
+ public void testGlobalPhoenixMetricsForUpsertSelect() throws Exception { // checks the global metrics produced by one UPSERT ... SELECT from T into T2
+ createTableAndInsertValues("T", true);
+ resetGlobalMetrics();
+ String ddl = "CREATE TABLE T2 (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
+ Connection conn = DriverManager.getConnection(getUrl()); // NOTE(review): conn and its statements are never closed in this method
+ conn.createStatement().execute(ddl);
+ resetGlobalMetrics(); // reset again so the CREATE TABLE above is not counted
+ String dml = "UPSERT INTO T2 (K, V) SELECT K, V FROM T";
+ conn.createStatement().executeUpdate(dml);
+ conn.commit();
+ assertEquals(10, GLOBAL_MUTATION_BATCH_SIZE.getMetric().getTotalSum()); // T holds the 10 rows inserted by the helper
+ assertEquals(1, GLOBAL_MUTATION_SQL_COUNTER.getMetric().getTotalSum()); // a single mutating statement was executed
+ assertEquals(1, GLOBAL_NUM_PARALLEL_SCANS.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_QUERY_TIME.getMetric().getTotalSum()); // NOTE(review): presumably query time is only recorded for plain SELECTs - confirm
+ assertTrue(GLOBAL_SCAN_BYTES.getMetric().getTotalSum() > 0);
+ assertTrue(GLOBAL_MUTATION_BYTES.getMetric().getTotalSum() > 0);
+ assertTrue(GLOBAL_MUTATION_COMMIT_TIME.getMetric().getTotalSum() > 0);
+ assertEquals(0, GLOBAL_SELECT_SQL_COUNTER.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_REJECTED_TASK_COUNTER.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_QUERY_TIMEOUT_COUNTER.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_FAILED_QUERY_COUNTER.getMetric().getTotalSum());
+ assertEquals(0, GLOBAL_SPOOL_FILE_COUNTER.getMetric().getTotalSum());
+ }
+
+ private static void resetGlobalMetrics() { // calls reset() on every metric returned by PhoenixRuntime.getGlobalPhoenixClientMetrics()
+ for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) {
+ m.reset();
+ }
+ }
+
+ private static void createTableAndInsertValues(String tableName, boolean resetGlobalMetricsAfterTableCreate)
+ throws Exception { // creates tableName (K, V), optionally resets the global metrics, then commits 10 upserts
+ String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
+ Connection conn = DriverManager.getConnection(getUrl()); // NOTE(review): conn and stmt are never closed in this helper
+ conn.createStatement().execute(ddl);
+ if (resetGlobalMetricsAfterTableCreate) { // lets callers exclude the DDL from the metrics they assert on
+ resetGlobalMetrics();
+ }
+ // executing 10 upserts/mutations.
+ String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+ PreparedStatement stmt = conn.prepareStatement(dml);
+ for (int i = 1; i <= 10; i++) {
+ stmt.setString(1, "key" + i);
+ stmt.setString(2, "value" + i);
+ stmt.executeUpdate();
+ }
+ conn.commit();
+ }
+
+ @Test
+ public void testOverallQueryMetricsForSelect() throws Exception { // NOTE(review): only creates the table - no query is run and nothing is asserted
+ String tableName = "SCANMETRICS";
+ String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = 6";
+ Connection conn = DriverManager.getConnection(getUrl()); // NOTE(review): conn is never closed
+ conn.createStatement().execute(ddl);
+ }
+
+ @Test
+ public void testReadMetricsForSelect() throws Exception { // checks the request-level read metrics of a SELECT over a salted table
+ String tableName = "READMETRICSFORSELECT";
+ long numSaltBuckets = 6;
+ String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+ + numSaltBuckets;
+ Connection conn = DriverManager.getConnection(getUrl()); // NOTE(review): conn and stmt are never closed in this method
+ conn.createStatement().execute(ddl);
+
+ long numRows = 1000;
+ long numExpectedTasks = numSaltBuckets; // the test expects one task per salt bucket
+ insertRowsInTable(tableName, numRows); // helper defined outside this view; its returned connection is ignored here
+
+ String query = "SELECT * FROM " + tableName;
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery(query);
+ PhoenixResultSet resultSetBeingTested = rs.unwrap(PhoenixResultSet.class);
+ changeInternalStateForTesting(resultSetBeingTested); // helper defined outside this view
+ while (resultSetBeingTested.next()) {} // iterate through every row, discarding the values
+ resultSetBeingTested.close();
+ Set<String> expectedTableNames = Sets.newHashSet(tableName);
+ assertReadMetricValuesForSelectSql(Lists.newArrayList(numRows), Lists.newArrayList(numExpectedTasks),
+ resultSetBeingTested, expectedTableNames); // metrics are read from the result set after it has been closed
+ }
+
+ @Test
+ public void testMetricsForUpsert() throws Exception { // checks the per-table write metrics of plain upserts and that no read metrics are produced
+ String tableName = "UPSERTMETRICS";
+ String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = 6";
+ Connection ddlConn = DriverManager.getConnection(getUrl());
+ ddlConn.createStatement().execute(ddl);
+ ddlConn.close();
+
+ int numRows = 10;
+ Connection conn = insertRowsInTable(tableName, numRows); // helper defined outside this view; the connection it returns is the one whose metrics are inspected
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+ Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+ for (Entry<String, Map<String, Long>> entry : mutationMetrics.entrySet()) { // NOTE(review): if the map is empty this body never runs and the write-metric checks pass vacuously
+ String t = entry.getKey();
+ assertEquals("Table names didn't match!", tableName, t);
+ Map<String, Long> p = entry.getValue();
+ assertEquals("There should have been three metrics", 3, p.size());
+ boolean mutationBatchSizePresent = false;
+ boolean mutationCommitTimePresent = false;
+ boolean mutationBytesPresent = false;
+ for (Entry<String, Long> metric : p.entrySet()) {
+ String metricName = metric.getKey();
+ long metricValue = metric.getValue();
+ if (metricName.equals(MetricType.MUTATION_BATCH_SIZE.name())) {
+ assertEquals("Mutation batch sizes didn't match!", numRows, metricValue);
+ mutationBatchSizePresent = true;
+ } else if (metricName.equals(MetricType.MUTATION_COMMIT_TIME.name())) {
+ assertTrue("Mutation commit time should be greater than zero", metricValue > 0);
+ mutationCommitTimePresent = true;
+ } else if (metricName.equals(MetricType.MUTATION_BYTES.name())) {
+ assertTrue("Mutation bytes size should be greater than zero", metricValue > 0);
+ mutationBytesPresent = true;
+ }
+ }
+ assertTrue(mutationBatchSizePresent); // each of the three expected metric names must have been seen
+ assertTrue(mutationCommitTimePresent);
+ assertTrue(mutationBytesPresent);
+ }
+ Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+ assertEquals("Read metrics should be empty", 0, readMetrics.size());
+ }
+
+ @Test
+ public void testMetricsForUpsertSelect() throws Exception { // checks write metrics on the target table and read metrics on the source table of an UPSERT ... SELECT
+ String tableName1 = "UPSERTFROM";
+ long table1SaltBuckets = 6;
+ String ddl = "CREATE TABLE " + tableName1 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+ + table1SaltBuckets;
+ Connection ddlConn = DriverManager.getConnection(getUrl());
+ ddlConn.createStatement().execute(ddl);
+ ddlConn.close();
+ int numRows = 10;
+ insertRowsInTable(tableName1, numRows); // helper defined outside this view
+
+ String tableName2 = "UPSERTTO";
+ ddl = "CREATE TABLE " + tableName2 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = 10";
+ ddlConn = DriverManager.getConnection(getUrl());
+ ddlConn.createStatement().execute(ddl);
+ ddlConn.close();
+
+ Connection conn = DriverManager.getConnection(getUrl()); // NOTE(review): conn is never closed in this method
+ String upsertSelect = "UPSERT INTO " + tableName2 + " SELECT * FROM " + tableName1;
+ conn.createStatement().executeUpdate(upsertSelect);
+ conn.commit();
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+
+ Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+ assertMutationMetrics(tableName2, numRows, mutationMetrics); // write metrics are checked against the target table
+ Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+ assertReadMetricsForMutatingSql(tableName1, table1SaltBuckets, readMetrics); // read metrics are checked against the source table and its salt-bucket count
+ }
+
+ @Test
+ public void testMetricsForDelete() throws Exception { // checks the write and read metrics recorded for a full-table DELETE
+ String tableName = "DELETEMETRICS";
+ long tableSaltBuckets = 6;
+ String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+ + tableSaltBuckets;
+ Connection ddlConn = DriverManager.getConnection(getUrl());
+ ddlConn.createStatement().execute(ddl);
+ ddlConn.close();
+ int numRows = 10;
+ insertRowsInTable(tableName, numRows); // helper defined outside this view
+ Connection conn = DriverManager.getConnection(getUrl()); // NOTE(review): conn is never closed in this method
+ String delete = "DELETE FROM " + tableName;
+ conn.createStatement().execute(delete);
+ conn.commit();
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+ Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+ assertMutationMetrics(tableName, numRows, mutationMetrics);
+
+ Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+ assertReadMetricsForMutatingSql(tableName, tableSaltBuckets, readMetrics); // the same table is both the read and the write target of the DELETE
+ }
+
+ @Test
+ public void testNoMetricsCollectedForConnection() throws Exception { // checks that a connection opened with metric collection set to "false" reports no read or write metrics
+ String tableName = "NOMETRICS";
+ long tableSaltBuckets = 6;
+ String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+ + tableSaltBuckets;
+ Connection ddlConn = DriverManager.getConnection(getUrl());
+ ddlConn.createStatement().execute(ddl);
+ ddlConn.close();
+ int numRows = 10;
+ insertRowsInTable(tableName, numRows); // helper defined outside this view
+ Properties props = new Properties();
+ props.setProperty(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, "false"); // connection-level override; doSetup enables the same property at the driver level
+ Connection conn = DriverManager.getConnection(getUrl(), props); // NOTE(review): conn is never closed in this method
+ ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
+ while (rs.next()) {} // iterate through every row, discarding the values
+ rs.close();
+ Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getRequestReadMetrics(rs); // metrics are requested after the result set has been closed
+ assertTrue("No read metrics should have been generated", readMetrics.size() == 0);
+ conn.createStatement().executeUpdate("UPSERT INTO " + tableName + " VALUES ('KEY', 'VALUE')");
+ conn.commit();
+ Map<String, Map<String, Long>> writeMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+ assertTrue("No write metrics should have been generated", writeMetrics.size() == 0);
+ }
+
+ @Test
+ public void testMetricsForUpsertWithAutoCommit() throws Exception { // compares upsert write metrics with auto-commit off against auto-commit on
+ String tableName = "VERIFYUPSERTAUTOCOMMIT";
+ long tableSaltBuckets = 6;
+ String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+ + tableSaltBuckets;
+ try (Connection ddlConn = DriverManager.getConnection(getUrl())) {
+ ddlConn.createStatement().execute(ddl);
+ }
+
+ String upsert = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+ int numRows = 10;
+ Map<String, Map<String, Long>> mutationMetricsForAutoCommitOff = null;
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.setAutoCommit(false);
+ upsertRows(upsert, numRows, conn);
+ conn.commit(); // explicit commit before the metrics are read
+ mutationMetricsForAutoCommitOff = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+ }
+
+ // Insert rows now with auto-commit on
+ Map<String, Map<String, Long>> mutationMetricsAutoCommitOn = null;
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.setAutoCommit(true);
+ upsertRows(upsert, numRows, conn); // same keys and values as the first pass, no explicit commit
+ mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+ }
+ // Verify that the mutation metrics are same for both cases
+ assertMetricsAreSame(mutationMetricsForAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip); // MUTATION_COMMIT_TIME is in the skip list
+ }
+
+ private void upsertRows(String upsert, int numRows, Connection conn) throws SQLException { // runs numRows executions of the two-parameter upsert on conn; does not commit
+ PreparedStatement stmt = conn.prepareStatement(upsert); // NOTE(review): stmt is never closed
+ for (int i = 1; i <= numRows; i++) {
+ stmt.setString(1, "key" + i); // rows are ("key1","value1") .. ("key<numRows>","value<numRows>")
+ stmt.setString(2, "value" + i);
+ stmt.executeUpdate();
+ }
+ }
+
+ @Test
+ public void testMetricsForDeleteWithAutoCommit() throws Exception { // compares DELETE write metrics with auto-commit off against auto-commit on
+ String tableName = "VERIFYDELETEAUTOCOMMIT";
+ long tableSaltBuckets = 6;
+ String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+ + tableSaltBuckets;
+ try (Connection ddlConn = DriverManager.getConnection(getUrl())) {
+ ddlConn.createStatement().execute(ddl);
+ }
+
+ String upsert = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+ int numRows = 10;
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.setAutoCommit(false);
+ upsertRows(upsert, numRows, conn);
+ conn.commit();
+ }
+
+ String delete = "DELETE FROM " + tableName;
+ // Delete rows now with auto-commit off
+ Map<String, Map<String, Long>> deleteMetricsWithAutoCommitOff = null;
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.setAutoCommit(false);
+ conn.createStatement().executeUpdate(delete); // NOTE(review): unlike the upsert variant of this test, conn.commit() is not called before the metrics are read
+ deleteMetricsWithAutoCommitOff = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+ }
+
+ // Upsert the rows back
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.setAutoCommit(false);
+ upsertRows(upsert, numRows, conn);
+ conn.commit();
+ }
+
+ // Now delete rows with auto-commit on
+ Map<String, Map<String, Long>> deleteMetricsWithAutoCommitOn = null;
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.setAutoCommit(true);
+ conn.createStatement().executeUpdate(delete);
+ deleteMetricsWithAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+ }
+
+ // Verify that the mutation metrics are same for both cases.
+ assertMetricsAreSame(deleteMetricsWithAutoCommitOff, deleteMetricsWithAutoCommitOn, mutationMetricsToSkip); // MUTATION_COMMIT_TIME is in the skip list
+ }
+
+ @Test
+ public void testMetricsForUpsertSelectWithAutoCommit() throws Exception {
+ String tableName1 = "UPSERTFROMAUTOCOMMIT";
+ long table1SaltBuckets = 6;
+ String ddl = "CREATE TABLE " + tableName1 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+ + table1SaltBuckets;
+ Connection ddlConn = DriverManager.getConnection(getUrl());
+ ddlConn.createStatement().execute(ddl);
+ ddlConn.close();
+ int numRows = 10;
+ insertRowsInTable(tableName1, numRows);
+
+ String tableName2 = "UPSERTTOAUTCOMMIT";
+ ddl = "CREATE TABLE " + tableName2 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = 10";
+ ddlConn = DriverManager.getConnection(getUrl());
+ ddlConn.createStatement().execute(ddl);
+ ddlConn.close();
+
+ String upsertSelect = "UPSERT INTO " + tableName2 + " SELECT * FROM " + tableName1;
+
+ Map<String, Map<String, Long>> mutationMetricsAutoCommitOff = null;
+ Map<String, Map<String, Long>> readMetricsAutoCommitOff = null;
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.setAutoCommit(false);
+ conn.createStatement().executeUpdate(upsertSelect);
+ conn.commit();
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+ mutationMetricsAutoCommitOff = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+ readMetricsAutoCommitOff = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+ }
+
+ Map<String, Map<String, Long>> mutationMetricsAutoCommitOn = null;
+ Map<String, Map<String, Long>> readMetricsAutoCommitOn = null;
+
+ int autoCommitBatchSize = numRows + 1; // batchsize = 11 is less than numRows and is not a divisor of batchsize
+ Properties props = new Properties();
+ props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(autoCommitBatchSize));
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ conn.createStatement().executeUpdate(upsertSelect);
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+ mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+ readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+ }
+ assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
+ assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOn, readMetricsToSkip);
+
+ autoCommitBatchSize = numRows - 1; // batchsize = 9 is less than numRows and is not a divisor of batchsize
+ props = new Properties();
+ props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(autoCommitBatchSize));
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ conn.createStatement().executeUpdate(upsertSelect);
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+ mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+ readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+ }
+ assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
+ assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOn, readMetricsToSkip);
+
+ autoCommitBatchSize = numRows;
+ props = new Properties();
+ props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(autoCommitBatchSize));
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ conn.createStatement().executeUpdate(upsertSelect);
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+ mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+ readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+ }
+ assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
+ assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOn, readMetricsToSkip);
+
+ autoCommitBatchSize = 2; // multiple batches of equal size
+ props = new Properties();
+ props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(autoCommitBatchSize));
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ conn.createStatement().executeUpdate(upsertSelect);
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+ mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+ readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+ }
+ assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
+ assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOff, readMetricsToSkip);
+ }
+
+ @Test
+ public void testMutationMetricsWhenUpsertingToMultipleTables() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String table1 = "TABLE1";
+ createTableAndInsertValues(true, 10, conn, table1);
+ String table2 = "TABLE2";
+ createTableAndInsertValues(true, 10, conn, table2);
+ String table3 = "TABLE3";
+ createTableAndInsertValues(true, 10, conn, table3);
+ Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+ assertTrue("Mutation metrics not present for " + table1, mutationMetrics.get(table1) != null);
+ assertTrue("Mutation metrics not present for " + table2, mutationMetrics.get(table2) != null);
+ assertTrue("Mutation metrics not present for " + table3, mutationMetrics.get(table3) != null);
+ assertMetricsHaveSameValues(mutationMetrics.get(table1), mutationMetrics.get(table2), mutationMetricsToSkip);
+ assertMetricsHaveSameValues(mutationMetrics.get(table1), mutationMetrics.get(table3), mutationMetricsToSkip);
+ }
+ }
+
+    /**
+     * Checks that write metrics are present on a connection after committing upserts, and that they are
+     * no longer reported once the connection has been closed.
+     */
+    @Test
+    public void testClosingConnectionClearsMetrics() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            createTableAndInsertValues(true, 10, conn, "clearmetrics");
+            assertTrue("Mutation metrics not present", PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn).size() > 0);
+        } finally {
+            conn.close();
+        }
+        // Asserted after the try/finally rather than inside the finally block, so that a failure raised in
+        // the try body is reported as-is instead of being replaced by a failure of this assertion.
+        assertTrue("Closing connection didn't clear metrics",
+                PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn).size() == 0);
+    }
+
+ @Test
+ public void testMetricsForUpsertingIntoImmutableTableWithIndices() throws Exception {
+ String dataTable = "IMMTABLEWITHINDICES";
+ String tableDdl = "CREATE TABLE "
+ + dataTable
+ + " (K1 VARCHAR NOT NULL, K2 VARCHAR NOT NULL, V1 INTEGER, V2 INTEGER, V3 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(K1, K2)) IMMUTABLE_ROWS = true";
+ String index1 = "I1";
+ String index1Ddl = "CREATE INDEX " + index1 + " ON " + dataTable + " (V1) include (V2)";
+ String index2 = "I2";
+ String index2Ddl = "CREATE INDEX " + index2 + " ON " + dataTable + " (V2) include (V3)";
+ String index3 = "I3";
+ String index3Ddl = "CREATE INDEX " + index3 + " ON " + dataTable + " (V3) include (V1)";
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement().execute(tableDdl);
+ conn.createStatement().execute(index1Ddl);
+ conn.createStatement().execute(index2Ddl);
+ conn.createStatement().execute(index3Ddl);
+ }
+ String upsert = "UPSERT INTO " + dataTable + " VALUES (?, ?, ?, ?, ?)";
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ /*
+ * Upsert data into table. Because the table is immutable, mutations for updating the indices on it are
+ * handled by the client itself. So mutation metrics should include mutations for the indices as well as the
+ * data table.
+ */
+ PreparedStatement stmt = conn.prepareStatement(upsert);
+ for (int i = 1; i < 10; i++) {
+ stmt.setString(1, "key1" + i);
+ stmt.setString(2, "key2" + i);
+ stmt.setInt(3, i);
+ stmt.setInt(4, i);
+ stmt.setInt(5, i);
+ stmt.executeUpdate();
+ }
+ conn.commit();
+ Map<String, Map<String, Long>> metrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+ assertTrue(metrics.get(dataTable).size() > 0);
+ assertTrue(metrics.get(index1).size() > 0);
+ assertTrue(metrics.get(index2).size() > 0);
+ assertMetricsHaveSameValues(metrics.get(index1), metrics.get(index2), mutationMetricsToSkip);
+ assertTrue(metrics.get(index3).size() > 0);
+ assertMetricsHaveSameValues(metrics.get(index1), metrics.get(index3), mutationMetricsToSkip);
+ }
+ }
+
+    /**
+     * Runs an UPSERT ... SELECT whose source and target are the same salted table, first with auto-commit
+     * off and then with auto-commit on (same connection, metrics reset in between). Verifies the read and
+     * write metrics of the first run and checks that the second run reports the same metrics (ignoring the
+     * metrics listed in mutationMetricsToSkip / readMetricsToSkip).
+     */
+    @Test
+    public void testMetricsForUpsertSelectSameTable() throws Exception {
+        String tableName = "UPSERTSAME";
+        long table1SaltBuckets = 6;
+        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+                + table1SaltBuckets;
+        try (Connection ddlConn = DriverManager.getConnection(getUrl())) {
+            ddlConn.createStatement().execute(ddl);
+        }
+        int numRows = 10;
+        // insertRowsInTable hands back the (already committed) connection it opened; close it here so it
+        // does not leak.
+        insertRowsInTable(tableName, numRows).close();
+
+        // try-with-resources so the connection is released even when an assertion below fails.
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(false);
+            String upsertSelect = "UPSERT INTO " + tableName + " SELECT * FROM " + tableName;
+            conn.createStatement().executeUpdate(upsertSelect);
+            conn.commit();
+            PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+
+            Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+            // Because auto-commit is off, upsert select into the same table will run on the client.
+            // So we should have client side read and write metrics available.
+            assertMutationMetrics(tableName, numRows, mutationMetrics);
+            Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+            assertReadMetricsForMutatingSql(tableName, table1SaltBuckets, readMetrics);
+            PhoenixRuntime.resetMetrics(pConn);
+            // With autocommit on, still, this upsert select runs on the client side.
+            conn.setAutoCommit(true);
+            conn.createStatement().executeUpdate(upsertSelect);
+            Map<String, Map<String, Long>> autoCommitMutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+            Map<String, Map<String, Long>> autoCommitReadMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+            assertMetricsAreSame(mutationMetrics, autoCommitMutationMetrics, mutationMetricsToSkip);
+            assertMetricsAreSame(readMetrics, autoCommitReadMetrics, readMetricsToSkip);
+        }
+    }
+
+    /**
+     * Creates a two-column (K, V) table named tableName on the given connection and upserts numRows rows
+     * ("key1".."keyN" / "value1".."valueN") into it.
+     *
+     * @param commit whether to call commit() on the connection after the upserts
+     * @param numRows number of rows to upsert
+     * @param conn connection used for both the DDL and the upserts
+     * @param tableName name of the table to create
+     */
+    private void createTableAndInsertValues(boolean commit, int numRows, Connection conn, String tableName)
+            throws SQLException {
+        conn.createStatement().execute(
+                "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)");
+        // One upsert (i.e. one mutation) per row.
+        PreparedStatement upsertStmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?, ?)");
+        for (int row = 1; row <= numRows; row++) {
+            upsertStmt.setString(1, "key" + row);
+            upsertStmt.setString(2, "value" + row);
+            upsertStmt.executeUpdate();
+        }
+        if (!commit) {
+            return;
+        }
+        conn.commit();
+    }
+
+    /**
+     * Asserts that two table-name -> (metric-name -> value) maps cover the same set of tables and that,
+     * for every table, the metric values agree as checked by assertMetricsHaveSameValues.
+     *
+     * @param metricsToSkip metric names whose values are not compared
+     */
+    private void assertMetricsAreSame(Map<String, Map<String, Long>> metric1, Map<String, Map<String, Long>> metric2,
+            List<String> metricsToSkip) {
+        boolean sameTableNames = metric1.keySet().equals(metric2.keySet());
+        assertTrue("The two metrics have different or unequal number of table names ", sameTableNames);
+        for (String table : metric1.keySet()) {
+            assertMetricsHaveSameValues(metric1.get(table), metric2.get(table), metricsToSkip);
+        }
+    }
+
+    /**
+     * Asserts that two metric-name -> value maps have the same set of metric names and that each metric
+     * not listed in metricsToSkip has equal values in both maps.
+     */
+    private void assertMetricsHaveSameValues(Map<String, Long> metricNameValueMap1,
+            Map<String, Long> metricNameValueMap2, List<String> metricsToSkip) {
+        boolean sameMetricNames = metricNameValueMap1.keySet().equals(metricNameValueMap2.keySet());
+        assertTrue("The two metrics have different or unequal number of metric names ", sameMetricNames);
+        for (Entry<String, Long> entry : metricNameValueMap1.entrySet()) {
+            String metricName = entry.getKey();
+            if (metricsToSkip.contains(metricName)) {
+                continue;
+            }
+            Long valueInFirst = entry.getValue();
+            Long valueInSecond = metricNameValueMap2.get(metricName);
+            assertEquals("Unequal values for metric " + metricName, valueInFirst, valueInSecond);
+        }
+    }
+
+    /**
+     * Replaces the "readMetricsQueue" field of the given result set, and of the StatementContext held in
+     * its "context" field, with a single shared TestReadMetricsQueue (request metrics enabled).
+     */
+    private void changeInternalStateForTesting(PhoenixResultSet rs) {
+        StatementContext resultSetContext = (StatementContext)Whitebox.getInternalState(rs, "context");
+        ReadMetricQueue fixedDeltaQueue = new TestReadMetricsQueue(true);
+        // Both objects must point at the same queue instance.
+        Whitebox.setInternalState(resultSetContext, "readMetricsQueue", fixedDeltaQueue);
+        Whitebox.setInternalState(rs, "readMetricsQueue", fixedDeltaQueue);
+    }
+
+    /**
+     * Verifies the request-level read metrics of a result set against per-table expectations, then resets
+     * the result set's metrics.
+     * <p>
+     * The i-th table encountered while iterating the metrics map is checked against numRows.get(i) and
+     * numExpectedTasks.get(i). Every table must report the scan-bytes, task-executed-counter,
+     * task-execution-time and memory-chunk-bytes metrics. Table names found in the metrics are removed from
+     * expectedTableNames, which must be empty at the end.
+     *
+     * @param numRows expected scan-bytes value per table
+     * @param numExpectedTasks expected number of executed tasks per table
+     * @param resultSetBeingTested result set whose read metrics are inspected and then reset
+     * @param expectedTableNames tables expected to appear in the metrics; modified by this method
+     */
+    private void assertReadMetricValuesForSelectSql(ArrayList<Long> numRows, ArrayList<Long> numExpectedTasks,
+            PhoenixResultSet resultSetBeingTested, Set<String> expectedTableNames) throws SQLException {
+        Map<String, Map<String, Long>> metrics = PhoenixRuntime.getRequestReadMetrics(resultSetBeingTested);
+        int tableIndex = 0;
+        for (Entry<String, Map<String, Long>> tableEntry : metrics.entrySet()) {
+            expectedTableNames.remove(tableEntry.getKey());
+            boolean sawScanBytes = false;
+            boolean sawTaskCounter = false;
+            boolean sawTaskExecutionTime = false;
+            boolean sawMemoryChunkBytes = false;
+            for (Entry<String, Long> metric : tableEntry.getValue().entrySet()) {
+                String metricName = metric.getKey();
+                long metricValue = metric.getValue();
+                long expectedRows = numRows.get(tableIndex);
+                long expectedTasks = numExpectedTasks.get(tableIndex);
+                if (metricName.equals(SCAN_BYTES.name())) {
+                    // With a SCAN_BYTES_DELTA of 1 the scan-bytes total equals the number of rows read.
+                    assertEquals(expectedRows, metricValue);
+                    sawScanBytes = true;
+                } else if (metricName.equals(TASK_EXECUTED_COUNTER.name())) {
+                    assertEquals(expectedTasks, metricValue);
+                    sawTaskCounter = true;
+                } else if (metricName.equals(TASK_EXECUTION_TIME.name())) {
+                    assertEquals(expectedTasks * TASK_EXECUTION_TIME_DELTA, metricValue);
+                    sawTaskExecutionTime = true;
+                } else if (metricName.equals(MEMORY_CHUNK_BYTES.name())) {
+                    assertEquals(expectedTasks * MEMORY_CHUNK_BYTES_DELTA, metricValue);
+                    sawMemoryChunkBytes = true;
+                }
+            }
+            assertTrue(sawScanBytes);
+            assertTrue(sawTaskCounter);
+            assertTrue(sawTaskExecutionTime);
+            assertTrue(sawMemoryChunkBytes);
+            tableIndex++;
+        }
+        PhoenixRuntime.resetMetrics(resultSetBeingTested);
+        assertTrue("Metrics not found tables " + Joiner.on(",").join(expectedTableNames),
+                expectedTableNames.size() == 0);
+    }
+
+    /**
+     * Opens a new connection, upserts numRows rows ("key1".."keyN" / "value1".."valueN") into tableName
+     * and commits.
+     *
+     * @return the connection that was opened; it is still open and the caller is responsible for closing it
+     */
+    private Connection insertRowsInTable(String tableName, long numRows) throws SQLException {
+        Connection writerConn = DriverManager.getConnection(getUrl());
+        PreparedStatement upsertStmt = writerConn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?, ?)");
+        for (int row = 1; row <= numRows; row++) {
+            upsertStmt.setString(1, "key" + row);
+            upsertStmt.setString(2, "value" + row);
+            upsertStmt.executeUpdate();
+        }
+        writerConn.commit();
+        return writerConn;
+    }
+
+ // Fixed amount that TestReadMetricsQueue adds to SCAN_BYTES on every change() call, regardless of the
+ // delta passed in. With a value of 1, the scan bytes reported at the end equal the number of records read.
+ public static final long SCAN_BYTES_DELTA = 1;
+
+ // Fixed amount that TestReadMetricsQueue adds to TASK_EXECUTION_TIME on every change() call.
+ // Total task execution time should be numTasks * TASK_EXECUTION_TIME_DELTA.
+ public static final long TASK_EXECUTION_TIME_DELTA = 10;
+
+ // Fixed amount that TestReadMetricsQueue adds to MEMORY_CHUNK_BYTES on every change() call.
+ // Total memory chunk bytes should be numTasks * MEMORY_CHUNK_BYTES_DELTA.
+ public static final long MEMORY_CHUNK_BYTES_DELTA = 100;
+
+    /**
+     * ReadMetricQueue whose SCAN_BYTES, TASK_EXECUTION_TIME and MEMORY_CHUNK_BYTES metrics ignore the delta
+     * they are asked to apply and always change by the corresponding fixed *_DELTA constant instead. All
+     * other metric types are delegated to the superclass.
+     */
+    private class TestReadMetricsQueue extends ReadMetricQueue {
+
+        public TestReadMetricsQueue(boolean isRequestMetricsEnabled) {
+            super(isRequestMetricsEnabled);
+        }
+
+        @Override
+        public CombinableMetric getMetric(MetricType type) {
+            // Pick the fixed increment for the types this queue overrides; everything else is untouched.
+            final long fixedDelta;
+            switch (type) {
+            case SCAN_BYTES:
+                fixedDelta = SCAN_BYTES_DELTA;
+                break;
+            case TASK_EXECUTION_TIME:
+                fixedDelta = TASK_EXECUTION_TIME_DELTA;
+                break;
+            case MEMORY_CHUNK_BYTES:
+                fixedDelta = MEMORY_CHUNK_BYTES_DELTA;
+                break;
+            default:
+                return super.getMetric(type);
+            }
+            return new CombinableMetricImpl(type) {
+
+                @Override
+                public void change(long delta) {
+                    // The requested delta is deliberately ignored.
+                    super.change(fixedDelta);
+                }
+            };
+        }
+    }
+
+    /**
+     * Asserts that the read metrics gathered for a mutating statement are non-empty and cover exactly one
+     * table, tableName; that the task-executed counter equals tableSaltBuckets; and that the scan-bytes
+     * metric, when present, is positive.
+     */
+    private void assertReadMetricsForMutatingSql(String tableName, long tableSaltBuckets,
+            Map<String, Map<String, Long>> readMetrics) {
+        assertTrue("No read metrics present when there should have been!", readMetrics.size() > 0);
+        int numTables = 0;
+        for (Entry<String, Map<String, Long>> tableEntry : readMetrics.entrySet()) {
+            assertEquals("Table name didn't match for read metrics", tableName, tableEntry.getKey());
+            numTables++;
+            Map<String, Long> tableMetrics = tableEntry.getValue();
+            assertTrue("No read metrics present when there should have been", tableMetrics.size() > 0);
+            for (Entry<String, Long> metric : tableMetrics.entrySet()) {
+                String metricName = metric.getKey();
+                long metricValue = metric.getValue();
+                if (metricName.equals(TASK_EXECUTED_COUNTER.name())) {
+                    assertEquals(tableSaltBuckets, metricValue);
+                    continue;
+                }
+                if (metricName.equals(SCAN_BYTES.name())) {
+                    assertTrue("Scan bytes read should be greater than zero", metricValue > 0);
+                }
+            }
+        }
+        assertEquals("There should have been read metrics only for one table: " + tableName, 1, numTables);
+    }
+
+    /**
+     * Asserts that the mutation metrics are non-empty, that every entry belongs to tableName and carries
+     * exactly three metrics, that the mutation batch size equals numRows, and that the mutation commit time
+     * and mutation bytes, when present, are positive.
+     */
+    private void assertMutationMetrics(String tableName, int numRows, Map<String, Map<String, Long>> mutationMetrics) {
+        assertTrue("No mutation metrics present when there should have been", mutationMetrics.size() > 0);
+        for (Entry<String, Map<String, Long>> tableEntry : mutationMetrics.entrySet()) {
+            assertEquals("Table name didn't match for mutation metrics", tableName, tableEntry.getKey());
+            Map<String, Long> tableMetrics = tableEntry.getValue();
+            assertEquals("There should have been three metrics", 3, tableMetrics.size());
+            for (Entry<String, Long> metric : tableMetrics.entrySet()) {
+                String metricName = metric.getKey();
+                long metricValue = metric.getValue();
+                if (metricName.equals(MetricType.MUTATION_BATCH_SIZE.name())) {
+                    assertEquals("Mutation batch sizes didn't match!", numRows, metricValue);
+                    continue;
+                }
+                if (metricName.equals(MetricType.MUTATION_COMMIT_TIME.name())) {
+                    assertTrue("Mutation commit time should be greater than zero", metricValue > 0);
+                    continue;
+                }
+                if (metricName.equals(MetricType.MUTATION_BYTES.name())) {
+                    assertTrue("Mutation bytes size should be greater than zero", metricValue > 0);
+                }
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index 9718709..9ad9ef5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.cache;
+import static org.apache.phoenix.monitoring.TaskExecutionMetricsHolder.NO_OP_INSTANCE;
import static org.apache.phoenix.util.LogUtil.addCustomAnnotations;
import java.io.Closeable;
@@ -57,6 +58,7 @@ import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ServerCachin
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.job.JobManager.JobCallable;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
@@ -226,6 +228,11 @@ public class ServerCacheClient {
public Object getJobId() {
return ServerCacheClient.this;
}
+
+ @Override
+ public TaskExecutionMetricsHolder getTaskExecutionMetric() {
+ return NO_OP_INSTANCE;
+ }
}));
} else {
if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("NOT adding cache entry to be sent for " + entry + " since one already exists for that entry", connection));}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 575f0f3..a28f614 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -94,8 +94,9 @@ public class DeleteCompiler {
this.statement = statement;
}
- private static MutationState deleteRows(PhoenixStatement statement, TableRef targetTableRef, TableRef indexTableRef, ResultIterator iterator, RowProjector projector, TableRef sourceTableRef) throws SQLException {
+ private static MutationState deleteRows(StatementContext childContext, TableRef targetTableRef, TableRef indexTableRef, ResultIterator iterator, RowProjector projector, TableRef sourceTableRef) throws SQLException {
PTable table = targetTableRef.getTable();
+ PhoenixStatement statement = childContext.getStatement();
PhoenixConnection connection = statement.getConnection();
PName tenantId = connection.getTenantId();
byte[] tenantIdBytes = null;
@@ -114,19 +115,18 @@ public class DeleteCompiler {
if (indexTableRef != null) {
indexMutations = Maps.newHashMapWithExpectedSize(batchSize);
}
- try {
- List<PColumn> pkColumns = table.getPKColumns();
- boolean isMultiTenant = table.isMultiTenant() && tenantIdBytes != null;
- boolean isSharedViewIndex = table.getViewIndexId() != null;
- int offset = (table.getBucketNum() == null ? 0 : 1);
- byte[][] values = new byte[pkColumns.size()][];
- if (isMultiTenant) {
- values[offset++] = tenantIdBytes;
- }
- if (isSharedViewIndex) {
- values[offset++] = MetaDataUtil.getViewIndexIdDataType().toBytes(table.getViewIndexId());
- }
- PhoenixResultSet rs = new PhoenixResultSet(iterator, projector, statement);
+ List<PColumn> pkColumns = table.getPKColumns();
+ boolean isMultiTenant = table.isMultiTenant() && tenantIdBytes != null;
+ boolean isSharedViewIndex = table.getViewIndexId() != null;
+ int offset = (table.getBucketNum() == null ? 0 : 1);
+ byte[][] values = new byte[pkColumns.size()][];
+ if (isMultiTenant) {
+ values[offset++] = tenantIdBytes;
+ }
+ if (isSharedViewIndex) {
+ values[offset++] = MetaDataUtil.getViewIndexIdDataType().toBytes(table.getViewIndexId());
+ }
+ try (PhoenixResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
int rowCount = 0;
while (rs.next()) {
ImmutableBytesPtr ptr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map
@@ -183,8 +183,6 @@ public class DeleteCompiler {
state.join(indexState);
}
return state;
- } finally {
- iterator.close();
}
}
@@ -199,9 +197,16 @@ public class DeleteCompiler {
}
@Override
- protected MutationState mutate(StatementContext context, ResultIterator iterator, PhoenixConnection connection) throws SQLException {
+ protected MutationState mutate(StatementContext parentContext, ResultIterator iterator, PhoenixConnection connection) throws SQLException {
PhoenixStatement statement = new PhoenixStatement(connection);
- return deleteRows(statement, targetTableRef, indexTableRef, iterator, projector, sourceTableRef);
+ /*
+ * We don't want to collect any read metrics within the child context. This is because any read metrics that
+ * need to be captured are already getting collected in the parent statement context enclosed in the result
+ * iterator being used for reading rows out.
+ */
+ StatementContext ctx = new StatementContext(statement, false);
+ MutationState state = deleteRows(ctx, targetTableRef, indexTableRef, iterator, projector, sourceTableRef);
+ return state;
}
public void setTargetTableRef(TableRef tableRef) {
@@ -559,9 +564,14 @@ public class DeleteCompiler {
}
// Return total number of rows that have been delete. In the case of auto commit being off
// the mutations will all be in the mutation state of the current connection.
- return new MutationState(maxSize, connection, totalRowCount);
+ MutationState state = new MutationState(maxSize, connection, totalRowCount);
+
+ // set the read metrics accumulated in the parent context so that it can be published when the mutations are committed.
+ state.setReadMetricQueue(plan.getContext().getReadMetricsQueue());
+
+ return state;
} else {
- return deleteRows(statement, tableRef, deleteFromImmutableIndexToo ? plan.getTableRef() : null, iterator, plan.getProjector(), plan.getTableRef());
+ return deleteRows(plan.getContext(), tableRef, deleteFromImmutableIndexToo ? plan.getTableRef() : null, iterator, plan.getProjector(), plan.getTableRef());
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
index bcac17d..630760c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
@@ -35,9 +35,9 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.KeyValueUtil;
/**
@@ -53,21 +53,34 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato
/**
* Method that does the actual mutation work
*/
- abstract protected MutationState mutate(StatementContext context, ResultIterator iterator, PhoenixConnection connection) throws SQLException;
+ abstract protected MutationState mutate(StatementContext parentContext, ResultIterator iterator, PhoenixConnection connection) throws SQLException;
@Override
- public PeekingResultIterator newIterator(StatementContext context, ResultIterator iterator, Scan scan) throws SQLException {
- final PhoenixConnection connection = new PhoenixConnection(this.connection);
- MutationState state = mutate(context, iterator, connection);
+ public PeekingResultIterator newIterator(final StatementContext parentContext, ResultIterator iterator, Scan scan, String tableName) throws SQLException {
+ final PhoenixConnection clonedConnection = new PhoenixConnection(this.connection);
+
+ MutationState state = mutate(parentContext, iterator, clonedConnection);
+
long totalRowCount = state.getUpdateCount();
- if (connection.getAutoCommit()) {
- connection.getMutationState().join(state);
- connection.commit();
- ConnectionQueryServices services = connection.getQueryServices();
- int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
- state = new MutationState(maxSize, connection, totalRowCount);
+ if (clonedConnection.getAutoCommit()) {
+ clonedConnection.getMutationState().join(state);
+ clonedConnection.commit();
+ ConnectionQueryServices services = clonedConnection.getQueryServices();
+ int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+ /*
+ * Everything that was mutated as part of the clonedConnection has been committed. However, we want to
+ * report the mutation work done using this clonedConnection as part of the overall mutation work of the
+ * parent connection. So we need to set those metrics in the empty mutation state so that they could be
+ * combined with the parent connection's mutation metrics (as part of combining mutation state) in the
+ * close() method of the iterator being returned. Don't combine the read metrics in parent context yet
+ * though because they are possibly being concurrently modified by other threads at this stage. Instead we
+ * will get hold of the read metrics when all the mutating iterators are done.
+ */
+ state = MutationState.emptyMutationState(maxSize, clonedConnection);
+ state.getMutationMetricQueue().combineMetricQueues(clonedConnection.getMutationState().getMutationMetricQueue());
}
final MutationState finalState = state;
+
byte[] value = PLong.INSTANCE.toBytes(totalRowCount);
KeyValue keyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length);
final Tuple tuple = new SingleKeyValueTuple(keyValue);
@@ -90,13 +103,17 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato
@Override
public void close() throws SQLException {
try {
- // Join the child mutation states in close, since this is called in a single threaded manner
- // after the parallel results have been processed.
- if (!connection.getAutoCommit()) {
- MutatingParallelIteratorFactory.this.connection.getMutationState().join(finalState);
- }
+ /*
+ * Join the child mutation states in close, since this is called in a single threaded manner
+ * after the parallel results have been processed.
+ * If auto-commit is on for the cloned child connection, then the finalState here is an empty mutation
+ * state (with no mutations). However, it still has the metrics for mutation work done by the
+ * mutating-iterator. Joining the mutation state makes sure those metrics are passed over
+ * to the parent connection.
+ */
+ MutatingParallelIteratorFactory.this.connection.getMutationState().join(finalState);
} finally {
- connection.close();
+ clonedConnection.close();
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index d726488..52bb7f2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.monitoring.OverAllQueryMetrics;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
@@ -41,6 +43,7 @@ import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.NumberUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
import com.google.common.collect.Maps;
@@ -80,10 +83,19 @@ public class StatementContext {
private TimeRange scanTimeRange = null;
private Map<SelectStatement, Object> subqueryResults;
-
+ private final ReadMetricQueue readMetricsQueue;
+ private final OverAllQueryMetrics overAllQueryMetrics;
+
public StatementContext(PhoenixStatement statement) {
this(statement, new Scan());
}
+
+ /**
+ * Constructor that lets you override whether or not to collect request level metrics.
+ */
+ public StatementContext(PhoenixStatement statement, boolean collectRequestLevelMetrics) {
+ this(statement, FromCompiler.EMPTY_TABLE_RESOLVER, new Scan(), new SequenceManager(statement), collectRequestLevelMetrics);
+ }
public StatementContext(PhoenixStatement statement, Scan scan) {
this(statement, FromCompiler.EMPTY_TABLE_RESOLVER, new Scan(), new SequenceManager(statement));
@@ -94,6 +106,10 @@ public class StatementContext {
}
public StatementContext(PhoenixStatement statement, ColumnResolver resolver, Scan scan, SequenceManager seqManager) {
+ this(statement, resolver, scan, seqManager, statement.getConnection().isRequestLevelMetricsEnabled());
+ }
+
+ public StatementContext(PhoenixStatement statement, ColumnResolver resolver, Scan scan, SequenceManager seqManager, boolean isRequestMetricsEnabled) {
this.statement = statement;
this.resolver = resolver;
this.scan = scan;
@@ -102,20 +118,24 @@ public class StatementContext {
this.aggregates = new AggregationManager();
this.expressions = new ExpressionManager();
PhoenixConnection connection = statement.getConnection();
- this.dateFormat = connection.getQueryServices().getProps().get(QueryServices.DATE_FORMAT_ATTRIB, DateUtil.DEFAULT_DATE_FORMAT);
+ ReadOnlyProps props = connection.getQueryServices().getProps();
+ this.dateFormat = props.get(QueryServices.DATE_FORMAT_ATTRIB, DateUtil.DEFAULT_DATE_FORMAT);
this.dateFormatter = DateUtil.getDateFormatter(dateFormat);
- this.timeFormat = connection.getQueryServices().getProps().get(QueryServices.TIME_FORMAT_ATTRIB, DateUtil.DEFAULT_TIME_FORMAT);
+ this.timeFormat = props.get(QueryServices.TIME_FORMAT_ATTRIB, DateUtil.DEFAULT_TIME_FORMAT);
this.timeFormatter = DateUtil.getTimeFormatter(timeFormat);
- this.timestampFormat = connection.getQueryServices().getProps().get(QueryServices.TIMESTAMP_FORMAT_ATTRIB, DateUtil.DEFAULT_TIMESTAMP_FORMAT);
+ this.timestampFormat = props.get(QueryServices.TIMESTAMP_FORMAT_ATTRIB, DateUtil.DEFAULT_TIMESTAMP_FORMAT);
this.timestampFormatter = DateUtil.getTimestampFormatter(timestampFormat);
- this.dateFormatTimeZone = TimeZone.getTimeZone(
- connection.getQueryServices().getProps().get(QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB, DateUtil.DEFAULT_TIME_ZONE_ID));
- this.numberFormat = connection.getQueryServices().getProps().get(QueryServices.NUMBER_FORMAT_ATTRIB, NumberUtil.DEFAULT_NUMBER_FORMAT);
+ this.dateFormatTimeZone = TimeZone.getTimeZone(props.get(QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB,
+ DateUtil.DEFAULT_TIME_ZONE_ID));
+ this.numberFormat = props.get(QueryServices.NUMBER_FORMAT_ATTRIB, NumberUtil.DEFAULT_NUMBER_FORMAT);
this.tempPtr = new ImmutableBytesWritable();
this.currentTable = resolver != null && !resolver.getTables().isEmpty() ? resolver.getTables().get(0) : null;
- this.whereConditionColumns = new ArrayList<Pair<byte[],byte[]>>();
- this.dataColumns = this.currentTable == null ? Collections.<PColumn, Integer>emptyMap() : Maps.<PColumn, Integer>newLinkedHashMap();
- this.subqueryResults = Maps.<SelectStatement, Object>newHashMap();
+ this.whereConditionColumns = new ArrayList<Pair<byte[], byte[]>>();
+ this.dataColumns = this.currentTable == null ? Collections.<PColumn, Integer> emptyMap() : Maps
+ .<PColumn, Integer> newLinkedHashMap();
+ this.subqueryResults = Maps.<SelectStatement, Object> newHashMap();
+ this.readMetricsQueue = new ReadMetricQueue(isRequestMetricsEnabled);
+ this.overAllQueryMetrics = new OverAllQueryMetrics(isRequestMetricsEnabled);
}
/**
@@ -285,4 +305,13 @@ public class StatementContext {
public void setSubqueryResult(SelectStatement select, Object result) {
subqueryResults.put(select, result);
}
+
+ public ReadMetricQueue getReadMetricsQueue() {
+ return readMetricsQueue;
+ }
+
+ public OverAllQueryMetrics getOverallQueryMetrics() {
+ return overAllQueryMetrics;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 2b35d4f..7b39a28 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -118,43 +118,40 @@ public class UpsertCompiler {
mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter()));
}
- private static MutationState upsertSelect(PhoenixStatement statement,
- TableRef tableRef, RowProjector projector, ResultIterator iterator, int[] columnIndexes,
- int[] pkSlotIndexes) throws SQLException {
- try {
- PhoenixConnection connection = statement.getConnection();
- ConnectionQueryServices services = connection.getQueryServices();
- int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
- int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
- boolean isAutoCommit = connection.getAutoCommit();
- byte[][] values = new byte[columnIndexes.length][];
- int rowCount = 0;
- Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
- PTable table = tableRef.getTable();
- ResultSet rs = new PhoenixResultSet(iterator, projector, statement);
+ private static MutationState upsertSelect(StatementContext childContext, TableRef tableRef, RowProjector projector,
+ ResultIterator iterator, int[] columnIndexes, int[] pkSlotIndexes) throws SQLException {
+ PhoenixStatement statement = childContext.getStatement();
+ PhoenixConnection connection = statement.getConnection();
+ ConnectionQueryServices services = connection.getQueryServices();
+ int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,
+ QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+ int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
+ boolean isAutoCommit = connection.getAutoCommit();
+ byte[][] values = new byte[columnIndexes.length][];
+ int rowCount = 0;
+ Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
+ PTable table = tableRef.getTable();
+ try (ResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
while (rs.next()) {
for (int i = 0; i < values.length; i++) {
PColumn column = table.getColumns().get(columnIndexes[i]);
- byte[] bytes = rs.getBytes(i+1);
+ byte[] bytes = rs.getBytes(i + 1);
ptr.set(bytes == null ? ByteUtil.EMPTY_BYTE_ARRAY : bytes);
- Object value = rs.getObject(i+1);
- int rsPrecision = rs.getMetaData().getPrecision(i+1);
+ Object value = rs.getObject(i + 1);
+ int rsPrecision = rs.getMetaData().getPrecision(i + 1);
Integer precision = rsPrecision == 0 ? null : rsPrecision;
- int rsScale = rs.getMetaData().getScale(i+1);
+ int rsScale = rs.getMetaData().getScale(i + 1);
Integer scale = rsScale == 0 ? null : rsScale;
// We are guaranteed that the two columns will have compatible types,
// as we checked that before.
- if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(),
- precision, scale,
- column.getMaxLength(),column.getScale())) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY)
- .setColumnName(column.getName().getString())
- .setMessage("value=" + column.getDataType().toStringLiteral(ptr, null)).build().buildException();
- }
- column.getDataType().coerceBytes(ptr, value, column.getDataType(),
- precision, scale, SortOrder.getDefault(),
- column.getMaxLength(), column.getScale(), column.getSortOrder());
+ if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(), precision, scale,
+ column.getMaxLength(), column.getScale())) { throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
+ .setMessage("value=" + column.getDataType().toStringLiteral(ptr, null)).build()
+ .buildException(); }
+ column.getDataType().coerceBytes(ptr, value, column.getDataType(), precision, scale,
+ SortOrder.getDefault(), column.getMaxLength(), column.getScale(), column.getSortOrder());
values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
}
setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement);
@@ -169,8 +166,6 @@ public class UpsertCompiler {
}
// If auto commit is true, this last batch will be committed upon return
return new MutationState(tableRef, mutation, rowCount / batchSize * batchSize, maxSize, connection);
- } finally {
- iterator.close();
}
}
@@ -186,14 +181,21 @@ public class UpsertCompiler {
}
@Override
- protected MutationState mutate(StatementContext context, ResultIterator iterator, PhoenixConnection connection) throws SQLException {
- if (context.getSequenceManager().getSequenceCount() > 0) {
+ protected MutationState mutate(StatementContext parentContext, ResultIterator iterator, PhoenixConnection connection) throws SQLException {
+ if (parentContext.getSequenceManager().getSequenceCount() > 0) {
throw new IllegalStateException("Cannot pipeline upsert when sequence is referenced");
}
PhoenixStatement statement = new PhoenixStatement(connection);
+ /*
+ * We don't want to collect any read metrics within the child context. This is because any read metrics that
+ * need to be captured are already getting collected in the parent statement context enclosed in the result
+ * iterator being used for reading rows out.
+ */
+ StatementContext childContext = new StatementContext(statement, false);
// Clone the row projector as it's not thread safe and would be used simultaneously by
// multiple threads otherwise.
- return upsertSelect(statement, tableRef, projector.cloneIfNecessary(), iterator, columnIndexes, pkSlotIndexes);
+ MutationState state = upsertSelect(childContext, tableRef, projector.cloneIfNecessary(), iterator, columnIndexes, pkSlotIndexes);
+ return state;
}
public void setRowProjector(RowProjector projector) {
@@ -669,7 +671,7 @@ public class UpsertCompiler {
public MutationState execute() throws SQLException {
ResultIterator iterator = queryPlan.iterator();
if (parallelIteratorFactory == null) {
- return upsertSelect(statement, tableRef, projector, iterator, columnIndexes, pkSlotIndexes);
+ return upsertSelect(new StatementContext(statement), tableRef, projector, iterator, columnIndexes, pkSlotIndexes);
}
try {
parallelIteratorFactory.setRowProjector(projector);
@@ -677,13 +679,21 @@ public class UpsertCompiler {
parallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
Tuple tuple;
long totalRowCount = 0;
+ StatementContext context = queryPlan.getContext();
while ((tuple=iterator.next()) != null) {// Runs query
Cell kv = tuple.getValue(0);
totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
}
// Return total number of rows that have been updated. In the case of auto commit being off
// the mutations will all be in the mutation state of the current connection.
- return new MutationState(maxSize, statement.getConnection(), totalRowCount);
+ MutationState mutationState = new MutationState(maxSize, statement.getConnection(), totalRowCount);
+ /*
+ * All the metrics collected for measuring the reads done by the parallel mutating iterators
+ * are included in the ReadMetricQueue of the statement context. Include these metrics in the
+ * returned mutation state so they can be published on commit.
+ */
+ mutationState.setReadMetricQueue(context.getReadMetricsQueue());
+ return mutationState;
} finally {
iterator.close();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index ba137f8..00e843d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -102,7 +102,7 @@ public class AggregatePlan extends BaseQueryPlan {
this.services = services;
}
@Override
- public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException {
+ public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName) throws SQLException {
Expression expression = RowKeyExpression.INSTANCE;
OrderByExpression orderByExpression = new OrderByExpression(expression, false, true);
int threshold = services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
@@ -119,9 +119,9 @@ public class AggregatePlan extends BaseQueryPlan {
this.outerFactory = outerFactory;
}
@Override
- public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException {
- PeekingResultIterator iterator = innerFactory.newIterator(context, scanner, scan);
- return outerFactory.newIterator(context, iterator, scan);
+ public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName) throws SQLException {
+ PeekingResultIterator iterator = innerFactory.newIterator(context, scanner, scan, tableName);
+ return outerFactory.newIterator(context, iterator, scan, tableName);
}
}