You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by mi...@apache.org on 2023/11/09 02:18:03 UTC
(phoenix) branch master updated: PHOENIX-7038 : Implement Connection Query Service Metrics (#1682)
This is an automated email from the ASF dual-hosted git repository.
mihir6692 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 5d5aaca213 PHOENIX-7038 : Implement Connection Query Service Metrics (#1682)
5d5aaca213 is described below
commit 5d5aaca21397f91b9f3fa682cd06da7798d9fc12
Author: Monani Mihir <mo...@gmail.com>
AuthorDate: Wed Nov 8 18:17:58 2023 -0800
PHOENIX-7038 : Implement Connection Query Service Metrics (#1682)
* PHOENIX-7038 Implement Connection Query Service Metrics
* build fix
* Get connection count values from ConnectionLimiter.java
* CheckStyle length fixes
* Checkstyle fixed
* NoOpConnectionQueryServicesMetricsManager when it's disabled
* Test fix
* Spotbugs and checkstyle fixes
* Update PhoenixRuntime.java
---------
Co-authored-by: Mihir Monani <mi...@apache.org>
---
.../end2end/RebuildIndexConnectionPropsIT.java | 4 +-
.../ConnectionQueryServicesMetricsIT.java | 366 +++++++++++++++++++++
.../org/apache/phoenix/jdbc/PhoenixConnection.java | 53 ++-
.../apache/phoenix/log/BaseConnectionLimiter.java | 15 +-
.../org/apache/phoenix/log/ConnectionLimiter.java | 4 +
.../apache/phoenix/monitoring/AtomicMetric.java | 10 +
.../phoenix/monitoring/CombinableMetric.java | 8 +
.../phoenix/monitoring/CombinableMetricImpl.java | 10 +
.../ConnectionQueryServicesMetric.java} | 29 +-
...java => ConnectionQueryServicesMetricImpl.java} | 25 +-
.../phoenix/monitoring/GlobalMetricImpl.java | 10 +
.../java/org/apache/phoenix/monitoring/Metric.java | 4 +
.../phoenix/monitoring/NoOpGlobalMetricImpl.java | 10 +
.../apache/phoenix/monitoring/NonAtomicMetric.java | 10 +
.../phoenix/monitoring/PhoenixTableMetricImpl.java | 10 +
.../ConnectionQueryServicesHistogram.java | 43 +++
.../ConnectionQueryServicesMetrics.java | 120 +++++++
.../ConnectionQueryServicesMetricsHistograms.java | 71 ++++
.../ConnectionQueryServicesMetricsManager.java | 343 +++++++++++++++++++
.../NoOpConnectionQueryServicesMetricsManager.java | 62 ++++
.../phoenix/query/ConnectionQueryServices.java | 1 +
.../phoenix/query/ConnectionQueryServicesImpl.java | 15 +-
.../query/ConnectionlessQueryServicesImpl.java | 5 +
.../query/DelegateConnectionQueryServices.java | 5 +
.../org/apache/phoenix/query/QueryServices.java | 12 +
.../apache/phoenix/query/QueryServicesOptions.java | 43 +++
.../org/apache/phoenix/util/PhoenixRuntime.java | 18 +
.../ConnectionQueryServicesHistogramTest.java | 73 ++++
...nnectionQueryServicesMetricsHistogramsTest.java | 37 +++
.../ConnectionQueryServicesMetricsManagerTest.java | 112 +++++++
.../ConnectionQueryServicesMetricsTest.java | 106 ++++++
.../ConnectionQueryServicesNameMetricsTest.java | 87 +++++
32 files changed, 1685 insertions(+), 36 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RebuildIndexConnectionPropsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RebuildIndexConnectionPropsIT.java
index 75ebfa8c35..a7000b69ce 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RebuildIndexConnectionPropsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RebuildIndexConnectionPropsIT.java
@@ -107,8 +107,8 @@ public class RebuildIndexConnectionPropsIT extends BaseTest {
rebuildQueryServicesConfig.get(HConstants.HBASE_CLIENT_RETRIES_NUMBER));
ConnectionQueryServices rebuildQueryServices = rebuildIndexConnection.getQueryServices();
Connection rebuildIndexHConnection =
- (Connection) Whitebox.getInternalState(rebuildQueryServices,
- "connection");
+ (Connection) Whitebox.getInternalState(Whitebox.getInternalState(rebuildQueryServices,
+ "parent"), "connection");
Connection regularHConnection =
(Connection) Whitebox.getInternalState(
regularConnection.getQueryServices(), "connection");
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsIT.java
new file mode 100644
index 0000000000..f7d065ec93
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsIT.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring.connectionqueryservice;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric;
+import org.apache.phoenix.monitoring.HistogramDistribution;
+import org.apache.phoenix.monitoring.Metric;
+import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConfigurationFactory;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.InstanceResolver;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
+import static org.apache.phoenix.query.QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS;
+import static org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_METRICS_ENABLED;
+import static org.apache.phoenix.query.QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS;
+import static org.apache.phoenix.query.QueryServices.QUERY_SERVICES_NAME;
+import static org.apache.phoenix.util.PhoenixRuntime.clearAllConnectionQueryServiceMetrics;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class ConnectionQueryServicesMetricsIT extends BaseTest {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ConnectionQueryServicesMetricsIT.class);
+ private AtomicInteger counter = new AtomicInteger();
+ private static HBaseTestingUtility hbaseTestUtil;
+ private String tableName;
+ private static final String CONN_QUERY_SERVICE_1 = "CONN_QUERY_SERVICE_1";
+ private static final String
+ CONN_QUERY_SERVICE_CHECK_CONN_THROTTLE = "CONN_QUERY_SERVICE_CHECK_CONN_THROTTLE";
+ private static final String CONN_QUERY_SERVICE_2 = "CONN_QUERY_SERVICE_2";
+ private static final String CONN_QUERY_SERVICE_NULL = null;
+ /** Comparison operators accepted by the assertion helpers of this test. */
+ private enum CompareOp {
+ LT, EQ, GT, LTEQ, GTEQ
+ }
+
+ /**
+ * Starts an HBase mini cluster whose configuration enables connection query service
+ * metrics and caps client connections at 2 and internal connections at 1, then
+ * registers {@link PhoenixDriver#INSTANCE} with the {@link DriverManager}.
+ *
+ * @throws Exception if the mini cluster cannot be started or the driver cannot be registered
+ */
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ InstanceResolver.clearSingletons();
+ // Override to get required config for static fields loaded that require HBase config
+ InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() {
+
+ @Override public Configuration getConfiguration() {
+ Configuration conf = HBaseConfiguration.create();
+ conf.set(CONNECTION_QUERY_SERVICE_METRICS_ENABLED, String.valueOf(true));
+ // Without this config, unlimited connections are allowed from client and connection
+ // counter won't be increased at all. So we need to set max allowed connection count
+ conf.set(CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS, "2");
+ conf.set(INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS, "1");
+ return conf;
+ }
+
+ @Override public Configuration getConfiguration(Configuration confToClone) {
+ Configuration conf = HBaseConfiguration.create();
+ conf.set(CONNECTION_QUERY_SERVICE_METRICS_ENABLED, String.valueOf(true));
+ // Without this config, unlimited connections are allowed from client and connection
+ // counter won't be increased at all. So we need to set max allowed connection count
+ conf.set(CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS, "2");
+ conf.set(INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS, "1");
+ // Same overrides as above, with confToClone added as a further resource of the copy
+ Configuration copy = new Configuration(conf);
+ copy.addResource(confToClone);
+ return copy;
+ }
+ });
+ // NOTE(review): presumably this resolves to the factory registered above, so the
+ // mini cluster sees the metric and connection-limit overrides - confirm
+ Configuration conf = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+ hbaseTestUtil = new HBaseTestingUtility(conf);
+ setUpConfigForMiniCluster(conf);
+ conf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
+ QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+ hbaseTestUtil.startMiniCluster();
+ // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver
+ String zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
+ url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
+ DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+ }
+
+    /**
+     * Shuts down the mini cluster created in {@code doSetup()}, if one was created.
+     * Shutdown stays best-effort: a failure is logged instead of rethrown, so it can
+     * neither fail the test class nor disappear without a trace.
+     */
+    @AfterClass
+    public static void tearDownMiniCluster() {
+        if (hbaseTestUtil == null) {
+            return;
+        }
+        try {
+            hbaseTestUtil.shutdownMiniCluster();
+        } catch (Exception e) {
+            // Deliberately not rethrown; record it so a broken teardown is diagnosable.
+            LOGGER.warn("Failed to shut down the HBase mini cluster", e);
+        }
+    }
+
+    /**
+     * Per-test setup: clears all connection query service metrics and picks a fresh
+     * unique table name for the test about to run.
+     */
+    @Before
+    public void resetTableLevelMetrics() {
+        PhoenixRuntime.clearAllConnectionQueryServiceMetrics();
+        this.tableName = generateUniqueName();
+    }
+
+    /**
+     * Per-test teardown: clears all connection query service metrics again so values
+     * recorded by one test are not visible to the next.
+     */
+    @After
+    public void cleanUp() {
+        PhoenixRuntime.clearAllConnectionQueryServiceMetrics();
+    }
+
+    /**
+     * Builds the JDBC URL used to connect as the given principal.
+     *
+     * @param principalName principal to append after the protocol separator, or
+     *                      {@code null} to use the base URL unchanged
+     * @return the base {@code url}, followed by the separator and principal when one is given
+     */
+    private String connUrlWithPrincipal(String principalName) {
+        StringBuilder jdbcUrl = new StringBuilder().append(url);
+        if (principalName != null) {
+            jdbcUrl.append(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR).append(principalName);
+        }
+        return jdbcUrl.toString();
+    }
+
+    /**
+     * Runs the metric checks for four connection query services: one on its own first,
+     * then three concurrently (one of which deliberately exceeds the connection limit
+     * configured in {@code doSetup()}). Each worker thread bumps {@code counter} when its
+     * check ends the way it is expected to; the test passes when all four did.
+     *
+     * @throws Exception if waiting for a worker thread is interrupted
+     */
+    @Test
+    public void testMultipleCQSIMetricsInParallel() throws Exception {
+        Thread csqi1 = newMetricsCheckThread(CONN_QUERY_SERVICE_1);
+        Thread csqi2 = new Thread(() -> {
+            try {
+                // We have set limit of 2 for phoenix connection counter in doSetup() function.
+                // For this one, we will create more than 2 connections and
+                // test that Connection Throttle Count Metric is also working as expected.
+                checkConnectionQueryServiceMetricsValues(CONN_QUERY_SERVICE_CHECK_CONN_THROTTLE);
+            } catch (Exception e) {
+                LOGGER.info("Exception while checking metrics of "
+                        + CONN_QUERY_SERVICE_CHECK_CONN_THROTTLE, e);
+                String unexpectedMessage = "This should not be thrown for "
+                        + CONN_QUERY_SERVICE_CHECK_CONN_THROTTLE;
+                // Compare constant-first: Throwable#getMessage() may be null, and calling
+                // equals() on it would kill this thread with a NullPointerException.
+                if (!unexpectedMessage.equals(e.getMessage())) {
+                    // For this Connection Query Service, code will throw error since it
+                    // will try to create more than 2 connections. So we count the
+                    // exception as success here and increment the counter.
+                    counter.incrementAndGet();
+                }
+            }
+        });
+        Thread csqi3 = newMetricsCheckThread(CONN_QUERY_SERVICE_2);
+        // Test default CQS name
+        Thread csqi4 = newMetricsCheckThread(CONN_QUERY_SERVICE_NULL);
+
+        // Start Single Query Service Test
+        csqi1.start();
+        csqi1.join();
+
+        // Start 3 Query Service Test in parallel
+        csqi2.start();
+        csqi3.start();
+        csqi4.start();
+        csqi2.join();
+        csqi3.join();
+        csqi4.join();
+
+        // Check If all CSQI Metric check passed or not
+        assertEquals("Number of passing CSQI Metrics check should be : ",4, counter.get());
+    }
+
+    /**
+     * Creates (but does not start) a thread that runs the metric check for one connection
+     * query service and increments {@code counter} only if the check completes normally.
+     */
+    private Thread newMetricsCheckThread(String queryServiceName) {
+        return new Thread(() -> {
+            try {
+                checkConnectionQueryServiceMetricsValues(queryServiceName);
+                // Increment counter for successful check
+                counter.incrementAndGet();
+            } catch (Exception e) {
+                LOGGER.error("Metrics check failed for connection query service "
+                        + queryServiceName, e);
+            }
+        });
+    }
+
+    /**
+     * Opens connection(s) for one connection query service and asserts its counters and
+     * histograms, both while the connection(s) are open and after they have been closed.
+     * For {@code CONN_QUERY_SERVICE_CHECK_CONN_THROTTLE} a third connection is attempted
+     * on top of two open ones; that attempt is expected to fail.
+     *
+     * @param queryServiceName principal appended to the JDBC URL; when {@code null}, the
+     *                         name Phoenix assigned is read back from the connection
+     * @throws Exception whatever the connection attempts or statements throw
+     */
+    private void checkConnectionQueryServiceMetricsValues(
+            String queryServiceName) throws Exception {
+        String createTableDdl = "CREATE TABLE IF NOT EXISTS %s (K VARCHAR(10) NOT NULL"
+                + " PRIMARY KEY, V VARCHAR)";
+        String princURL = connUrlWithPrincipal(queryServiceName);
+        LOGGER.info("Connection Query Service : " + queryServiceName + " URL : " + princURL);
+
+        String connQueryServiceName;
+        try (Connection conn = DriverManager.getConnection(princURL);
+                Statement stmt = conn.createStatement()) {
+            connQueryServiceName = conn.unwrap(PhoenixConnection.class).getQueryServices()
+                    .getConfiguration().get(QUERY_SERVICES_NAME);
+            // When queryServiceName is passed as null, Phoenix will change query service name
+            // to DEFAULT_CQSN. That's why we are re-assigning the query service name here to
+            // check metric in finally block.
+            queryServiceName = connQueryServiceName;
+            stmt.execute(String.format(createTableDdl, tableName + "_" + connQueryServiceName));
+            if (connQueryServiceName.equals(CONN_QUERY_SERVICE_CHECK_CONN_THROTTLE)) {
+                try (Connection conn1 = DriverManager.getConnection(princURL)) {
+                    assertMetricValues(connQueryServiceName, 2, 0, 0);
+                    assertHistogramMetricsForMutations(connQueryServiceName, 2, 0, 0,
+                            0);
+                    try (Connection conn2 = DriverManager.getConnection(princURL)) {
+                        // This should never execute in this test.
+                        throw new RuntimeException("This should not be thrown for "
+                                + CONN_QUERY_SERVICE_CHECK_CONN_THROTTLE);
+                    }
+                }
+            } else {
+                // We only create one connection so,
+                // Open Connection Count : 1
+                // Open Internal Connection Count : 0
+                // Connection Throttled Count : 0
+                assertMetricValues(connQueryServiceName, 1, 0, 0);
+                assertHistogramMetricsForMutations(connQueryServiceName, 1, 0, 0, 0);
+            }
+        } catch (Exception e) {
+            LOGGER.error("Metrics check failed for connection query service "
+                    + queryServiceName, e);
+            throw e;
+        } finally {
+            // queryServiceName is still null only when the default-named connection was
+            // never established. There is then no service name to look metrics up by, and
+            // dereferencing it here would replace the real failure with a
+            // NullPointerException, so the post-close assertions are skipped.
+            if (queryServiceName != null) {
+                if (queryServiceName.equals(CONN_QUERY_SERVICE_CHECK_CONN_THROTTLE)) {
+                    // The try-with-resources block has closed the connections so,
+                    // Open Connection Count : 0
+                    // Connection Throttled Count : 1
+                    // Open Internal Connection Count : 0
+                    assertMetricValues(queryServiceName, 0, 1, 0);
+                    // In histogram, we will still have max open connection count as 2
+                    // while rest of the values will be 0.
+                    assertHistogramMetricsForMutations(queryServiceName, 2, 0, 0, 0);
+                } else {
+                    // The try-with-resources block has closed the connection so,
+                    // Open Connection Count : 0
+                    // Connection Throttled Count : 0
+                    // Open Internal Connection Count : 0
+                    assertMetricValues(queryServiceName, 0, 0, 0);
+                    // In histogram, we will still have max open connection count as 1
+                    // while rest of the values will be 0.
+                    assertHistogramMetricsForMutations(queryServiceName, 1, 0, 0, 0);
+                }
+            }
+        }
+    }
+
+    /**
+     * Asserts the min/max recorded by the open-connection histograms of one connection
+     * query service.
+     * @param queryServiceName Connection Query Service Name
+     * @param oMaxValue Max value of {@link MetricType#OPEN_PHOENIX_CONNECTIONS_COUNTER}
+     * @param oMinValue Min value of {@link MetricType#OPEN_PHOENIX_CONNECTIONS_COUNTER}
+     * @param ioMaxValue Max value of {@link MetricType#OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER}
+     * @param ioMinValue Min value of {@link MetricType#OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER}
+     */
+    private void assertHistogramMetricsForMutations(
+            String queryServiceName, int oMaxValue, int oMinValue, int ioMaxValue, int ioMinValue) {
+        List<HistogramDistribution> serviceHistograms =
+                PhoenixRuntime.getAllConnectionQueryServicesHistograms().get(queryServiceName);
+        for (HistogramDistribution histogram : serviceHistograms) {
+            // Each call only asserts when the histogram carries the given name.
+            assertHistogram(histogram, "PhoenixInternalOpenConn", ioMaxValue, ioMinValue,
+                    CompareOp.EQ);
+            assertHistogram(histogram, "PhoenixOpenConn", oMaxValue, oMinValue, CompareOp.EQ);
+        }
+    }
+
+    /**
+     * Asserts the max and min of a histogram, but only when the histogram carries the
+     * given name; histograms with any other name are ignored.
+     *
+     * @param histo histogram to inspect
+     * @param histoName name the histogram must have for the assertion to apply
+     * @param maxValue expected {@code histo.getMax()}
+     * @param minValue expected {@code histo.getMin()}
+     * @param op comparison to apply; only {@link CompareOp#EQ} is supported
+     * @throws IllegalArgumentException if a matching histogram is checked with an
+     *         operator other than {@link CompareOp#EQ}
+     */
+    public void assertHistogram(HistogramDistribution histo, String histoName, long maxValue,
+            long minValue, CompareOp op) {
+        if (!histo.getHistoName().equals(histoName)) {
+            return;
+        }
+        switch (op) {
+            case EQ:
+                assertEquals(maxValue, histo.getMax());
+                assertEquals(minValue, histo.getMin());
+                break;
+            default:
+                // Previously any other operator fell through and asserted nothing, which
+                // made such a check pass vacuously. Reject it instead.
+                throw new IllegalArgumentException(
+                        "Unsupported CompareOp for histogram assertion: " + op);
+        }
+    }
+
+    /**
+     * Asserts the three counters tracked for one connection query service.
+     * @param queryServiceName Connection Query Service Name
+     * @param o expected {@link MetricType#OPEN_PHOENIX_CONNECTIONS_COUNTER}
+     * @param ct expected {@link MetricType#PHOENIX_CONNECTIONS_THROTTLED_COUNTER}
+     * @param io expected {@link MetricType#OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER}
+     */
+    public void assertMetricValues(String queryServiceName, int o, int ct, int io) {
+        List<ConnectionQueryServicesMetric> serviceMetrics =
+                PhoenixRuntime.getAllConnectionQueryServicesCounters().get(queryServiceName);
+        // Three metrics are tracked per Phoenix Connection Query Service
+        // (see ConnectionQueryServicesMetrics.QueryServiceMetrics):
+        //   OPEN_PHOENIX_CONNECTIONS_COUNTER
+        //   OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER
+        //   PHOENIX_CONNECTIONS_THROTTLED_COUNTER
+        assertEquals(3, serviceMetrics.size());
+        for (ConnectionQueryServicesMetric serviceMetric : serviceMetrics) {
+            // Each call only asserts when the metric is of the given type.
+            assertMetricValue(serviceMetric, OPEN_PHOENIX_CONNECTIONS_COUNTER, o, CompareOp.EQ);
+            assertMetricValue(serviceMetric, PHOENIX_CONNECTIONS_THROTTLED_COUNTER, ct,
+                    CompareOp.EQ);
+            assertMetricValue(serviceMetric, OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER, io,
+                    CompareOp.EQ);
+        }
+    }
+
+    /**
+     * Asserts that no connection query service counters are currently registered.
+     */
+    public void assertMetricListIsEmpty() {
+        assertTrue(PhoenixRuntime.getAllConnectionQueryServicesCounters().isEmpty());
+    }
+
+    /**
+     * Compares a metric's value against an expected value, but only when the metric is of
+     * the requested type; metrics of any other type are skipped without asserting.
+     * @param m metric to check
+     * @param checkType type to check for
+     * @param compareValue value to compare against
+     * @param op CompareOp describing how the metric value must relate to compareValue
+     */
+    private static void assertMetricValue(Metric m, MetricType checkType, long compareValue,
+            CompareOp op) {
+        if (!m.getMetricType().equals(checkType)) {
+            return;
+        }
+        switch (op) {
+            case EQ:
+                assertEquals(compareValue, m.getValue());
+                break;
+            case LT:
+                assertTrue(m.getValue() < compareValue);
+                break;
+            case LTEQ:
+                assertTrue(m.getValue() <= compareValue);
+                break;
+            case GT:
+                assertTrue(m.getValue() > compareValue);
+                break;
+            case GTEQ:
+                assertTrue(m.getValue() >= compareValue);
+                break;
+        }
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 493fdf5037..3c078d6ecc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -17,13 +17,13 @@
*/
package org.apache.phoenix.jdbc;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
+import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER;
+import static org.apache.phoenix.query.QueryServices.QUERY_SERVICES_NAME;
import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkNotNull;
import static java.util.Collections.emptyMap;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_PHOENIX_CONNECTIONS;
import java.io.EOFException;
import java.io.IOException;
@@ -88,6 +88,7 @@ import org.apache.phoenix.log.ConnectionActivityLogger;
import org.apache.phoenix.log.LogLevel;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.monitoring.TableMetricsManager;
+import org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesMetricsManager;
import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.parse.PSchema;
import org.apache.phoenix.query.ConnectionQueryServices;
@@ -398,13 +399,29 @@ public class PhoenixConnection implements MetaDataMutated, SQLCloseable, Phoenix
this.logSamplingRate = Double.parseDouble(this.services.getProps().get(QueryServices.LOG_SAMPLE_RATE,
QueryServicesOptions.DEFAULT_LOG_SAMPLE_RATE));
- if (isInternalConnection) {
- GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.increment();
- } else {
- GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment();
+ String connectionQueryServiceName =
+ this.services.getConfiguration().get(QUERY_SERVICES_NAME);
+ if (isInternalConnection) {
+ GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.increment();
+ long currentInternalConnectionCount =
+ this.getQueryServices().getConnectionCount(isInternalConnection);
+ ConnectionQueryServicesMetricsManager.updateMetrics(connectionQueryServiceName,
+ OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER, currentInternalConnectionCount);
+ ConnectionQueryServicesMetricsManager
+ .updateConnectionQueryServiceOpenInternalConnectionHistogram(
+ currentInternalConnectionCount, connectionQueryServiceName);
+ } else {
+ GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment();
+ long currentConnectionCount =
+ this.getQueryServices().getConnectionCount(isInternalConnection);
+ ConnectionQueryServicesMetricsManager.updateMetrics(connectionQueryServiceName,
+ OPEN_PHOENIX_CONNECTIONS_COUNTER, currentConnectionCount);
+ ConnectionQueryServicesMetricsManager
+ .updateConnectionQueryServiceOpenConnectionHistogram(currentConnectionCount,
+ connectionQueryServiceName);
}
- this.sourceOfOperation =
- this.services.getProps().get(QueryServices.SOURCE_OPERATION_ATTRIB, null);
+ this.sourceOfOperation = this.services.getProps()
+ .get(QueryServices.SOURCE_OPERATION_ATTRIB, null);
}
private static void checkScn(Long scnParam) throws SQLException {
@@ -756,6 +773,8 @@ public class PhoenixConnection implements MetaDataMutated, SQLCloseable, Phoenix
return;
}
+ String connectionQueryServiceName =
+ this.services.getConfiguration().get(QUERY_SERVICES_NAME);
try {
isClosing = true;
TableMetricsManager.pushMetricsFromConnInstanceMethod(getMutationMetrics());
@@ -781,10 +800,24 @@ public class PhoenixConnection implements MetaDataMutated, SQLCloseable, Phoenix
} finally {
isClosing = false;
isClosed = true;
- if(isInternalConnection()){
+ if (isInternalConnection()){
GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.decrement();
+ long currentInternalConnectionCount =
+ this.getQueryServices().getConnectionCount(isInternalConnection());
+ ConnectionQueryServicesMetricsManager.updateMetrics(connectionQueryServiceName,
+ OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER, currentInternalConnectionCount);
+ ConnectionQueryServicesMetricsManager
+ .updateConnectionQueryServiceOpenInternalConnectionHistogram(
+ currentInternalConnectionCount, connectionQueryServiceName);
} else {
GLOBAL_OPEN_PHOENIX_CONNECTIONS.decrement();
+ long currentConnectionCount =
+ this.getQueryServices().getConnectionCount(isInternalConnection());
+ ConnectionQueryServicesMetricsManager.updateMetrics(connectionQueryServiceName,
+ OPEN_PHOENIX_CONNECTIONS_COUNTER, currentConnectionCount);
+ ConnectionQueryServicesMetricsManager
+ .updateConnectionQueryServiceOpenConnectionHistogram(
+ currentConnectionCount, connectionQueryServiceName);
}
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/BaseConnectionLimiter.java b/phoenix-core/src/main/java/org/apache/phoenix/log/BaseConnectionLimiter.java
index ad720c6155..ec07deb465 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/BaseConnectionLimiter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/BaseConnectionLimiter.java
@@ -21,6 +21,7 @@ import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesMetricsManager;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -29,6 +30,8 @@ import javax.annotation.concurrent.GuardedBy;
import java.sql.SQLException;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
+import static org.apache.phoenix.query.QueryServices.QUERY_SERVICES_NAME;
/**
* A base class for concrete implementation of ConnectionLimiter.
@@ -39,6 +42,7 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_C
public abstract class BaseConnectionLimiter implements ConnectionLimiter {
protected int connectionCount = 0;
protected int internalConnectionCount = 0;
+ protected int connectionThrottledCounter = 0;
protected String profileName;
protected int maxConnectionsAllowed;
protected int maxInternalConnectionsAllowed;
@@ -65,8 +69,15 @@ public abstract class BaseConnectionLimiter implements ConnectionLimiter {
// if throttling threshold is reached, try reclaiming garbage collected phoenix connections.
if ((allowedConnections != 0) && (futureConnections > allowedConnections) && (onSweep(connection.isInternalConnection()) == 0)) {
GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.increment();
-
- //TODO:- After PHOENIX-7038 per profile Phoenix Throttled Counter should be updated here.
+ connectionThrottledCounter++;
+ String connectionQueryServiceName = connection.getQueryServices()
+ .getConfiguration().get(QUERY_SERVICES_NAME);
+ // Since this is ever-increasing counter and only gets reset at JVM restart
+ // Both global and connection query service level,
+ // we won't create histogram for this metric.
+ ConnectionQueryServicesMetricsManager.updateMetrics(
+ connectionQueryServiceName,
+ PHOENIX_CONNECTIONS_THROTTLED_COUNTER, connectionThrottledCounter);
// Let the concrete classes handle the onLimit.
// They can either throw the exception back or handle it.
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionLimiter.java b/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionLimiter.java
index 3bc6d465b2..6e9f98b9b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionLimiter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionLimiter.java
@@ -36,4 +36,8 @@ public interface ConnectionLimiter {
boolean isLastConnection();
boolean isShouldThrottleNumConnections();
+
+ int getConnectionCount();
+
+ int getInternalConnectionCount();
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java
index 728e734da0..3368bceae9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java
@@ -62,6 +62,16 @@ public class AtomicMetric implements Metric {
value.set(0);
}
+ /**
+ * Overwrites this metric's current value with the supplied one.
+ *
+ * @param value the value to store as the metric's current value
+ */
+ @Override
+ public void set(long value) {
+ this.value.set(value);
+ }
+
@Override
public void decrement() {
value.decrementAndGet();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java
index 07cd25dfcc..d1ed8ba4fd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java
@@ -58,6 +58,14 @@ public interface CombinableMetric extends Metric {
@Override
public void reset() {}
+ /**
+ * No-op: this implementation has an empty body and ignores the supplied value.
+ *
+ * @param value ignored
+ */
+ @Override
+ public void set(long value) {}
+
@Override
public String getPublishString() {
return EMPTY_STRING;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java
index 40cb5166c3..c2a6e7ba8d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java
@@ -59,6 +59,16 @@ public class CombinableMetricImpl implements CombinableMetric, Cloneable {
metric.reset();
}
+ /**
+ * Sets the metric's current value by forwarding it to the wrapped {@code metric}.
+ *
+ * @param value the value to store as the metric's current value
+ */
+ @Override
+ public void set(long value) {
+ metric.set(value);
+ }
+
@Override
public String getPublishString() {
return getCurrentMetricState();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionLimiter.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ConnectionQueryServicesMetric.java
similarity index 54%
copy from phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionLimiter.java
copy to phoenix-core/src/main/java/org/apache/phoenix/monitoring/ConnectionQueryServicesMetric.java
index 3bc6d465b2..75ba7e0131 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionLimiter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ConnectionQueryServicesMetric.java
@@ -15,25 +15,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.phoenix.log;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-
-import java.sql.SQLException;
+package org.apache.phoenix.monitoring;
/**
- * This interface defines the contract for storing information about Phoenix connections
- * for debugging client-side issues like
- * {@link org.apache.phoenix.exception.SQLExceptionCode#NEW_CONNECTION_THROTTLED}
+ * Class that exposes the various phoenix metrics collected at the Phoenix Query Service level.
+ * Because metrics are dynamic in nature, it is not guaranteed that the state exposed will always
+ * be in sync with each other. One should use these metrics primarily for monitoring and debugging
+ * purposes.
*/
-public interface ConnectionLimiter {
-
- void acquireConnection(PhoenixConnection connection) throws SQLException;
-
- void returnConnection(PhoenixConnection connection);
+public interface ConnectionQueryServicesMetric extends Metric {
- int onSweep(boolean internal) ;
+ /**
+ * @return Number of samples collected since the last {@link #reset()} call.
+ */
+ long getNumberOfSamples();
- boolean isLastConnection();
+ /**
+ * @return Sum of the values of the metric sampled since the last {@link #reset()} call.
+ */
+ long getTotalSum();
- boolean isShouldThrottleNumConnections();
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixTableMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ConnectionQueryServicesMetricImpl.java
similarity index 78%
copy from phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixTableMetricImpl.java
copy to phoenix-core/src/main/java/org/apache/phoenix/monitoring/ConnectionQueryServicesMetricImpl.java
index 91bade1bd7..bbefbf34d8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixTableMetricImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ConnectionQueryServicesMetricImpl.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,27 +19,40 @@ package org.apache.phoenix.monitoring;
import java.util.concurrent.atomic.AtomicLong;
-public class PhoenixTableMetricImpl implements PhoenixTableMetric {
+/**
+ * Metric class for Connection Query Services Metric.
+ */
+public class ConnectionQueryServicesMetricImpl implements ConnectionQueryServicesMetric {
private AtomicLong numberOfSamples = new AtomicLong(0);
private Metric metric;
/**
- * Default implementation used when TableLevel Metrics are enabled
+ * Default implementation used when Phoenix Connection Query Service Metrics are enabled
*/
- public PhoenixTableMetricImpl(MetricType type) {
+ public ConnectionQueryServicesMetricImpl(MetricType type) {
this.metric = new AtomicMetric(type);
}
/**
- * Reset the internal state. Typically called after metric information has been collected and a new phase of
- * collection is being requested for the next interval.
+ * Reset the internal state. Typically called after metric information has been
+ * collected and a new phase of collection is being requested for the next interval.
*/
@Override public void reset() {
metric.reset();
numberOfSamples.set(0);
}
+ /**
+ * Overwrites the current value of the wrapped metric; the sample counter is not touched.
+ *
+ * @param value the value to store as the metric's current value
+ */
+ @Override
+ public void set(long value) {
+ metric.set(value);
+ }
+
@Override public long getNumberOfSamples() {
return numberOfSamples.get();
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java
index 8c2128b2c3..ca19c580a2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java
@@ -33,6 +33,16 @@ public class GlobalMetricImpl implements GlobalMetric {
numberOfSamples.set(0);
}
+ /**
+ * Sets the current value by delegating to the wrapped metric's {@code set}.
+ *
+ * @param value the value to store as the metric's current value
+ */
+ @Override
+ public void set(long value) {
+ metric.set(value);
+ }
+
@Override
public long getNumberOfSamples() {
return numberOfSamples.get();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
index 0e51fc02cb..1f3bdb858d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
@@ -60,5 +60,9 @@ public interface Metric {
*/
public void reset();
+ /**
+ * Sets the current value of this metric to {@code value}.
+ */
+ void set(long value);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NoOpGlobalMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NoOpGlobalMetricImpl.java
index 2dfe941d2f..d03b27e405 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NoOpGlobalMetricImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NoOpGlobalMetricImpl.java
@@ -64,4 +64,14 @@ public class NoOpGlobalMetricImpl implements GlobalMetric {
public void reset() {
}
+
+ /**
+ * No-op: this implementation stores nothing, so the supplied value is discarded.
+ *
+ * @param value ignored
+ */
+ @Override
+ public void set(long value) {
+
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java
index 4e611c5674..77fe093a40 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java
@@ -63,6 +63,16 @@ class NonAtomicMetric implements Metric {
value = 0;
}
+ /**
+ * Overwrites the current value of this metric with a plain field assignment.
+ *
+ * @param value the new value of the metric
+ */
+ @Override
+ public void set(long value) {
+ this.value = value;
+ }
+
@Override
public void decrement() {
value--;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixTableMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixTableMetricImpl.java
index 91bade1bd7..0d6ef7868b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixTableMetricImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixTableMetricImpl.java
@@ -40,6 +40,16 @@ public class PhoenixTableMetricImpl implements PhoenixTableMetric {
numberOfSamples.set(0);
}
+ /**
+ * Sets the current value by delegating to the wrapped metric's {@code set}.
+ *
+ * @param value the value to store as the metric's current value
+ */
+ @Override
+ public void set(long value) {
+ metric.set(value);
+ }
+
@Override public long getNumberOfSamples() {
return numberOfSamples.get();
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesHistogram.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesHistogram.java
new file mode 100644
index 0000000000..883be7b145
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesHistogram.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring.connectionqueryservice;
+
+import static org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.monitoring.RangeHistogram;
+import org.apache.phoenix.query.QueryServices;
+
+/**
+ * Range histogram used for Connection Query Service metrics. We read ranges using config
+ * property {@link QueryServices#CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES}.
+ * If that lookup returns null then it will default to {@link #DEFAULT_RANGE} values.
+ */
+public class ConnectionQueryServicesHistogram extends RangeHistogram {
+ static final long[] DEFAULT_RANGE = {1, 10, 100, 500, 1000};
+ public ConnectionQueryServicesHistogram(String name, String description, Configuration conf) {
+ super(initializeRanges(conf), name, description);
+ }
+
+ private static long[] initializeRanges(Configuration conf) {
+ long[] ranges = PhoenixConfigurationUtil.getLongs(
+ conf, CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES);
+ return ranges != null ? ranges : DEFAULT_RANGE;
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetrics.java
new file mode 100644
index 0000000000..b3a8a1c3c1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetrics.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring.connectionqueryservice;
+
+import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric;
+import org.apache.phoenix.monitoring.ConnectionQueryServicesMetricImpl;
+import org.apache.phoenix.monitoring.MetricType;
+
+/**
+ * Holds the metrics and histograms tracked for a single Connection Query Service.
+ */
+public class ConnectionQueryServicesMetrics {
+    /**
+     * List Metrics tracked in Connection Query Service Metrics
+     */
+    public enum QueryServiceMetrics {
+        CONNECTION_QUERY_SERVICE_OPEN_PHOENIX_CONNECTIONS_COUNTER(OPEN_PHOENIX_CONNECTIONS_COUNTER),
+        CONNECTION_QUERY_SERVICE_OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER(
+                OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER),
+        CONNECTION_QUERY_SERVICE_PHOENIX_CONNECTIONS_THROTTLED_COUNTER(
+                PHOENIX_CONNECTIONS_THROTTLED_COUNTER);
+
+        // Constants are JVM-wide singletons, so they hold no per-service metric object.
+        private final MetricType metricType;
+
+        QueryServiceMetrics(MetricType metricType) {
+            this.metricType = metricType;
+        }
+    }
+
+    private final String connectionQueryServiceName;
+    private final Map<MetricType, ConnectionQueryServicesMetric> metricRegister;
+    private final ConnectionQueryServicesMetricsHistograms connectionQueryServiceMetricsHistograms;
+
+    public ConnectionQueryServicesMetrics(final String connectionQueryServiceName,
+            Configuration conf) {
+        this.connectionQueryServiceName = connectionQueryServiceName;
+        metricRegister = new HashMap<>();
+        for (QueryServiceMetrics tracked : QueryServiceMetrics.values()) {
+            // Each instance owns fresh metric objects, kept only in its own register.
+            // Nothing is cached on the shared enum constant, which avoids unsynchronized
+            // writes to static state every time a new service is registered.
+            metricRegister.put(tracked.metricType,
+                    new ConnectionQueryServicesMetricImpl(tracked.metricType));
+        }
+        connectionQueryServiceMetricsHistograms =
+                new ConnectionQueryServicesMetricsHistograms(connectionQueryServiceName, conf);
+    }
+
+    /**
+     * Overwrites the current value of the metric registered for {@code type}.
+     * Metric types that are not tracked by this class are silently ignored.
+     *
+     * @param type metric type
+     * @param val value to store as the metric's current value
+     */
+    public void setMetricValue(MetricType type, long val) {
+        // Single lookup instead of containsKey + get; the register never holds null values.
+        ConnectionQueryServicesMetric metric = metricRegister.get(type);
+        if (metric != null) {
+            metric.set(val);
+        }
+    }
+
+    /**
+     * This function is used to get the value of Metric.
+     *
+     * @param type metric type
+     * @return current value of the metric, or 0 if {@code type} is not tracked here.
+     */
+    public long getMetricValue(MetricType type) {
+        ConnectionQueryServicesMetric metric = metricRegister.get(type);
+        if (metric == null) {
+            return 0;
+        }
+        return metric.getValue();
+    }
+
+    public String getConnectionQueryServiceName() {
+        return connectionQueryServiceName;
+    }
+
+    /**
+     * Returns every metric tracked for this Connection Query Service.
+     *
+     * @return new list holding the registered ConnectionQueryServicesMetric objects.
+     */
+    public List<ConnectionQueryServicesMetric> getAllMetrics() {
+        return new ArrayList<>(metricRegister.values());
+    }
+
+    public ConnectionQueryServicesMetricsHistograms getConnectionQueryServiceHistograms() {
+        return connectionQueryServiceMetricsHistograms;
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsHistograms.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsHistograms.java
new file mode 100644
index 0000000000..29e8a4b4f4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsHistograms.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring.connectionqueryservice;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import edu.umd.cs.findbugs.annotations.SuppressWarnings;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.monitoring.HistogramDistribution;
+
+/**
+ * Histogram Metrics for Connection Query Service Metrics.
+ * 1. Connection count
+ * 2. Internal Connection Count.
+ */
+public class ConnectionQueryServicesMetricsHistograms {
+    private final String connectionQueryServicesName;
+    private final ConnectionQueryServicesHistogram connectionQueryServiceOpenInternalSizeHistogram;
+    private final ConnectionQueryServicesHistogram connectionQueryServicesOpenConnSizeHistogram;
+
+    public ConnectionQueryServicesMetricsHistograms(String connectionQueryServiceName,
+            Configuration conf) {
+        connectionQueryServicesName = connectionQueryServiceName;
+        connectionQueryServiceOpenInternalSizeHistogram = new ConnectionQueryServicesHistogram(
+                "PhoenixInternalOpenConn",
+                "histogram for number of open internal phoenix connections", conf);
+        connectionQueryServicesOpenConnSizeHistogram = new ConnectionQueryServicesHistogram(
+                "PhoenixOpenConn", "histogram for number of open phoenix connections", conf);
+    }
+
+    public String getConnectionQueryServicesName() {
+        return this.connectionQueryServicesName;
+    }
+
+    @SuppressWarnings(value = "EI_EXPOSE_REP",
+            justification = "It's only used in internally for metrics storage")
+    public ConnectionQueryServicesHistogram getConnectionQueryServicesInternalOpenConnHisto() {
+        return connectionQueryServiceOpenInternalSizeHistogram;
+    }
+
+    @SuppressWarnings(value = "EI_EXPOSE_REP",
+            justification = "It's only used in internally for metrics storage")
+    public ConnectionQueryServicesHistogram getConnectionQueryServicesOpenConnHisto() {
+        return connectionQueryServicesOpenConnSizeHistogram;
+    }
+
+    public List<HistogramDistribution> getConnectionQueryServicesHistogramsDistribution() {
+        List<HistogramDistribution> list = new ArrayList<>(Arrays.asList(
+                this.connectionQueryServiceOpenInternalSizeHistogram.getRangeHistogramDistribution(),
+                this.connectionQueryServicesOpenConnSizeHistogram.getRangeHistogramDistribution()));
+        return Collections.unmodifiableList(list);
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsManager.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsManager.java
new file mode 100644
index 0000000000..0175924653
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsManager.java
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring.connectionqueryservice;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import edu.umd.cs.findbugs.annotations.SuppressWarnings;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric;
+import org.apache.phoenix.monitoring.HistogramDistribution;
+import org.apache.phoenix.monitoring.MetricPublisherSupplierFactory;
+import org.apache.phoenix.monitoring.MetricServiceResolver;
+import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Central place where we keep track of all the Connection Query Service metrics. Register each
+ * Connection Query Service and store the instance of it associated with ConnectionServiceName in a
+ * map. This class exposes the following static functions, which catch and log exceptions:
+ * 1.clearAllConnectionQueryServiceMetrics
+ * 2.getAllConnectionQueryServicesMetrics
+ * 3.updateMetrics
+ */
+public class ConnectionQueryServicesMetricsManager {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ConnectionQueryServicesMetricsManager.class);
+ private static volatile boolean isConnectionQueryServiceMetricsEnabled;
+ private static volatile boolean isConnectionQueryServiceMetricPublisherEnabled;
+ private static ConcurrentMap<String, ConnectionQueryServicesMetrics>
+ connectionQueryServiceMetricsMapping;
+ // Singleton object
+ private static volatile ConnectionQueryServicesMetricsManager
+ connectionQueryServicesMetricsManager = null;
+ private static volatile MetricPublisherSupplierFactory mPublisher = null;
+ private static volatile QueryServicesOptions options;
+
+ @SuppressWarnings(value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD", justification = "This " +
+ "Object is only created once for the JVM")
+ public ConnectionQueryServicesMetricsManager(QueryServicesOptions opts) {
+ options = opts;
+ connectionQueryServiceMetricsMapping = new ConcurrentHashMap<>();
+ isConnectionQueryServiceMetricsEnabled = options.isConnectionQueryServiceMetricsEnabled();
+ isConnectionQueryServiceMetricPublisherEnabled =
+ options.isConnectionQueryServiceMetricsPublisherEnabled();
+ LOGGER.info("Connection query service metrics enabled : "
+ + isConnectionQueryServiceMetricsEnabled + " publisher enabled : "
+ + isConnectionQueryServiceMetricPublisherEnabled);
+ }
+
+ @SuppressWarnings(value = "EI_EXPOSE_STATIC_REP2", justification = "Only used for testing")
+ public static void setInstance(ConnectionQueryServicesMetricsManager metricsManager) {
+ connectionQueryServicesMetricsManager = metricsManager;
+ }
+
+ /**
+ * Function to provide instance of ConnectionQueryServiceMetricsManager(Create if needed in
+ * thread safe manner)
+ * @return returns instance of ConnectionQueryServicesMetricsManager
+ */
+ @SuppressWarnings(value = "MS_EXPOSE_REP", justification = "Only used internally, not exposed" +
+ " to external client")
+ public static ConnectionQueryServicesMetricsManager getInstance() {
+ if (connectionQueryServicesMetricsManager == null) {
+ synchronized (ConnectionQueryServicesMetricsManager.class) {
+ if (connectionQueryServicesMetricsManager == null) {
+ QueryServicesOptions options = QueryServicesOptions.withDefaults();
+ if (options.isConnectionQueryServiceMetricsEnabled()) {
+ connectionQueryServicesMetricsManager =
+ new ConnectionQueryServicesMetricsManager(options);
+ LOGGER.info("Created object for Connection query service metrics manager");
+ } else {
+ connectionQueryServicesMetricsManager =
+ NoOpConnectionQueryServicesMetricsManager.NO_OP_CONN_QUERY_SERVICES_METRICS_MANAGER;
+ LOGGER.info("Created object for NoOp Connection query service metrics manager");
+ return connectionQueryServicesMetricsManager;
+ }
+ registerMetricsPublisher();
+ }
+ }
+ }
+ return connectionQueryServicesMetricsManager;
+ }
+
+ ConnectionQueryServicesMetricsManager() {
+
+ }
+
+ public static void registerMetricsPublisher() {
+ if (isConnectionQueryServiceMetricPublisherEnabled) {
+ String className = options.getConnectionQueryServiceMetricsPublisherClass();
+ if (className != null) {
+ MetricServiceResolver mResolver = new MetricServiceResolver();
+ LOGGER.info("Connection query service metrics publisher className "
+ + className);
+ try {
+ mPublisher = mResolver.instantiate(className);
+ mPublisher.registerMetricProvider();
+ } catch (Throwable e) {
+ LOGGER.error("The exception from metric publish Function", e);
+ }
+
+ } else {
+ LOGGER.warn("Connection query service metrics publisher className"
+ + " can't be null");
+ }
+ }
+ }
+
+ /**
+ * Function to provide Object of ConnectionQueryServicesMetrics (Create if needed in
+ * thread safe manner) for connectionQueryServiceName
+ * @param connectionQueryServiceName Connection Query Service Name
+ * @return returns instance of ConnectionQueryServicesMetrics for connectionQueryServiceName
+ */
+ ConnectionQueryServicesMetrics getConnectionQueryServiceMetricsInstance(
+ String connectionQueryServiceName) {
+ if (Strings.isNullOrEmpty(connectionQueryServiceName)) {
+ LOGGER.warn("Connection query service Name can't be null or empty");
+ return null;
+ }
+
+ ConnectionQueryServicesMetrics cqsInstance =
+ connectionQueryServiceMetricsMapping.get(connectionQueryServiceName);
+ if (cqsInstance == null) {
+ synchronized (ConnectionQueryServicesMetricsManager.class) {
+ cqsInstance = connectionQueryServiceMetricsMapping.get(connectionQueryServiceName);
+ if (cqsInstance == null) {
+
+ LOGGER.info("Creating connection query service metrics object for : "
+ + connectionQueryServiceName);
+ cqsInstance = new ConnectionQueryServicesMetrics(connectionQueryServiceName,
+ options.getConfiguration());
+ connectionQueryServiceMetricsMapping
+ .put(connectionQueryServiceName, cqsInstance);
+ }
+ }
+ }
+ return cqsInstance;
+ }
+
+ /**
+ * Stores {@code value} as the current value of metric {@code type} in the LocalStore entry
+ * (ConnectionQueryServicesMetrics) of the given connection query service. The entry is looked
+ * up through getConnectionQueryServiceMetricsInstance, which creates it on first use.
+ * The value is set, not added: this method overwrites the metric with {@code value} and does
+ * not apply a +/-1 delta itself.
+ * <br>
+ * Example :- OPEN_PHOENIX_CONNECTIONS_COUNTER, OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER
+ * <br>
+ * <br>
+ * This method does not update any histogram. If no entry can be obtained for the name
+ * (the lookup returns null), the call returns without doing anything.
+ * @param connectionQueryServiceName name of the connection query service
+ * @param type metric type to update
+ * @param value new value of the metric
+ */
+ void updateMetricsValue(String connectionQueryServiceName, MetricType type,
+ long value) {
+
+ long startTime = EnvironmentEdgeManager.currentTime();
+
+ ConnectionQueryServicesMetrics cqsInstance =
+ getConnectionQueryServiceMetricsInstance(connectionQueryServiceName);
+ if (cqsInstance == null) {
+ return;
+ }
+ cqsInstance.setMetricValue(type, value);
+
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Connection query service metrics completed updating metric "
+ + type + " to value " + value + ", timetaken = "
+ + (EnvironmentEdgeManager.currentTime() - startTime));
+ }
+ }
+
+ /**
+ * Static entry point to set a metric value; any Exception thrown is caught and logged.
+ * @param connectionQueryServiceName name of the connection query service
+ * @param type type of metric
+ * @param value metric value
+ */
+ public static void updateMetrics(String connectionQueryServiceName, MetricType type,
+ long value) {
+ try {
+ ConnectionQueryServicesMetricsManager.getInstance()
+ .updateMetricsValue(connectionQueryServiceName, type, value);
+ } catch (Exception e) {
+ LOGGER.error("Failed updating connection query service metrics", e);
+ }
+ }
+
+ public static Map<String, List<ConnectionQueryServicesMetric>> getAllConnectionQueryServicesMetrics() {
+ return ConnectionQueryServicesMetricsManager.getInstance()
+ .getConnectionQueryServicesMetrics();
+ }
+
+    /**
+     * This function will return all the counters for Phoenix connection query service.
+     * @return Map of all ConnectionQueryService Metrics; empty (never null) if retrieval fails.
+     */
+    Map<String, List<ConnectionQueryServicesMetric>> getConnectionQueryServicesMetrics() {
+        try {
+            long startTime = EnvironmentEdgeManager.currentTime();
+            Map<String, List<ConnectionQueryServicesMetric>> map = new HashMap<>();
+            for (Map.Entry<String, ConnectionQueryServicesMetrics> entry
+                    : connectionQueryServiceMetricsMapping.entrySet()) {
+                map.put(entry.getKey(), entry.getValue().getAllMetrics());
+            }
+            long timeTakenForMetricConversion = EnvironmentEdgeManager.currentTime() - startTime;
+            LOGGER.info("Connection query service metrics fetching complete, timeTaken: "
+                    + timeTakenForMetricConversion);
+            return map;
+        } catch (Exception e) {
+            LOGGER.error("Failed retrieving connection query service Metrics", e);
+        }
+        return new HashMap<>();
+    }
+
+ public static Map<String, List<HistogramDistribution>> getHistogramsForAllConnectionQueryServices() {
+ return ConnectionQueryServicesMetricsManager.getInstance()
+ .getHistogramsForConnectionQueryServices();
+ }
+
+ /**
+ * This function will return histogram for all the Phoenix connection query service metrics.
+ * @return Map of all ConnectionServiceMetrics Histogram
+ */
+ Map<String, List<HistogramDistribution>> getHistogramsForConnectionQueryServices() {
+ Map<String, List<HistogramDistribution>> map = new HashMap<>();
+ for (Map.Entry<String, ConnectionQueryServicesMetrics> entry
+ : connectionQueryServiceMetricsMapping.entrySet()) {
+ ConnectionQueryServicesMetricsHistograms connectionQueryServiceHistogramsHistograms =
+ entry.getValue().getConnectionQueryServiceHistograms();
+ map.put(entry.getKey(), connectionQueryServiceHistogramsHistograms
+ .getConnectionQueryServicesHistogramsDistribution());
+ }
+ return map;
+ }
+
+ /**
+ * Function to update {@link MetricType#OPEN_PHOENIX_CONNECTIONS_COUNTER} counter value in
+ * Histogram
+ * @param connCount current count of
+ * {@link MetricType#OPEN_PHOENIX_CONNECTIONS_COUNTER}
+ * @param connectionQueryServiceName ConnectionQueryService name
+ */
+ public static void updateConnectionQueryServiceOpenConnectionHistogram(long connCount,
+ String connectionQueryServiceName) {
+ ConnectionQueryServicesMetrics metrics =
+ getInstance().getConnectionQueryServiceMetricsInstance(connectionQueryServiceName);
+ if (metrics == null) {
+ return;
+ }
+ metrics.getConnectionQueryServiceHistograms().getConnectionQueryServicesOpenConnHisto()
+ .add(connCount);
+ }
+
+ /**
+ * Function to update {@link MetricType#OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER} counter value
+ * in Histogram
+ * @param connCount current count of
+ * {@link
+ * MetricType#OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER}
+ * @param connectionQueryServiceName ConnectionQueryService name
+ */
+ public static void updateConnectionQueryServiceOpenInternalConnectionHistogram(long connCount,
+ String connectionQueryServiceName) {
+ ConnectionQueryServicesMetrics metrics =
+ getInstance().getConnectionQueryServiceMetricsInstance(connectionQueryServiceName);
+ if (metrics == null) {
+ return;
+ }
+ metrics.getConnectionQueryServiceHistograms()
+ .getConnectionQueryServicesInternalOpenConnHisto().add(connCount);
+ }
+
+ /////////////////////////////////////////////////////////
+ ////// Below Functions are majorly used in testing //////
+ /////////////////////////////////////////////////////////
+
+ public static ConnectionQueryServicesHistogram getConnectionQueryServiceOpenInternalConnectionHistogram(
+ String connectionQueryServiceName) {
+ ConnectionQueryServicesMetrics metrics =
+ getInstance().getConnectionQueryServiceMetricsInstance(connectionQueryServiceName);
+ if (metrics == null) {
+ return null;
+ }
+ return metrics.getConnectionQueryServiceHistograms()
+ .getConnectionQueryServicesInternalOpenConnHisto();
+ }
+
+ public static ConnectionQueryServicesHistogram
+ getConnectionQueryServiceOpenConnectionHistogram(String connectionQueryServiceName) {
+ ConnectionQueryServicesMetrics metrics =
+ getInstance().getConnectionQueryServiceMetricsInstance(connectionQueryServiceName);
+ if (metrics == null) {
+ return null;
+ }
+ return metrics.getConnectionQueryServiceHistograms()
+ .getConnectionQueryServicesOpenConnHisto();
+ }
+
+ /**
+ * Helps reset the localstore(connectionQueryServiceMetricsMapping)
+ */
+ void clearConnectionQueryServiceMetrics() {
+ if (connectionQueryServiceMetricsMapping != null) {
+ connectionQueryServiceMetricsMapping.clear();
+ }
+ LOGGER.info("Connection query service metrics clearing complete");
+ }
+
+ public static void clearAllConnectionQueryServiceMetrics() {
+ try {
+ ConnectionQueryServicesMetricsManager.getInstance()
+ .clearConnectionQueryServiceMetrics();
+ } catch (Exception e) {
+ LOGGER.error("Failed resetting connection query service Metrics", e);
+ }
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/NoOpConnectionQueryServicesMetricsManager.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/NoOpConnectionQueryServicesMetricsManager.java
new file mode 100644
index 0000000000..cf3ec00de3
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/NoOpConnectionQueryServicesMetricsManager.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring.connectionqueryservice;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric;
+import org.apache.phoenix.monitoring.HistogramDistribution;
+import org.apache.phoenix.monitoring.MetricType;
+
+/**
+ * No-op ConnectionQueryServicesMetricsManager that is used in place of the real manager when
+ * {@link org.apache.phoenix.query.QueryServices#CONNECTION_QUERY_SERVICE_METRICS_ENABLED} flag is
+ * set to false: updates are dropped and lookups return empty maps or null.
+ */
+public class NoOpConnectionQueryServicesMetricsManager extends ConnectionQueryServicesMetricsManager {
+
+    public static final NoOpConnectionQueryServicesMetricsManager NO_OP_CONN_QUERY_SERVICES_METRICS_MANAGER =
+            new NoOpConnectionQueryServicesMetricsManager();
+
+    private NoOpConnectionQueryServicesMetricsManager() {
+        super();
+    }
+
+    @Override void updateMetricsValue(String connectionQueryServiceName, MetricType type,
+            long value) {
+    }
+
+    @Override Map<String, List<ConnectionQueryServicesMetric>> getConnectionQueryServicesMetrics() {
+        return Collections.emptyMap();
+    }
+
+    @Override Map<String, List<HistogramDistribution>> getHistogramsForConnectionQueryServices() {
+        return Collections.emptyMap();
+    }
+
+    @Override void clearConnectionQueryServiceMetrics() {
+        // Nothing is stored when metrics are disabled, so there is nothing to clear.
+    }
+
+    @Override ConnectionQueryServicesMetrics getConnectionQueryServiceMetricsInstance(
+            String connectionQueryServiceName) {
+        return null;
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index b89ad3e435..b26c6cf75e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -226,4 +226,5 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
throw new UnsupportedOperationException();
}
+ int getConnectionCount(boolean isInternal);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index dcc29f980e..ad4d2a48e2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -306,7 +306,6 @@ import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.StringUtil;
-import org.apache.phoenix.util.TimeKeeper;
import org.apache.phoenix.util.UpgradeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -455,6 +454,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
for (Entry<String,String> entry : connectionInfo.asProps()) {
config.set(entry.getKey(), entry.getValue());
}
+ if (connectionInfo.getPrincipal() != null) {
+ config.set(QUERY_SERVICES_NAME, connectionInfo.getPrincipal());
+ }
+ LOGGER.info(String.format("CQS initialized with connection query service : %s",
+ config.get(QUERY_SERVICES_NAME)));
this.connectionInfo = connectionInfo;
// Without making a copy of the configuration we cons up, we lose some of our properties
@@ -789,6 +793,15 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return latestMetaData;
}
+    /**
+     * Reads the current connection count from the connection limiter.
+     *
+     * @param isInternal true to read the internal-connection count, false for the regular one
+     * @return the count reported by {@code connectionLimiter} for the requested kind
+     */
+    @Override
+    public int getConnectionCount(boolean isInternal) {
+        return isInternal
+                ? connectionLimiter.getInternalConnectionCount()
+                : connectionLimiter.getConnectionCount();
+    }
+
@Override
public void addTable(PTable table, long resolvedTime) throws SQLException {
synchronized (latestMetaDataLock) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 7da182857d..87f0bd63fe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -809,4 +809,9 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
public PMetaData getMetaDataCache() {
return metaData;
}
+
+ /**
+ * Always reports zero connections; {@code isInternal} is ignored.
+ */
+ @Override
+ public int getConnectionCount(boolean isInternal) {
+ return 0;
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 72ee10c215..46edad7ce9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -416,4 +416,9 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
public ConnectionLimiter getConnectionLimiter() {
return getDelegate().getConnectionLimiter();
}
+
+ /**
+ * Forwards the request, including {@code isInternal}, to the delegate unchanged.
+ */
+ @Override
+ public int getConnectionCount(boolean isInternal) {
+ return getDelegate().getConnectionCount(isInternal);
+ }
}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index d300929ac7..af2c518a1c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -56,6 +56,7 @@ public interface QueryServices extends SQLCloseable {
"phoenix.query.server.orderBy.spooling.enabled";
public static final String HBASE_CLIENT_KEYTAB = "hbase.myclient.keytab";
public static final String HBASE_CLIENT_PRINCIPAL = "hbase.myclient.principal";
+ String QUERY_SERVICES_NAME = "phoenix.query.services.name";
public static final String SPOOL_DIRECTORY = "phoenix.spool.directory";
public static final String AUTO_COMMIT_ATTRIB = "phoenix.connection.autoCommit";
// consistency configuration setting
@@ -392,6 +393,17 @@ public interface QueryServices extends SQLCloseable {
public static final String PHOENIX_HISTOGRAM_LATENCY_RANGES = "phoenix.histogram.latency.ranges";
// The range of bins for size metrics for histogram.
public static final String PHOENIX_HISTOGRAM_SIZE_RANGES = "phoenix.histogram.size.ranges";
+
+ // Connection Query Service Metrics Configs
+ String CONNECTION_QUERY_SERVICE_METRICS_ENABLED = "phoenix.conn.query.service.metrics.enabled";
+ String CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_CLASSNAME =
+ "phoenix.monitoring.connection.query.service.metricProvider.className";
+ String CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_ENABLED =
+ "phoenix.conn.query.service.metricsPublisher.enabled";
+ // The range of bins for Connection Query Service Metrics of histogram.
+ String CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES =
+ "phoenix.conn.query.service.histogram.size.ranges";
+
// This config is used to move (copy and delete) the child links from the SYSTEM.CATALOG to SYSTEM.CHILD_LINK table.
// As opposed to a copy and async (out of band) delete.
public static final String MOVE_CHILD_LINKS_DURING_UPGRADE_ENABLED = "phoenix.move.child_link.during.upgrade";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 81a2a29004..b39fb788a6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -30,6 +30,10 @@ import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRI
import static org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC;
import static org.apache.phoenix.query.QueryServices.CONNECTION_ACTIVITY_LOGGING_ENABLED;
import static org.apache.phoenix.query.QueryServices.CONNECTION_ACTIVITY_LOGGING_INTERVAL;
+import static org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES;
+import static org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_METRICS_ENABLED;
+import static org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_ENABLED;
+import static org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_CLASSNAME;
import static org.apache.phoenix.query.QueryServices.COST_BASED_OPTIMIZER_ENABLED;
import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB;
@@ -71,6 +75,7 @@ import static org.apache.phoenix.query.QueryServices.MIN_STATS_UPDATE_FREQ_MS_AT
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK;
import static org.apache.phoenix.query.QueryServices.PHOENIX_ACLS_ENABLED;
+import static org.apache.phoenix.query.QueryServices.QUERY_SERVICES_NAME;
import static org.apache.phoenix.query.QueryServices.QUEUE_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.REGIONSERVER_INFO_PORT_ATTRIB;
import static org.apache.phoenix.query.QueryServices.RENEW_LEASE_ENABLED;
@@ -322,6 +327,14 @@ public class QueryServicesOptions {
public static final long DEFAULT_INDEX_POPULATION_SLEEP_TIME = 5000;
+ // Phoenix Connection Query Service configuration Defaults
+ public static final String DEFAULT_QUERY_SERVICES_NAME = "DEFAULT_CQSN";
+ public static final String DEFAULT_CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES =
+ "1, 10, 100, 500, 1000";
+ public static final boolean DEFAULT_IS_CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_ENABLED =
+ false;
+ public static final boolean DEFAULT_IS_CONNECTION_QUERY_SERVICE_METRICS_ENABLED = false;
+
public static final boolean DEFAULT_RENEW_LEASE_ENABLED = true;
public static final int DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS =
DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD / 2;
@@ -495,7 +508,14 @@ public class QueryServicesOptions {
.setIfUnset(CLIENT_METRICS_TAG, DEFAULT_CLIENT_METRICS_TAG)
.setIfUnset(CLIENT_INDEX_ASYNC_THRESHOLD, DEFAULT_CLIENT_INDEX_ASYNC_THRESHOLD)
.setIfUnset(PHOENIX_TTL_SERVER_SIDE_MASKING_ENABLED, DEFAULT_SERVER_SIDE_MASKING_ENABLED)
+ .setIfUnset(QUERY_SERVICES_NAME, DEFAULT_QUERY_SERVICES_NAME)
.setIfUnset(INDEX_CREATE_DEFAULT_STATE, DEFAULT_CREATE_INDEX_STATE)
+ .setIfUnset(CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES,
+ DEFAULT_CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES)
+ .setIfUnset(CONNECTION_QUERY_SERVICE_METRICS_ENABLED,
+ DEFAULT_IS_CONNECTION_QUERY_SERVICE_METRICS_ENABLED)
+ .setIfUnset(CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_ENABLED,
+ DEFAULT_IS_CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_ENABLED)
.setIfUnset(SKIP_SYSTEM_TABLES_EXISTENCE_CHECK,
DEFAULT_SKIP_SYSTEM_TABLES_EXISTENCE_CHECK)
.setIfUnset(MAX_IN_LIST_SKIP_SCAN_SIZE, DEFAULT_MAX_IN_LIST_SKIP_SCAN_SIZE)
@@ -754,6 +774,29 @@ public class QueryServicesOptions {
return config.getBoolean(METRIC_PUBLISHER_ENABLED, DEFAULT_IS_METRIC_PUBLISHER_ENABLED);
}
+ /**
+ * Looks up CONNECTION_QUERY_SERVICE_METRICS_ENABLED in the configuration, passing
+ * DEFAULT_IS_CONNECTION_QUERY_SERVICE_METRICS_ENABLED as the fallback value.
+ */
+ public boolean isConnectionQueryServiceMetricsEnabled() {
+ return config.getBoolean(CONNECTION_QUERY_SERVICE_METRICS_ENABLED,
+ DEFAULT_IS_CONNECTION_QUERY_SERVICE_METRICS_ENABLED);
+ }
+
+ /**
+ * Looks up CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_ENABLED in the configuration, passing
+ * DEFAULT_IS_CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_ENABLED as the fallback value.
+ */
+ public boolean isConnectionQueryServiceMetricsPublisherEnabled() {
+ return config.getBoolean(CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_ENABLED,
+ DEFAULT_IS_CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_ENABLED);
+ }
+
+ /**
+ * Looks up QUERY_SERVICES_NAME in the configuration, passing DEFAULT_QUERY_SERVICES_NAME
+ * as the fallback value.
+ */
+ public String getQueryServicesName() {
+ return config.get(QUERY_SERVICES_NAME, DEFAULT_QUERY_SERVICES_NAME);
+ }
+
+ /**
+ * Sets CONNECTION_QUERY_SERVICE_METRICS_ENABLED to true. This method takes no argument,
+ * so it can only switch the flag on.
+ */
+ public void setConnectionQueryServiceMetricsEnabled() {
+ set(CONNECTION_QUERY_SERVICE_METRICS_ENABLED, true);
+ }
+
+ /**
+ * Looks up CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_CLASSNAME in the configuration.
+ * NOTE(review): the fallback is DEFAULT_METRIC_PUBLISHER_CLASS_NAME, which is not one of the
+ * connection-query-service defaults added alongside this method - confirm this is intended.
+ */
+ public String getConnectionQueryServiceMetricsPublisherClass() {
+ return config.get(CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_CLASSNAME,
+ DEFAULT_METRIC_PUBLISHER_CLASS_NAME);
+ }
+
@VisibleForTesting
public void setAllowedListForTableLevelMetrics(String tableNameList){
set(ALLOWED_LIST_FOR_TABLE_LEVEL_METRICS,tableNameList);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index d4448a786c..f92a92c02b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -67,12 +67,14 @@ import org.apache.phoenix.jdbc.PhoenixMonitoredConnection;
import org.apache.phoenix.jdbc.PhoenixMonitoredResultSet;
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric;
import org.apache.phoenix.monitoring.GlobalClientMetrics;
import org.apache.phoenix.monitoring.GlobalMetric;
import org.apache.phoenix.monitoring.HistogramDistribution;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.monitoring.PhoenixTableMetric;
import org.apache.phoenix.monitoring.TableMetricsManager;
+import org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesMetricsManager;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.AmbiguousColumnException;
@@ -1465,6 +1467,22 @@ public class PhoenixRuntime {
return TableMetricsManager.getSizeHistogramsForAllTables();
}
+ /**
+ * Returns the result of
+ * ConnectionQueryServicesMetricsManager.getHistogramsForAllConnectionQueryServices().
+ * Presumably the map is keyed by connection query service name - TODO confirm.
+ */
+ public static Map<String, List<HistogramDistribution>> getAllConnectionQueryServicesHistograms() {
+ return ConnectionQueryServicesMetricsManager.getHistogramsForAllConnectionQueryServices();
+ }
+
+ /**
+ * Returns the result of
+ * ConnectionQueryServicesMetricsManager.getAllConnectionQueryServicesMetrics().
+ * Presumably the map is keyed by connection query service name - TODO confirm.
+ */
+ public static Map<String, List<ConnectionQueryServicesMetric>> getAllConnectionQueryServicesCounters() {
+ return ConnectionQueryServicesMetricsManager.getAllConnectionQueryServicesMetrics();
+ }
+
+ /**
+ * Resets the connection query services metrics data by calling
+ * ConnectionQueryServicesMetricsManager.clearAllConnectionQueryServiceMetrics().
+ * Intended for use in test cases only.
+ */
+ @VisibleForTesting
+ public static void clearAllConnectionQueryServiceMetrics() {
+ ConnectionQueryServicesMetricsManager.clearAllConnectionQueryServiceMetrics();
+ }
+
/**
* This is only used in testcases to reset the tableLevel Metrics data
*/
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesHistogramTest.java b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesHistogramTest.java
new file mode 100644
index 0000000000..5b3b107df7
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesHistogramTest.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring.connectionqueryservice;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.query.QueryServices;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Unit tests for the bucket ranges and bucket counts of {@link ConnectionQueryServicesHistogram}.
+ */
+public class ConnectionQueryServicesHistogramTest {
+
+    /**
+     * Ranges supplied through
+     * {@link QueryServices#CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES} must be the ranges the
+     * histogram reports.
+     */
+    @Test
+    public void testConnectionQueryServiceHistogramRangeOverride() {
+        String histoName = "PhoenixInternalOpenConn";
+        Configuration conf = new Configuration();
+        conf.set(QueryServices.CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES, "2, 5, 8");
+        ConnectionQueryServicesHistogram histogram = new ConnectionQueryServicesHistogram(histoName,
+                "histogram for Number of open internal phoenix connections", conf);
+        Assert.assertEquals(histoName, histogram.getName());
+        long[] ranges = histogram.getRanges();
+        Assert.assertNotNull(ranges);
+        long[] expectRanges = {2, 5, 8};
+        Assert.assertArrayEquals(expectRanges, ranges);
+    }
+
+    /**
+     * With no range override configured, the histogram must report the default ranges and place
+     * each sample value in the expected bucket.
+     */
+    @Test
+    public void testEveryRangeInDefaultRange() {
+        Configuration conf = new Configuration();
+        String histoName = "PhoenixInternalOpenConn";
+        conf.unset(QueryServices.CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES);
+        ConnectionQueryServicesHistogram histogram = new ConnectionQueryServicesHistogram(histoName,
+                "histogram for Number of open internal phoenix connections", conf);
+        Assert.assertEquals(histoName, histogram.getName());
+        // Compare array contents: assertEquals on arrays would only compare object identity.
+        Assert.assertArrayEquals(ConnectionQueryServicesHistogram.DEFAULT_RANGE,
+                histogram.getRanges());
+
+        // Sample values recorded: 1, 3, 7, 9, 15, 30, 120, 600
+        histogram.add(1);
+        histogram.add(3);
+        histogram.add(7);
+        histogram.add(9);
+        histogram.add(15);
+        histogram.add(30);
+        histogram.add(120);
+        histogram.add(600);
+
+        Map<String, Long> distribution =
+                histogram.getRangeHistogramDistribution().getRangeDistributionMap();
+        Map<String, Long> expectedMap = new HashMap<>();
+        expectedMap.put("0,1", 1L);        // 1
+        expectedMap.put("1,10", 3L);       // 3, 7, 9
+        expectedMap.put("10,100", 2L);     // 15, 30
+        expectedMap.put("100,500", 1L);    // 120
+        expectedMap.put("500,1000", 1L);   // 600
+        Assert.assertEquals(expectedMap, distribution);
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsHistogramsTest.java b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsHistogramsTest.java
new file mode 100644
index 0000000000..f9fa36375d
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsHistogramsTest.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring.connectionqueryservice;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit test for the construction of {@link ConnectionQueryServicesMetricsHistograms}.
+ */
+public class ConnectionQueryServicesMetricsHistogramsTest {
+
+    /**
+     * A freshly built instance must report the name it was given, expose both the open and the
+     * internal-open connection histograms, and list exactly two histogram distributions.
+     */
+    @Test
+    public void testConnectionQueryServiceMetricsHistograms() {
+        final String serviceName = "USE_CASE_1";
+        final Configuration configuration = new Configuration();
+        final ConnectionQueryServicesMetricsHistograms histograms =
+                new ConnectionQueryServicesMetricsHistograms(serviceName, configuration);
+
+        Assert.assertEquals(serviceName, histograms.getConnectionQueryServicesName());
+        Assert.assertNotNull(histograms.getConnectionQueryServicesOpenConnHisto());
+        Assert.assertNotNull(histograms.getConnectionQueryServicesInternalOpenConnHisto());
+
+        final int distributionCount =
+                histograms.getConnectionQueryServicesHistogramsDistribution().size();
+        Assert.assertEquals(2, distributionCount);
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsManagerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsManagerTest.java
new file mode 100644
index 0000000000..b2073433c4
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsManagerTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring.connectionqueryservice;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
+import static org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesNameMetricsTest.connectionQueryServiceNames;
+import static org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesNameMetricsTest.openInternalPhoenixConnCounter;
+import static org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesNameMetricsTest.openPhoenixConnCounter;
+import static org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesNameMetricsTest.phoenixConnThrottledCounter;
+import static org.junit.Assert.assertTrue;
+
+public class ConnectionQueryServicesMetricsManagerTest {
+ public boolean verifyMetricsReset(){
+ Map<String, List<ConnectionQueryServicesMetric>> map =
+ ConnectionQueryServicesMetricsManager.getAllConnectionQueryServicesMetrics();
+ return map != null && map.isEmpty();
+ }
+
+ public boolean verifyConnectionQueryServiceNamesExists(String connectionQueryServiceName){
+ Map<String,List<ConnectionQueryServicesMetric>>map =
+ ConnectionQueryServicesMetricsManager.getAllConnectionQueryServicesMetrics();
+ return map != null && map.containsKey(connectionQueryServiceName);
+ }
+
+ @Test
+ public void testConnectionQueryServiceMetricsForUpdateMetricsMethod() {
+
+ QueryServicesOptions options = QueryServicesOptions.withDefaults();
+ options.setConnectionQueryServiceMetricsEnabled();
+ ConnectionQueryServicesMetricsManager connectionQueryServicesMetricsManager =
+ new ConnectionQueryServicesMetricsManager(options);
+ ConnectionQueryServicesMetricsManager.setInstance(connectionQueryServicesMetricsManager);
+
+ ConnectionQueryServicesNameMetricsTest
+ testData = new ConnectionQueryServicesNameMetricsTest();
+ testData.populateMetrics();
+ for(int i = 0; i < connectionQueryServiceNames.length; i++) {
+ ConnectionQueryServicesMetricsManager.updateMetrics(connectionQueryServiceNames[i],
+ OPEN_PHOENIX_CONNECTIONS_COUNTER, openPhoenixConnCounter[i]);
+ ConnectionQueryServicesMetricsManager.updateMetrics(connectionQueryServiceNames[i],
+ OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER, openInternalPhoenixConnCounter[i]);
+ ConnectionQueryServicesMetricsManager.updateMetrics(connectionQueryServiceNames[i],
+ PHOENIX_CONNECTIONS_THROTTLED_COUNTER, phoenixConnThrottledCounter[i]);
+ }
+ testData.verfiyCountOfConnectionQueryServices(connectionQueryServiceNames.length);
+ ConnectionQueryServicesMetricsManager.clearAllConnectionQueryServiceMetrics();
+ assertTrue(verifyMetricsReset());
+ }
+
+ @Test
+ public void testHistogramMetricsForOpenPhoenixConnectionCounter() {
+ String connectionQueryServiceName = "USE_CASE_1";
+ Configuration conf = new Configuration();
+ conf.set(QueryServices.CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES, "3, 6, 9");
+
+ QueryServicesOptions mockOptions = Mockito.mock(QueryServicesOptions.class);
+ Mockito.doReturn(true).when(mockOptions)
+ .isConnectionQueryServiceMetricsEnabled();
+ Mockito.doReturn(conf).when(mockOptions).getConfiguration();
+ ConnectionQueryServicesMetricsManager connectionQueryServicesMetricsManager =
+ new ConnectionQueryServicesMetricsManager(mockOptions);
+ ConnectionQueryServicesMetricsManager.setInstance(connectionQueryServicesMetricsManager);
+ for (int i=0; i<9; i++) {
+ updateMetricsAndHistogram(i+1, connectionQueryServiceName);
+ }
+
+
+ // Generate distribution map from histogram snapshots.
+ ConnectionQueryServicesHistogram connectionQueryServicesHistogram =
+ ConnectionQueryServicesMetricsManager.getConnectionQueryServiceOpenConnectionHistogram(connectionQueryServiceName);
+
+ Map<String, Long> openPhoenixConnMap = connectionQueryServicesHistogram.getRangeHistogramDistribution().getRangeDistributionMap();
+ for (Long count: openPhoenixConnMap.values()) {
+ Assert.assertEquals(new Long(3), count);
+ }
+ }
+
+ private void updateMetricsAndHistogram (long counter, String connectionQueryServiceName) {
+ ConnectionQueryServicesMetricsManager.updateMetrics(connectionQueryServiceName,
+ OPEN_PHOENIX_CONNECTIONS_COUNTER, counter);
+ ConnectionQueryServicesMetricsManager.updateConnectionQueryServiceOpenConnectionHistogram(counter,
+ connectionQueryServiceName);
+ }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsTest.java b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsTest.java
new file mode 100644
index 0000000000..d05a6824b3
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring.connectionqueryservice;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric;
+import org.apache.phoenix.monitoring.MetricType;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+import static org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesNameMetricsTest.openInternalPhoenixConnCounter;
+import static org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesNameMetricsTest.openPhoenixConnCounter;
+import static org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesNameMetricsTest.connectionQueryServiceNames;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link ConnectionQueryServicesMetrics}: the name each instance reports and the
+ * counter values it stores.
+ */
+public class ConnectionQueryServicesMetricsTest {
+    // Shared by the tests below; entries are keyed by connection query service name.
+    static Map<String, ConnectionQueryServicesMetrics> phoenixConnectionQueryServiceSet =
+            new HashMap<>();
+
+    /**
+     * @return true when the shared map is non-empty and holds, for every test service name, an
+     *         instance that reports that same name; false otherwise (including a missing entry)
+     */
+    public boolean verifyConnectionQueryServiceName() {
+        if (phoenixConnectionQueryServiceSet.isEmpty()) {
+            return false;
+        }
+        for (String connectionQueryServiceName : connectionQueryServiceNames) {
+            ConnectionQueryServicesMetrics instance =
+                    phoenixConnectionQueryServiceSet.get(connectionQueryServiceName);
+            // A missing entry is a verification failure, not a NullPointerException.
+            if (instance == null
+                    || !instance.getConnectionQueryServiceName()
+                            .equals(connectionQueryServiceName)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Asserts that every test service name has an instance in the shared map whose open and
+     * internal-open connection counters match the expected test data.
+     */
+    public void verifyMetricsFromPhoenixConnectionQueryServiceMetrics() {
+        assertFalse(phoenixConnectionQueryServiceSet.isEmpty());
+        for (int i = 0; i < connectionQueryServiceNames.length; i++) {
+            ConnectionQueryServicesMetrics instance =
+                    phoenixConnectionQueryServiceSet.get(connectionQueryServiceNames[i]);
+            assertTrue(instance != null);
+            // Expected value goes first so that a failure message reads correctly.
+            assertEquals(connectionQueryServiceNames[i], instance.getConnectionQueryServiceName());
+            List<ConnectionQueryServicesMetric> metricList = instance.getAllMetrics();
+            for (ConnectionQueryServicesMetric metric : metricList) {
+                if (metric.getMetricType()
+                        .equals(MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER)) {
+                    assertEquals(openInternalPhoenixConnCounter[i], metric.getValue());
+                }
+                if (metric.getMetricType().equals(MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER)) {
+                    assertEquals(openPhoenixConnCounter[i], metric.getValue());
+                }
+            }
+        }
+    }
+
+    /**
+     * Each instance must report the connection query service name it was constructed with.
+     */
+    @Test
+    public void testPhoenixConnectionQueryServiceMetricsForPhoenixConnectionQueryServiceName() {
+        Configuration conf = new Configuration();
+        for (int i = 0; i < connectionQueryServiceNames.length; i++) {
+            ConnectionQueryServicesMetrics instance =
+                    new ConnectionQueryServicesMetrics(connectionQueryServiceNames[i], conf);
+            phoenixConnectionQueryServiceSet.put(connectionQueryServiceNames[i], instance);
+        }
+        assertTrue(verifyConnectionQueryServiceName());
+    }
+
+    /**
+     * Values written with setMetricValue() must be readable back through getAllMetrics().
+     */
+    @Test
+    public void testPhoenixConnectionQueryServiceMetrics() {
+        Configuration conf = new Configuration();
+        for (int i = 0; i < connectionQueryServiceNames.length; i++) {
+            ConnectionQueryServicesMetrics instance =
+                    new ConnectionQueryServicesMetrics(connectionQueryServiceNames[i], conf);
+            phoenixConnectionQueryServiceSet.put(connectionQueryServiceNames[i], instance);
+
+            instance.setMetricValue(MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER,
+                    openInternalPhoenixConnCounter[i]);
+            instance.setMetricValue(
+                    MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER, openPhoenixConnCounter[i]);
+        }
+        verifyMetricsFromPhoenixConnectionQueryServiceMetrics();
+        phoenixConnectionQueryServiceSet.clear();
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesNameMetricsTest.java b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesNameMetricsTest.java
new file mode 100644
index 0000000000..6a1d26e05f
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesNameMetricsTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring.connectionqueryservice;
+
+import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric;
+import org.apache.phoenix.monitoring.MetricType;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This class is used primarily to populate data and
+ * verification methods
+ */
+
+public class ConnectionQueryServicesNameMetricsTest {
+
+ public static final String[] connectionQueryServiceNames =
+ { "USE_CASE_1", "USE_CASE_2", "USE_CASE_3" };
+ public static Map<String, Map<MetricType, Long>>[] connectionQueryServiceNameMetricMap =
+ new Map[connectionQueryServiceNames.length];
+ public static final long[] openPhoenixConnCounter = { 1, 1, 1 };
+ public static final long[] openInternalPhoenixConnCounter = { 1, 1, 1 };
+ public static final long[] phoenixConnThrottledCounter = { 1, 2, 3 };
+
+
+ public void populateMetrics() {
+ for (int i = 0; i < connectionQueryServiceNameMetricMap.length; i++) {
+ connectionQueryServiceNameMetricMap[i] = new HashMap<>();
+ }
+ for (int i = 0; i < connectionQueryServiceNames.length; i++) {
+ Map<MetricType, Long> metrics = new HashMap<>();
+ metrics.put(OPEN_PHOENIX_CONNECTIONS_COUNTER, openPhoenixConnCounter[i]);
+ metrics.put(
+ OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER, openInternalPhoenixConnCounter[i]);
+ metrics.put(PHOENIX_CONNECTIONS_THROTTLED_COUNTER, phoenixConnThrottledCounter[i]);
+
+ connectionQueryServiceNameMetricMap[i].put(connectionQueryServiceNames[i], metrics);
+ }
+ }
+
+ public void verfiyCountOfConnectionQueryServices(int noOfConnectionQueryServiceName) {
+ Map<String, List<ConnectionQueryServicesMetric>> map =
+ ConnectionQueryServicesMetricsManager.getAllConnectionQueryServicesMetrics();
+ assertFalse(map == null || map.isEmpty());
+ for (int i = 0; i < noOfConnectionQueryServiceName; i++) {
+ assertTrue(map.containsKey(connectionQueryServiceNames[i]));
+ List<ConnectionQueryServicesMetric> connectionQueryServiceNameMetric =
+ map.get(connectionQueryServiceNames[i]);
+ for (ConnectionQueryServicesMetric metric : connectionQueryServiceNameMetric) {
+ if (metric.getMetricType().equals(OPEN_PHOENIX_CONNECTIONS_COUNTER)) {
+ assertEquals(openPhoenixConnCounter[i], metric.getValue());
+ }
+ if (metric.getMetricType().equals(OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER)) {
+ assertEquals(openInternalPhoenixConnCounter[i], metric.getValue());
+ }
+ if (metric.getMetricType().equals(PHOENIX_CONNECTIONS_THROTTLED_COUNTER)) {
+ assertEquals(phoenixConnThrottledCounter[i], metric.getValue());
+ }
+ }
+ }
+ }
+
+}