You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by mi...@apache.org on 2023/11/09 02:18:03 UTC

(phoenix) branch master updated: PHOENIX-7038 : Implement Connection Query Service Metrics (#1682)

This is an automated email from the ASF dual-hosted git repository.

mihir6692 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d5aaca213 PHOENIX-7038 : Implement Connection Query Service Metrics (#1682)
5d5aaca213 is described below

commit 5d5aaca21397f91b9f3fa682cd06da7798d9fc12
Author: Monani Mihir <mo...@gmail.com>
AuthorDate: Wed Nov 8 18:17:58 2023 -0800

    PHOENIX-7038 : Implement Connection Query Service Metrics (#1682)
    
    * PHOENIX-7038 Implement Connection Query Service Metrics
    
    * build fix
    
    * Get connection count values from ConnectionLimiter.java
    
    * CheckStyle length fixes
    
    * Checkstyle fixed
    
    * NoOpConnectionQueryServicesMetricsManager when it's disabled
    
    * Test fix
    
    * Spotbugs and checkstyle fixes
    
    * Update PhoenixRuntime.java
    
    ---------
    
    Co-authored-by: Mihir Monani <mi...@apache.org>
---
 .../end2end/RebuildIndexConnectionPropsIT.java     |   4 +-
 .../ConnectionQueryServicesMetricsIT.java          | 366 +++++++++++++++++++++
 .../org/apache/phoenix/jdbc/PhoenixConnection.java |  53 ++-
 .../apache/phoenix/log/BaseConnectionLimiter.java  |  15 +-
 .../org/apache/phoenix/log/ConnectionLimiter.java  |   4 +
 .../apache/phoenix/monitoring/AtomicMetric.java    |  10 +
 .../phoenix/monitoring/CombinableMetric.java       |   8 +
 .../phoenix/monitoring/CombinableMetricImpl.java   |  10 +
 .../ConnectionQueryServicesMetric.java}            |  29 +-
 ...java => ConnectionQueryServicesMetricImpl.java} |  25 +-
 .../phoenix/monitoring/GlobalMetricImpl.java       |  10 +
 .../java/org/apache/phoenix/monitoring/Metric.java |   4 +
 .../phoenix/monitoring/NoOpGlobalMetricImpl.java   |  10 +
 .../apache/phoenix/monitoring/NonAtomicMetric.java |  10 +
 .../phoenix/monitoring/PhoenixTableMetricImpl.java |  10 +
 .../ConnectionQueryServicesHistogram.java          |  43 +++
 .../ConnectionQueryServicesMetrics.java            | 120 +++++++
 .../ConnectionQueryServicesMetricsHistograms.java  |  71 ++++
 .../ConnectionQueryServicesMetricsManager.java     | 343 +++++++++++++++++++
 .../NoOpConnectionQueryServicesMetricsManager.java |  62 ++++
 .../phoenix/query/ConnectionQueryServices.java     |   1 +
 .../phoenix/query/ConnectionQueryServicesImpl.java |  15 +-
 .../query/ConnectionlessQueryServicesImpl.java     |   5 +
 .../query/DelegateConnectionQueryServices.java     |   5 +
 .../org/apache/phoenix/query/QueryServices.java    |  12 +
 .../apache/phoenix/query/QueryServicesOptions.java |  43 +++
 .../org/apache/phoenix/util/PhoenixRuntime.java    |  18 +
 .../ConnectionQueryServicesHistogramTest.java      |  73 ++++
 ...nnectionQueryServicesMetricsHistogramsTest.java |  37 +++
 .../ConnectionQueryServicesMetricsManagerTest.java | 112 +++++++
 .../ConnectionQueryServicesMetricsTest.java        | 106 ++++++
 .../ConnectionQueryServicesNameMetricsTest.java    |  87 +++++
 32 files changed, 1685 insertions(+), 36 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RebuildIndexConnectionPropsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RebuildIndexConnectionPropsIT.java
index 75ebfa8c35..a7000b69ce 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RebuildIndexConnectionPropsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RebuildIndexConnectionPropsIT.java
@@ -107,8 +107,8 @@ public class RebuildIndexConnectionPropsIT extends BaseTest {
                     rebuildQueryServicesConfig.get(HConstants.HBASE_CLIENT_RETRIES_NUMBER));
                 ConnectionQueryServices rebuildQueryServices = rebuildIndexConnection.getQueryServices();
                 Connection rebuildIndexHConnection =
-                        (Connection) Whitebox.getInternalState(rebuildQueryServices,
-                            "connection");
+                        (Connection) Whitebox.getInternalState(Whitebox.getInternalState(rebuildQueryServices,
+                        "parent"), "connection");
                 Connection regularHConnection =
                         (Connection) Whitebox.getInternalState(
                             regularConnection.getQueryServices(), "connection");
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsIT.java
new file mode 100644
index 0000000000..f7d065ec93
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsIT.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring.connectionqueryservice;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric;
+import org.apache.phoenix.monitoring.HistogramDistribution;
+import org.apache.phoenix.monitoring.Metric;
+import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConfigurationFactory;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.InstanceResolver;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
+import static org.apache.phoenix.query.QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS;
+import static org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_METRICS_ENABLED;
+import static org.apache.phoenix.query.QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS;
+import static org.apache.phoenix.query.QueryServices.QUERY_SERVICES_NAME;
+import static org.apache.phoenix.util.PhoenixRuntime.clearAllConnectionQueryServiceMetrics;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class ConnectionQueryServicesMetricsIT extends BaseTest {
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(ConnectionQueryServicesMetricsIT.class);
+    private AtomicInteger counter = new AtomicInteger();
+    private static HBaseTestingUtility hbaseTestUtil;
+    private String tableName;
+    private static final String CONN_QUERY_SERVICE_1 = "CONN_QUERY_SERVICE_1";
+    private static final String
+            CONN_QUERY_SERVICE_CHECK_CONN_THROTTLE = "CONN_QUERY_SERVICE_CHECK_CONN_THROTTLE";
+    private static final String CONN_QUERY_SERVICE_2 = "CONN_QUERY_SERVICE_2";
+    private static final String CONN_QUERY_SERVICE_NULL = null;
+    private enum CompareOp {
+        LT, EQ, GT, LTEQ, GTEQ
+    }
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        InstanceResolver.clearSingletons();
+        // Override to get required config for static fields loaded that require HBase config
+        InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() {
+
+            @Override public Configuration getConfiguration() {
+                Configuration conf = HBaseConfiguration.create();
+                conf.set(CONNECTION_QUERY_SERVICE_METRICS_ENABLED, String.valueOf(true));
+                // Without this config, unlimited connections are allowed from client and connection
+                // counter won't be increased at all. So we need to set max allowed connection count
+                conf.set(CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS, "2");
+                conf.set(INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS, "1");
+                return conf;
+            }
+
+            @Override public Configuration getConfiguration(Configuration confToClone) {
+                Configuration conf = HBaseConfiguration.create();
+                conf.set(CONNECTION_QUERY_SERVICE_METRICS_ENABLED, String.valueOf(true));
+                // Without this config, unlimited connections are allowed from client and connection
+                // counter won't be increased at all. So we need to set max allowed connection count
+                conf.set(CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS, "2");
+                conf.set(INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS, "1");
+                Configuration copy = new Configuration(conf);
+                copy.addResource(confToClone);
+                return copy;
+            }
+        });
+        Configuration conf = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+        hbaseTestUtil = new HBaseTestingUtility(conf);
+        setUpConfigForMiniCluster(conf);
+        conf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
+                QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+        hbaseTestUtil.startMiniCluster();
+        // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver
+        String zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
+        url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
+        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+    }
+
+    @AfterClass
+    public static void tearDownMiniCluster() {
+        try {
+            if (hbaseTestUtil != null) {
+                hbaseTestUtil.shutdownMiniCluster();
+            }
+        } catch (Exception e) {
+            // ignore
+        }
+    }
+
+    @Before
+    public void resetTableLevelMetrics() {
+        clearAllConnectionQueryServiceMetrics();
+        tableName = generateUniqueName();
+    }
+
+    @After
+    public void cleanUp() {
+        clearAllConnectionQueryServiceMetrics();
+    }
+
+    private String connUrlWithPrincipal(String principalName) {
+        return url + (principalName == null ? "" : PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + principalName);
+    }
+
+    @Test
+    public void testMultipleCQSIMetricsInParallel() throws Exception {
+        Thread csqi1 = new Thread(() -> {
+            try {
+                checkConnectionQueryServiceMetricsValues(CONN_QUERY_SERVICE_1);
+                // Increment counter for successful check
+                counter.incrementAndGet();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+        Thread csqi2 = new Thread(() -> {
+            try {
+                // We have set limit of 2 for phoenix connection counter in doSetup() function.
+                // For this one, we will create more than 2 connections and
+                // test that Connection Throttle Count Metric is also working as expected.
+                checkConnectionQueryServiceMetricsValues(CONN_QUERY_SERVICE_CHECK_CONN_THROTTLE);
+            } catch (Exception e) {
+                e.printStackTrace();
+                if(!e.getMessage().equals("This should not be thrown for "
+                        + CONN_QUERY_SERVICE_CHECK_CONN_THROTTLE)) {
+                    // Increment counter for successful check. For this Connection Query Service,
+                    // code will throw error since it will try to create more than 2 connections.
+                    // So we would count exception as success here and increment the counter.
+                    counter.incrementAndGet();
+                }
+            }
+        });
+        Thread csqi3 = new Thread(() -> {
+            try {
+                checkConnectionQueryServiceMetricsValues(CONN_QUERY_SERVICE_2);
+                // Increment counter for successful check
+                counter.incrementAndGet();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+        Thread csqi4 = new Thread(() -> {
+            try {
+                // Test default CQS name
+                checkConnectionQueryServiceMetricsValues(CONN_QUERY_SERVICE_NULL);
+                // Increment counter for successful check
+                counter.incrementAndGet();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+
+        // Start Single Query Service Test
+        csqi1.start();
+        csqi1.join();
+
+        // Start 3 Query Service Test in parallel
+        csqi2.start();
+        csqi3.start();
+        csqi4.start();
+        csqi2.join();
+        csqi3.join();
+        csqi4.join();
+
+        // Check If all CSQI Metric check passed or not
+        assertEquals("Number of passing CSQI Metrics check should be : ",4, counter.get());
+    }
+
+    private void checkConnectionQueryServiceMetricsValues(
+            String queryServiceName) throws Exception {
+        String CREATE_TABLE_DDL = "CREATE TABLE IF NOT EXISTS %s (K VARCHAR(10) NOT NULL"
+                + " PRIMARY KEY, V VARCHAR)";
+        String princURL = connUrlWithPrincipal(queryServiceName);
+        LOGGER.info("Connection Query Service : " + queryServiceName + " URL : " + princURL);
+
+        String connQueryServiceName;
+        try (Connection conn = DriverManager.getConnection(princURL);
+             Statement stmt = conn.createStatement()) {
+            connQueryServiceName = conn.unwrap(PhoenixConnection.class).getQueryServices()
+                    .getConfiguration().get(QUERY_SERVICES_NAME);
+            // When queryServiceName is passed as null, Phoenix will change query service name
+            // to DEFAULT_CQSN. That's why we are re-assigning the query service name here to check
+            // metric in finally block.
+            queryServiceName = connQueryServiceName;
+            stmt.execute(String.format(CREATE_TABLE_DDL, tableName + "_" + connQueryServiceName));
+            if (connQueryServiceName.equals(CONN_QUERY_SERVICE_CHECK_CONN_THROTTLE)) {
+                try(Connection conn1 = DriverManager.getConnection(princURL)) {
+                    assertMetricValues(connQueryServiceName, 2, 0, 0);
+                    assertHistogramMetricsForMutations(connQueryServiceName, 2, 0, 0,
+                            0);
+                    try(Connection conn2 = DriverManager.getConnection(princURL)) {
+                        // This should never execute in this test.
+                        throw new RuntimeException("This should not be thrown for "
+                                + CONN_QUERY_SERVICE_CHECK_CONN_THROTTLE);
+                    }
+                }
+            } else {
+                // We only create one connection so,
+                // Open Connection Count : 1
+                // Open Internal Connection Count : 0
+                // Connection Throttled Count : 0
+                assertMetricValues(connQueryServiceName, 1, 0, 0);
+                assertHistogramMetricsForMutations(connQueryServiceName, 1, 0, 0, 0);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw e;
+        } finally {
+            if (queryServiceName.equals(CONN_QUERY_SERVICE_CHECK_CONN_THROTTLE)) {
+                // We have closed the connection in try-resource-catch block so,
+                // Open Connection Count : 0
+                // Connection Throttled Count : 1
+                // Open Internal Connection Count : 0
+                assertMetricValues(queryServiceName, 0, 1, 0);
+                // In histogram, we will still have max open connection count as 2
+                // while rest of the values will be 0.
+                assertHistogramMetricsForMutations(queryServiceName, 2, 0, 0, 0);
+            } else {
+                // We have closed the connection in try-resource-catch block so,
+                // Open Connection Count : 0
+                // Connection Throttled Count : 0
+                // Open Internal Connection Count : 0
+                assertMetricValues(queryServiceName, 0, 0, 0);
+                // In histogram, we will still have max open connection count as 1 while rest of the values will be 0.
+                assertHistogramMetricsForMutations(queryServiceName, 1, 0, 0, 0);
+            }
+        }
+    }
+
+    /**
+     * check min/max value in histogram
+     * @param queryServiceName Connection Query Service Name
+     * @param oMaxValue Max value of {@link MetricType#OPEN_PHOENIX_CONNECTIONS_COUNTER}
+     * @param oMinValue Min value of {@link MetricType#OPEN_PHOENIX_CONNECTIONS_COUNTER}
+     * @param ioMaxValue Max value of {@link MetricType#OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER}
+     * @param ioMinValue Min value of {@link MetricType#OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER}
+     */
+    private void assertHistogramMetricsForMutations(
+            String queryServiceName, int oMaxValue, int oMinValue, int ioMaxValue, int ioMinValue) {
+        Map<String, List<HistogramDistribution>> listOfHistoDistribution =
+                PhoenixRuntime.getAllConnectionQueryServicesHistograms();
+        for(HistogramDistribution histo : listOfHistoDistribution.get(queryServiceName)) {
+            assertHistogram(histo, "PhoenixInternalOpenConn", ioMaxValue, ioMinValue,
+                    CompareOp.EQ);
+            assertHistogram(histo, "PhoenixOpenConn", oMaxValue, oMinValue, CompareOp.EQ);
+        }
+    }
+
+    public void assertHistogram(HistogramDistribution histo, String histoName, long maxValue,
+            long minValue, CompareOp op) {
+        if (histo.getHistoName().equals(histoName)) {
+            switch (op) {
+                case EQ:
+                    assertEquals(maxValue, histo.getMax());
+                    assertEquals(minValue, histo.getMin());
+                    break;
+            }
+        }
+    }
+
+    /**
+     * check metric value for connection query service
+     * @param queryServiceName Connection Query Service Name
+     * @param o {@link MetricType#OPEN_PHOENIX_CONNECTIONS_COUNTER}
+     * @param ct {@link MetricType#PHOENIX_CONNECTIONS_THROTTLED_COUNTER}
+     * @param io {@link MetricType#OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER}
+     */
+    public void assertMetricValues(String queryServiceName, int o, int ct, int io) {
+        Map<String, List<ConnectionQueryServicesMetric>> listOfMetrics =
+                PhoenixRuntime.getAllConnectionQueryServicesCounters();
+        /*
+          There are 3 metrics which are tracked as part of Phoenix Connection Query Service Metrics.
+          Defined here : {@link ConnectionQueryServicesMetrics.QueryServiceMetrics}
+          OPEN_PHOENIX_CONNECTIONS_COUNTER
+          OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER
+          PHOENIX_CONNECTIONS_THROTTLED_COUNTER
+         */
+        assertEquals(3, listOfMetrics.get(queryServiceName).size());
+        for (ConnectionQueryServicesMetric metric : listOfMetrics.get(queryServiceName)) {
+            assertMetricValue(metric, OPEN_PHOENIX_CONNECTIONS_COUNTER, o, CompareOp.EQ);
+            assertMetricValue(metric, PHOENIX_CONNECTIONS_THROTTLED_COUNTER, ct, CompareOp.EQ);
+            assertMetricValue(metric, OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER, io, CompareOp.EQ);
+        }
+    }
+
+    /**
+     * Check if metrics collection is empty.
+     */
+    public void assertMetricListIsEmpty() {
+        Map<String, List<ConnectionQueryServicesMetric>> listOfMetrics =
+                PhoenixRuntime.getAllConnectionQueryServicesCounters();
+        assertTrue(listOfMetrics.isEmpty());
+    }
+
+    /**
+     * Checks that if the metric is of the passed in type, it has the expected value
+     * (based on the CompareOp). If the metric type is different than checkType, ignore
+     * @param m metric to check
+     * @param checkType type to check for
+     * @param compareValue value to compare against
+     * @param op CompareOp
+     */
+    private static void assertMetricValue(Metric m, MetricType checkType, long compareValue,
+                                          CompareOp op) {
+        if (m.getMetricType().equals(checkType)) {
+            switch (op) {
+                case EQ:
+                    assertEquals(compareValue, m.getValue());
+                    break;
+                case LT:
+                    assertTrue(m.getValue() < compareValue);
+                    break;
+                case LTEQ:
+                    assertTrue(m.getValue() <= compareValue);
+                    break;
+                case GT:
+                    assertTrue(m.getValue() > compareValue);
+                    break;
+                case GTEQ:
+                    assertTrue(m.getValue() >= compareValue);
+                    break;
+            }
+        }
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 493fdf5037..3c078d6ecc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -17,13 +17,13 @@
  */
 package org.apache.phoenix.jdbc;
 
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
+import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER;
+import static org.apache.phoenix.query.QueryServices.QUERY_SERVICES_NAME;
 import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkNotNull;
 import static java.util.Collections.emptyMap;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_PHOENIX_CONNECTIONS;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -88,6 +88,7 @@ import org.apache.phoenix.log.ConnectionActivityLogger;
 import org.apache.phoenix.log.LogLevel;
 import org.apache.phoenix.monitoring.MetricType;
 import org.apache.phoenix.monitoring.TableMetricsManager;
+import org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesMetricsManager;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.PSchema;
 import org.apache.phoenix.query.ConnectionQueryServices;
@@ -398,13 +399,29 @@ public class PhoenixConnection implements MetaDataMutated, SQLCloseable, Phoenix
 
             this.logSamplingRate = Double.parseDouble(this.services.getProps().get(QueryServices.LOG_SAMPLE_RATE,
                     QueryServicesOptions.DEFAULT_LOG_SAMPLE_RATE));
-            if (isInternalConnection) {
-                GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.increment();
-            } else {
-                GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment();
+        String connectionQueryServiceName =
+                this.services.getConfiguration().get(QUERY_SERVICES_NAME);
+        if (isInternalConnection) {
+            GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.increment();
+            long currentInternalConnectionCount =
+                    this.getQueryServices().getConnectionCount(isInternalConnection);
+            ConnectionQueryServicesMetricsManager.updateMetrics(connectionQueryServiceName,
+                    OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER, currentInternalConnectionCount);
+            ConnectionQueryServicesMetricsManager
+                .updateConnectionQueryServiceOpenInternalConnectionHistogram(
+                        currentInternalConnectionCount, connectionQueryServiceName);
+        } else {
+            GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment();
+            long currentConnectionCount =
+                    this.getQueryServices().getConnectionCount(isInternalConnection);
+            ConnectionQueryServicesMetricsManager.updateMetrics(connectionQueryServiceName,
+                    OPEN_PHOENIX_CONNECTIONS_COUNTER, currentConnectionCount);
+            ConnectionQueryServicesMetricsManager
+                .updateConnectionQueryServiceOpenConnectionHistogram(currentConnectionCount,
+                        connectionQueryServiceName);
             }
-            this.sourceOfOperation =
-                    this.services.getProps().get(QueryServices.SOURCE_OPERATION_ATTRIB, null);
+        this.sourceOfOperation = this.services.getProps()
+            .get(QueryServices.SOURCE_OPERATION_ATTRIB, null);
     }
 
     private static void checkScn(Long scnParam) throws SQLException {
@@ -756,6 +773,8 @@ public class PhoenixConnection implements MetaDataMutated, SQLCloseable, Phoenix
             return;
         }
 
+        String connectionQueryServiceName =
+                this.services.getConfiguration().get(QUERY_SERVICES_NAME);
         try {
             isClosing = true;
             TableMetricsManager.pushMetricsFromConnInstanceMethod(getMutationMetrics());
@@ -781,10 +800,24 @@ public class PhoenixConnection implements MetaDataMutated, SQLCloseable, Phoenix
         } finally {
             isClosing = false;
             isClosed = true;
-            if(isInternalConnection()){
+            if (isInternalConnection()){
                 GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.decrement();
+                long currentInternalConnectionCount =
+                        this.getQueryServices().getConnectionCount(isInternalConnection());
+                ConnectionQueryServicesMetricsManager.updateMetrics(connectionQueryServiceName,
+                        OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER, currentInternalConnectionCount);
+                ConnectionQueryServicesMetricsManager
+                        .updateConnectionQueryServiceOpenInternalConnectionHistogram(
+                                currentInternalConnectionCount, connectionQueryServiceName);
             } else {
                 GLOBAL_OPEN_PHOENIX_CONNECTIONS.decrement();
+                long currentConnectionCount =
+                        this.getQueryServices().getConnectionCount(isInternalConnection());
+                ConnectionQueryServicesMetricsManager.updateMetrics(connectionQueryServiceName,
+                                OPEN_PHOENIX_CONNECTIONS_COUNTER, currentConnectionCount);
+                ConnectionQueryServicesMetricsManager
+                        .updateConnectionQueryServiceOpenConnectionHistogram(
+                                currentConnectionCount, connectionQueryServiceName);
             }
         }
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/BaseConnectionLimiter.java b/phoenix-core/src/main/java/org/apache/phoenix/log/BaseConnectionLimiter.java
index ad720c6155..ec07deb465 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/BaseConnectionLimiter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/BaseConnectionLimiter.java
@@ -21,6 +21,7 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesMetricsManager;
 import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
 import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
 import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -29,6 +30,8 @@ import javax.annotation.concurrent.GuardedBy;
 import java.sql.SQLException;
 
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
+import static org.apache.phoenix.query.QueryServices.QUERY_SERVICES_NAME;
 
 /**
  * A base class for concrete implementation of ConnectionLimiter.
@@ -39,6 +42,7 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_C
 public abstract class BaseConnectionLimiter implements ConnectionLimiter {
     protected int connectionCount = 0;
     protected int internalConnectionCount = 0;
+    protected int connectionThrottledCounter = 0;
     protected String profileName;
     protected int maxConnectionsAllowed;
     protected int maxInternalConnectionsAllowed;
@@ -65,8 +69,15 @@ public abstract class BaseConnectionLimiter implements ConnectionLimiter {
             // if throttling threshold is reached, try reclaiming garbage collected phoenix connections.
             if ((allowedConnections != 0) && (futureConnections > allowedConnections) && (onSweep(connection.isInternalConnection()) == 0)) {
                 GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.increment();
-
-                //TODO:- After PHOENIX-7038 per profile Phoenix Throttled Counter should be updated here.
+                connectionThrottledCounter++;
+                String connectionQueryServiceName = connection.getQueryServices()
+                        .getConfiguration().get(QUERY_SERVICES_NAME);
+                // Since this is ever-increasing counter and only gets reset at JVM restart
+                // Both global and connection query service level,
+                // we won't create histogram for this metric.
+                ConnectionQueryServicesMetricsManager.updateMetrics(
+                        connectionQueryServiceName,
+                        PHOENIX_CONNECTIONS_THROTTLED_COUNTER, connectionThrottledCounter);
 
                 // Let the concrete classes handle the onLimit.
                 // They can either throw the exception back or handle it.
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionLimiter.java b/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionLimiter.java
index 3bc6d465b2..6e9f98b9b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionLimiter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionLimiter.java
@@ -36,4 +36,8 @@ public interface ConnectionLimiter {
     boolean isLastConnection();
 
     boolean isShouldThrottleNumConnections();
+
+    int getConnectionCount();
+
+    int getInternalConnectionCount();
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java
index 728e734da0..3368bceae9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java
@@ -62,6 +62,16 @@ public class AtomicMetric implements Metric {
         value.set(0);
     }
 
+    /**
+     * Set the Metric value as current value
+     *
+     * @param value
+     */
+    @Override
+    public void set(long value) {
+        this.value.set(value);
+    }
+
     @Override
     public void decrement() {
         value.decrementAndGet();        
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java
index 07cd25dfcc..d1ed8ba4fd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java
@@ -58,6 +58,14 @@ public interface CombinableMetric extends Metric {
         @Override
         public void reset() {}
 
+        /**
+         * Set the Metric value as current value
+         *
+         * @param value
+         */
+        @Override
+        public void set(long value) {}
+
         @Override
         public String getPublishString() {
             return EMPTY_STRING;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java
index 40cb5166c3..c2a6e7ba8d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java
@@ -59,6 +59,16 @@ public class CombinableMetricImpl implements CombinableMetric, Cloneable {
         metric.reset();
     }
 
+    /**
+     * Set the Metric value as current value
+     *
+     * @param value
+     */
+    @Override
+    public void set(long value) {
+        metric.set(value);
+    }
+
     @Override
     public String getPublishString() {
         return getCurrentMetricState();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionLimiter.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ConnectionQueryServicesMetric.java
similarity index 54%
copy from phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionLimiter.java
copy to phoenix-core/src/main/java/org/apache/phoenix/monitoring/ConnectionQueryServicesMetric.java
index 3bc6d465b2..75ba7e0131 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionLimiter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ConnectionQueryServicesMetric.java
@@ -15,25 +15,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.phoenix.log;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-
-import java.sql.SQLException;
+package org.apache.phoenix.monitoring;
 
 /**
- * This interface defines the contract for storing information about Phoenix connections
- * for debugging client-side issues like
- * {@link org.apache.phoenix.exception.SQLExceptionCode#NEW_CONNECTION_THROTTLED}
+ * Class that exposes the various phoenix metrics collected at the Phoenix Query Service level.
+ * Because metrics are dynamic in nature, it is not guaranteed that the state exposed will always
+ * be in sync with each other. One should use these metrics primarily for monitoring and debugging
+ * purposes.
  */
-public interface ConnectionLimiter {
-
-    void acquireConnection(PhoenixConnection connection) throws SQLException;
-
-    void returnConnection(PhoenixConnection connection);
+public interface ConnectionQueryServicesMetric extends Metric {
 
-    int onSweep(boolean internal) ;
+    /**
+     * @return Number of samples collected since the last {@link #reset()} call.
+     */
+    long getNumberOfSamples();
 
-    boolean isLastConnection();
+    /**
+     * @return Sum of the values of the metric sampled since the last {@link #reset()} call.
+     */
+    long getTotalSum();
 
-    boolean isShouldThrottleNumConnections();
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixTableMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ConnectionQueryServicesMetricImpl.java
similarity index 78%
copy from phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixTableMetricImpl.java
copy to phoenix-core/src/main/java/org/apache/phoenix/monitoring/ConnectionQueryServicesMetricImpl.java
index 91bade1bd7..bbefbf34d8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixTableMetricImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ConnectionQueryServicesMetricImpl.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,27 +19,40 @@ package org.apache.phoenix.monitoring;
 
 import java.util.concurrent.atomic.AtomicLong;
 
-public class PhoenixTableMetricImpl implements PhoenixTableMetric {
+/**
+ * Metric class for Connection Query Services Metric.
+ */
+public class ConnectionQueryServicesMetricImpl implements ConnectionQueryServicesMetric {
 
     private AtomicLong numberOfSamples = new AtomicLong(0);
     private Metric metric;
 
     /**
-     * Default implementation used when TableLevel Metrics are enabled
+     * Default implementation used when Phoenix Connection Query Service Metrics are enabled
      */
-    public PhoenixTableMetricImpl(MetricType type) {
+    public ConnectionQueryServicesMetricImpl(MetricType type) {
         this.metric = new AtomicMetric(type);
     }
 
     /**
-     * Reset the internal state. Typically called after metric information has been collected and a new phase of
-     * collection is being requested for the next interval.
+     * Reset the internal state. Typically called after metric information has been
+     * collected and a new phase of collection is being requested for the next interval.
      */
     @Override public void reset() {
         metric.reset();
         numberOfSamples.set(0);
     }
 
+    /**
+     * Set the Metric value as current value
+     *
+     * @param value
+     */
+    @Override
+    public void set(long value) {
+        metric.set(value);
+    }
+
     @Override public long getNumberOfSamples() {
         return numberOfSamples.get();
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java
index 8c2128b2c3..ca19c580a2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java
@@ -33,6 +33,16 @@ public class GlobalMetricImpl implements GlobalMetric {
         numberOfSamples.set(0);
     }
 
+    /**
+     * Set the Metric value as current value
+     *
+     * @param value
+     */
+    @Override
+    public void set(long value) {
+        metric.set(value);
+    }
+
     @Override
     public long getNumberOfSamples() {
         return numberOfSamples.get();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
index 0e51fc02cb..1f3bdb858d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
@@ -60,5 +60,9 @@ public interface Metric {
      */
     public void reset();
 
+    /**
+     * Set the Metric value as current value
+     */
+    void set(long value);
 }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NoOpGlobalMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NoOpGlobalMetricImpl.java
index 2dfe941d2f..d03b27e405 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NoOpGlobalMetricImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NoOpGlobalMetricImpl.java
@@ -64,4 +64,14 @@ public class NoOpGlobalMetricImpl implements GlobalMetric {
     public void reset() {
 
     }
+
+    /**
+     * Set the Metric value as current value
+     *
+     * @param value
+     */
+    @Override
+    public void set(long value) {
+
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java
index 4e611c5674..77fe093a40 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java
@@ -63,6 +63,16 @@ class NonAtomicMetric implements Metric {
         value = 0;
     }
 
+    /**
+     * Set the Metric value as current value
+     *
+     * @param value
+     */
+    @Override
+    public void set(long value) {
+        this.value = value;
+    }
+
     @Override
     public void decrement() {
         value--;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixTableMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixTableMetricImpl.java
index 91bade1bd7..0d6ef7868b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixTableMetricImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixTableMetricImpl.java
@@ -40,6 +40,16 @@ public class PhoenixTableMetricImpl implements PhoenixTableMetric {
         numberOfSamples.set(0);
     }
 
+    /**
+     * Set the Metric value as current value
+     *
+     * @param value
+     */
+    @Override
+    public void set(long value) {
+        metric.set(value);
+    }
+
     @Override public long getNumberOfSamples() {
         return numberOfSamples.get();
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesHistogram.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesHistogram.java
new file mode 100644
index 0000000000..883be7b145
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesHistogram.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring.connectionqueryservice;
+
+import static org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.monitoring.RangeHistogram;
+import org.apache.phoenix.query.QueryServices;
+
+/**
+ * Histogram for calculating phoenix connection. We read ranges using
+ * config property {@link QueryServices#PHOENIX_HISTOGRAM_SIZE_RANGES}.
+ * If this property is not set then it will default to DEFAULT_RANGE values.
+ */
+public class ConnectionQueryServicesHistogram extends RangeHistogram {
+    static final long[] DEFAULT_RANGE = {1, 10, 100, 500, 1000};
+    public ConnectionQueryServicesHistogram(String name, String description, Configuration conf) {
+        super(initializeRanges(conf), name, description);
+    }
+
+    private static long[] initializeRanges(Configuration conf) {
+        long[] ranges = PhoenixConfigurationUtil.getLongs(
+                conf, CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES);
+        return ranges != null ? ranges : DEFAULT_RANGE;
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetrics.java
new file mode 100644
index 0000000000..b3a8a1c3c1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetrics.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring.connectionqueryservice;
+
+import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric;
+import org.apache.phoenix.monitoring.ConnectionQueryServicesMetricImpl;
+import org.apache.phoenix.monitoring.MetricType;
+
+/**
+ * Class for Connection Query Service Metrics.
+ */
+public class ConnectionQueryServicesMetrics {
+    /**
+     * List Metrics tracked in Connection Query Service Metrics
+     */
+    public enum QueryServiceMetrics {
+        CONNECTION_QUERY_SERVICE_OPEN_PHOENIX_CONNECTIONS_COUNTER(OPEN_PHOENIX_CONNECTIONS_COUNTER),
+        CONNECTION_QUERY_SERVICE_OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER(
+                OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER),
+        CONNECTION_QUERY_SERVICE_PHOENIX_CONNECTIONS_THROTTLED_COUNTER(
+                PHOENIX_CONNECTIONS_THROTTLED_COUNTER);
+
+        private MetricType metricType;
+        private ConnectionQueryServicesMetric metric;
+
+        QueryServiceMetrics(MetricType metricType) {
+            this.metricType = metricType;
+        }
+    }
+
+    private final String connectionQueryServiceName;
+    private Map<MetricType, ConnectionQueryServicesMetric> metricRegister;
+    private ConnectionQueryServicesMetricsHistograms connectionQueryServiceMetricsHistograms;
+
+    public ConnectionQueryServicesMetrics(final String connectionQueryServiceName,
+            Configuration conf) {
+        this.connectionQueryServiceName = connectionQueryServiceName;
+        metricRegister = new HashMap<>();
+        for (QueryServiceMetrics connectionQueryServiceMetric
+                : QueryServiceMetrics.values()) {
+            connectionQueryServiceMetric.metric =
+                    new ConnectionQueryServicesMetricImpl(connectionQueryServiceMetric.metricType);
+            metricRegister.put(connectionQueryServiceMetric.metricType,
+                    connectionQueryServiceMetric.metric);
+        }
+        connectionQueryServiceMetricsHistograms =
+                new ConnectionQueryServicesMetricsHistograms(connectionQueryServiceName, conf);
+    }
+
+    /**
+     * This function is used to update the value of Metric
+     * In case of counter val will be passed as 1.
+     *
+     * @param type metric type
+     * @param val update value. In case of counters, this will be 1
+     */
+    public void setMetricValue(MetricType type, long val) {
+        if (!metricRegister.containsKey(type)) {
+            return;
+        }
+        ConnectionQueryServicesMetric metric = metricRegister.get(type);
+        metric.set(val);
+    }
+
+    /**
+     * This function is used to get the value of Metric.
+     *
+     * @param type metric type
+     * @return val current value of metric.
+     */
+    public long getMetricValue(MetricType type) {
+        if (!metricRegister.containsKey(type)) {
+            return 0;
+        }
+        ConnectionQueryServicesMetric metric = metricRegister.get(type);
+        return metric.getValue();
+    }
+
+    public String getConnectionQueryServiceName() {
+        return connectionQueryServiceName;
+    }
+
+    /**
+     * This method is called to aggregate all the Metrics across all Connection Query Service.
+     *
+     * @return map of Connection Query Service name -> list of ConnectionQueryServicesMetric.
+     */
+    public List<ConnectionQueryServicesMetric> getAllMetrics() {
+        return new ArrayList<>(metricRegister.values());
+    }
+
+    public ConnectionQueryServicesMetricsHistograms getConnectionQueryServiceHistograms() {
+        return connectionQueryServiceMetricsHistograms;
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsHistograms.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsHistograms.java
new file mode 100644
index 0000000000..29e8a4b4f4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsHistograms.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring.connectionqueryservice;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import edu.umd.cs.findbugs.annotations.SuppressWarnings;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.monitoring.HistogramDistribution;
+
+/**
+ * Histogram Metrics for Connection Query Service Metrics.
+ * 1. Connection count
+ * 2. Internal Connection Count.
+ */
+public class ConnectionQueryServicesMetricsHistograms {
+    private String connectionQueryServicesName;
+    private ConnectionQueryServicesHistogram connectionQueryServiceOpenInternalSizeHistogram;
+    private ConnectionQueryServicesHistogram connectionQueryServicesOpenConnSizeHistogram;
+
+    public ConnectionQueryServicesMetricsHistograms(String connectionQueryServiceName,
+                                                   Configuration conf) {
+        connectionQueryServicesName = connectionQueryServiceName;
+        connectionQueryServiceOpenInternalSizeHistogram = new ConnectionQueryServicesHistogram(
+                "PhoenixInternalOpenConn",
+                "histogram for number of open internal phoenix connections", conf);
+        connectionQueryServicesOpenConnSizeHistogram = new ConnectionQueryServicesHistogram(
+                "PhoenixOpenConn", "histogram for number of open phoenix connections", conf);
+    }
+
+    public String getConnectionQueryServicesName() {
+        return this.connectionQueryServicesName;
+    }
+
+    @SuppressWarnings(value = "EI_EXPOSE_REP",
+            justification = "It's only used in internally for metrics storage")
+    public ConnectionQueryServicesHistogram getConnectionQueryServicesInternalOpenConnHisto() {
+        return connectionQueryServiceOpenInternalSizeHistogram;
+    }
+
+    @SuppressWarnings(value = "EI_EXPOSE_REP",
+            justification = "It's only used in internally for metrics storage")
+    public ConnectionQueryServicesHistogram getConnectionQueryServicesOpenConnHisto() {
+        return connectionQueryServicesOpenConnSizeHistogram;
+    }
+
+    public List<HistogramDistribution> getConnectionQueryServicesHistogramsDistribution() {
+        List<HistogramDistribution> list = new ArrayList(Arrays.asList(
+            this.connectionQueryServiceOpenInternalSizeHistogram.getRangeHistogramDistribution(),
+            this.connectionQueryServicesOpenConnSizeHistogram.getRangeHistogramDistribution()));
+        return Collections.unmodifiableList(list);
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsManager.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsManager.java
new file mode 100644
index 0000000000..0175924653
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsManager.java
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring.connectionqueryservice;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import edu.umd.cs.findbugs.annotations.SuppressWarnings;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric;
+import org.apache.phoenix.monitoring.HistogramDistribution;
+import org.apache.phoenix.monitoring.MetricPublisherSupplierFactory;
+import org.apache.phoenix.monitoring.MetricServiceResolver;
+import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Central place where we keep track of all the Connection Query Service metrics. Register each
+ * Connection Query Service and store the instance of it associated with ConnectionServiceName in a
+ * map This class exposes following functions as static functions to help catch all exception
+ * 1.clearAllConnectionQueryServiceMetrics
+ * 2.getConnectionQueryServicesMetrics
+ * 3.updateMetrics
+ */
+public class ConnectionQueryServicesMetricsManager {
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(ConnectionQueryServicesMetricsManager.class);
+    private static volatile boolean isConnectionQueryServiceMetricsEnabled;
+    private static volatile boolean isConnectionQueryServiceMetricPublisherEnabled;
+    private static ConcurrentMap<String, ConnectionQueryServicesMetrics>
+            connectionQueryServiceMetricsMapping;
+    // Singleton object
+    private static volatile ConnectionQueryServicesMetricsManager
+            connectionQueryServicesMetricsManager = null;
+    private static volatile MetricPublisherSupplierFactory mPublisher = null;
+    private static volatile QueryServicesOptions options;
+
+    @SuppressWarnings(value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD", justification = "This " +
+            "Object is only created once for the JVM")
+    public ConnectionQueryServicesMetricsManager(QueryServicesOptions opts) {
+        options = opts;
+        connectionQueryServiceMetricsMapping = new ConcurrentHashMap<>();
+        isConnectionQueryServiceMetricsEnabled = options.isConnectionQueryServiceMetricsEnabled();
+        isConnectionQueryServiceMetricPublisherEnabled =
+                options.isConnectionQueryServiceMetricsPublisherEnabled();
+        LOGGER.info("Connection query service metrics enabled : "
+                + isConnectionQueryServiceMetricsEnabled + " publisher enabled : "
+                + isConnectionQueryServiceMetricPublisherEnabled);
+    }
+
+    @SuppressWarnings(value = "EI_EXPOSE_STATIC_REP2", justification = "Only used for testing")
+    public static void setInstance(ConnectionQueryServicesMetricsManager metricsManager) {
+        connectionQueryServicesMetricsManager = metricsManager;
+    }
+
+    /**
+     * Function to provide instance of ConnectionQueryServiceMetricsManager(Create if needed in
+     * thread safe manner)
+     * @return returns instance of ConnectionQueryServicesMetricsManager
+     */
+    @SuppressWarnings(value = "MS_EXPOSE_REP", justification = "Only used internally, not exposed" +
+            " to external client")
+    public static ConnectionQueryServicesMetricsManager getInstance() {
+        if (connectionQueryServicesMetricsManager == null) {
+            synchronized (ConnectionQueryServicesMetricsManager.class) {
+                if (connectionQueryServicesMetricsManager == null) {
+                    QueryServicesOptions options = QueryServicesOptions.withDefaults();
+                    if (options.isConnectionQueryServiceMetricsEnabled()) {
+                        connectionQueryServicesMetricsManager =
+                                new ConnectionQueryServicesMetricsManager(options);
+                        LOGGER.info("Created object for Connection query service metrics manager");
+                    } else {
+                        connectionQueryServicesMetricsManager =
+                                NoOpConnectionQueryServicesMetricsManager.NO_OP_CONN_QUERY_SERVICES_METRICS_MANAGER;
+                        LOGGER.info("Created object for NoOp Connection query service metrics manager");
+                        return connectionQueryServicesMetricsManager;
+                    }
+                    registerMetricsPublisher();
+                }
+            }
+        }
+        return connectionQueryServicesMetricsManager;
+    }
+
+    ConnectionQueryServicesMetricsManager() {
+
+    }
+
+    public static void registerMetricsPublisher() {
+        if (isConnectionQueryServiceMetricPublisherEnabled) {
+            String className = options.getConnectionQueryServiceMetricsPublisherClass();
+            if (className != null) {
+                MetricServiceResolver mResolver = new MetricServiceResolver();
+                LOGGER.info("Connection query service metrics publisher className "
+                        + className);
+                try {
+                    mPublisher = mResolver.instantiate(className);
+                    mPublisher.registerMetricProvider();
+                } catch (Throwable e) {
+                    LOGGER.error("The exception from metric publish Function", e);
+                }
+
+            } else {
+                LOGGER.warn("Connection query service metrics publisher className"
+                        + " can't be null");
+            }
+        }
+    }
+
+    /**
+     * Function to provide Object of ConnectionQueryServicesMetrics (Create if needed in
+     * thread safe manner) for connectionQueryServiceName
+     * @param connectionQueryServiceName Connection Query Service Name
+     * @return returns instance of ConnectionQueryServicesMetrics for connectionQueryServiceName
+     */
+    ConnectionQueryServicesMetrics getConnectionQueryServiceMetricsInstance(
+            String connectionQueryServiceName) {
+        if (Strings.isNullOrEmpty(connectionQueryServiceName)) {
+            LOGGER.warn("Connection query service Name can't be null or empty");
+            return null;
+        }
+
+        ConnectionQueryServicesMetrics cqsInstance =
+                connectionQueryServiceMetricsMapping.get(connectionQueryServiceName);
+        if (cqsInstance == null) {
+            synchronized (ConnectionQueryServicesMetricsManager.class) {
+                cqsInstance = connectionQueryServiceMetricsMapping.get(connectionQueryServiceName);
+                if (cqsInstance == null) {
+
+                    LOGGER.info("Creating connection query service metrics object for : "
+                            + connectionQueryServiceName);
+                    cqsInstance = new ConnectionQueryServicesMetrics(connectionQueryServiceName,
+                            options.getConfiguration());
+                    connectionQueryServiceMetricsMapping
+                            .put(connectionQueryServiceName, cqsInstance);
+                }
+            }
+        }
+        return cqsInstance;
+    }
+
+    /**
+     * This function will be used to add individual MetricType to LocalStore. Also this will serve
+     * as LocalStore to store connection query service metrics before their current value is added
+     * to histogram.
+     * This func is only used for metrics which are counter based, where values increases or
+     * decreases frequently. Like Open Conn Counter. This function will first retrieve it's current
+     * value and increment or decrement (by +/-1) it as required then update the new values.
+     * <br>
+     * Example :- OPEN_PHOENIX_CONNECTIONS_COUNTER, OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER
+     * <br>
+     * <br>
+     * histogram will update with each increment/decrement.
+     * @param connectionQueryServiceName
+     * @param type
+     * @param value
+     */
+    void updateMetricsValue(String connectionQueryServiceName, MetricType type,
+            long value) {
+
+        long startTime = EnvironmentEdgeManager.currentTime();
+
+        ConnectionQueryServicesMetrics cqsInstance =
+                getConnectionQueryServiceMetricsInstance(connectionQueryServiceName);
+        if (cqsInstance == null) {
+            return;
+        }
+        cqsInstance.setMetricValue(type, value);
+
+        if (LOGGER.isTraceEnabled()) {
+            LOGGER.trace("Connection query service metrics completed updating metric "
+                    + type + " to value " + value + ", timetaken = "
+                    + (EnvironmentEdgeManager.currentTime() - startTime));
+        }
+    }
+
+    /**
+     * static functions to push, update or retrieve ConnectionQueryService Metrics.
+     * @param connectionQueryServiceName name of the connection query service
+     * @param type                       type of metric
+     * @param value                      metric value
+     */
+    public static void updateMetrics(String connectionQueryServiceName, MetricType type,
+            long value) {
+        try {
+            ConnectionQueryServicesMetricsManager.getInstance()
+                    .updateMetricsValue(connectionQueryServiceName, type, value);
+        } catch (Exception e) {
+            LOGGER.error("Failed updating connection query service metrics", e);
+        }
+    }
+
+    public static Map<String, List<ConnectionQueryServicesMetric>> getAllConnectionQueryServicesMetrics() {
+        return ConnectionQueryServicesMetricsManager.getInstance()
+                .getConnectionQueryServicesMetrics();
+    }
+
+    /**
+     * This function will return all the counters for Phoenix connection query service.
+     * @return Map of all ConnectionQueryService Metrics.
+     */
+    Map<String, List<ConnectionQueryServicesMetric>> getConnectionQueryServicesMetrics() {
+        try {
+            long startTime = EnvironmentEdgeManager.currentTime();
+            Map<String, List<ConnectionQueryServicesMetric>> map = new HashMap<>();
+            for (Map.Entry<String, ConnectionQueryServicesMetrics> entry
+                    : connectionQueryServiceMetricsMapping.entrySet()) {
+                map.put(entry.getKey(), entry.getValue().getAllMetrics());
+            }
+            long timeTakenForMetricConversion = EnvironmentEdgeManager.currentTime() - startTime;
+            LOGGER.info("Connection query service metrics fetching complete, timeTaken: "
+                    + timeTakenForMetricConversion);
+            return map;
+        } catch (Exception e) {
+            LOGGER.error("Failed retrieving connection query service Metrics", e);
+        }
+        return null;
+    }
+
+    public static Map<String, List<HistogramDistribution>> getHistogramsForAllConnectionQueryServices() {
+        return ConnectionQueryServicesMetricsManager.getInstance()
+                .getHistogramsForConnectionQueryServices();
+    }
+
+    /**
+     * This function will return histogram for all the Phoenix connection query service metrics.
+     * @return Map of all ConnectionServiceMetrics Histogram
+     */
+    Map<String, List<HistogramDistribution>> getHistogramsForConnectionQueryServices() {
+        Map<String, List<HistogramDistribution>> map = new HashMap<>();
+        for (Map.Entry<String, ConnectionQueryServicesMetrics> entry
+                : connectionQueryServiceMetricsMapping.entrySet()) {
+            ConnectionQueryServicesMetricsHistograms connectionQueryServiceHistogramsHistograms =
+                    entry.getValue().getConnectionQueryServiceHistograms();
+            map.put(entry.getKey(), connectionQueryServiceHistogramsHistograms
+                    .getConnectionQueryServicesHistogramsDistribution());
+        }
+        return map;
+    }
+
+    /**
+     * Function to update {@link MetricType#OPEN_PHOENIX_CONNECTIONS_COUNTER} counter value in
+     * Histogram
+     * @param connCount                  current count of
+     *                                   {@link MetricType#OPEN_PHOENIX_CONNECTIONS_COUNTER}
+     * @param connectionQueryServiceName ConnectionQueryService name
+     */
+    public static void updateConnectionQueryServiceOpenConnectionHistogram(long connCount,
+            String connectionQueryServiceName) {
+        ConnectionQueryServicesMetrics metrics =
+                getInstance().getConnectionQueryServiceMetricsInstance(connectionQueryServiceName);
+        if (metrics == null) {
+            return;
+        }
+        metrics.getConnectionQueryServiceHistograms().getConnectionQueryServicesOpenConnHisto()
+                .add(connCount);
+    }
+
+    /**
+     * Function to update {@link MetricType#OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER} counter value
+     * in Histogram
+     * @param connCount                  current count of
+     *                                   {@link
+     *                                   MetricType#OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER}
+     * @param connectionQueryServiceName ConnectionQueryService name
+     */
+    public static void updateConnectionQueryServiceOpenInternalConnectionHistogram(long connCount,
+            String connectionQueryServiceName) {
+        ConnectionQueryServicesMetrics metrics =
+                getInstance().getConnectionQueryServiceMetricsInstance(connectionQueryServiceName);
+        if (metrics == null) {
+            return;
+        }
+        metrics.getConnectionQueryServiceHistograms()
+                .getConnectionQueryServicesInternalOpenConnHisto().add(connCount);
+    }
+
+    /////////////////////////////////////////////////////////
+    ////// Below Functions are majorly used in testing //////
+    /////////////////////////////////////////////////////////
+
+    public static ConnectionQueryServicesHistogram getConnectionQueryServiceOpenInternalConnectionHistogram(
+            String connectionQueryServiceName) {
+        ConnectionQueryServicesMetrics metrics =
+                getInstance().getConnectionQueryServiceMetricsInstance(connectionQueryServiceName);
+        if (metrics == null) {
+            return null;
+        }
+        return metrics.getConnectionQueryServiceHistograms()
+                .getConnectionQueryServicesInternalOpenConnHisto();
+    }
+
+    public static ConnectionQueryServicesHistogram
+        getConnectionQueryServiceOpenConnectionHistogram(String connectionQueryServiceName) {
+        ConnectionQueryServicesMetrics metrics =
+                getInstance().getConnectionQueryServiceMetricsInstance(connectionQueryServiceName);
+        if (metrics == null) {
+            return null;
+        }
+        return metrics.getConnectionQueryServiceHistograms()
+                .getConnectionQueryServicesOpenConnHisto();
+    }
+
+    /**
+     * Helps reset the localstore(connectionQueryServiceMetricsMapping)
+     */
+    void clearConnectionQueryServiceMetrics() {
+        if (connectionQueryServiceMetricsMapping != null) {
+            connectionQueryServiceMetricsMapping.clear();
+        }
+        LOGGER.info("Connection query service metrics clearing complete");
+    }
+
+    public static void clearAllConnectionQueryServiceMetrics() {
+        try {
+            ConnectionQueryServicesMetricsManager.getInstance()
+                    .clearConnectionQueryServiceMetrics();
+        } catch (Exception e) {
+            LOGGER.error("Failed resetting connection query service Metrics", e);
+        }
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/NoOpConnectionQueryServicesMetricsManager.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/NoOpConnectionQueryServicesMetricsManager.java
new file mode 100644
index 0000000000..cf3ec00de3
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/NoOpConnectionQueryServicesMetricsManager.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring.connectionqueryservice;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric;
+import org.apache.phoenix.monitoring.HistogramDistribution;
+import org.apache.phoenix.monitoring.MetricType;
+
+/**
+ * ConnectionQueryServicesMetricsManager will be replaced by this class when
+ * {@link org.apache.phoenix.query.QueryServices#CONNECTION_QUERY_SERVICE_METRICS_ENABLED} flag is
+ * set to  false.
+ */
+public class NoOpConnectionQueryServicesMetricsManager extends ConnectionQueryServicesMetricsManager {
+
+    public static final NoOpConnectionQueryServicesMetricsManager NO_OP_CONN_QUERY_SERVICES_METRICS_MANAGER =
+            new NoOpConnectionQueryServicesMetricsManager();
+
+    private NoOpConnectionQueryServicesMetricsManager() {
+        super();
+    }
+
+    void updateMetricsValue(String connectionQueryServiceName, MetricType type,
+            long value) {
+    }
+
+    Map<String, List<ConnectionQueryServicesMetric>> getConnectionQueryServicesMetrics() {
+        return Collections.emptyMap();
+    }
+
+    Map<String, List<HistogramDistribution>> getHistogramsForConnectionQueryServices() {
+        return Collections.emptyMap();
+    }
+
+    void clearConnectionQueryServiceMetrics() {
+
+    }
+
+    ConnectionQueryServicesMetrics getConnectionQueryServiceMetricsInstance(
+            String connectionQueryServiceName) {
+        return null;
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index b89ad3e435..b26c6cf75e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -226,4 +226,5 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
         throw new UnsupportedOperationException();
     }
 
+    int getConnectionCount(boolean isInternal);
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index dcc29f980e..ad4d2a48e2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -306,7 +306,6 @@ import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.StringUtil;
-import org.apache.phoenix.util.TimeKeeper;
 import org.apache.phoenix.util.UpgradeUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -455,6 +454,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         for (Entry<String,String> entry : connectionInfo.asProps()) {
             config.set(entry.getKey(), entry.getValue());
         }
+        if (connectionInfo.getPrincipal() != null) {
+            config.set(QUERY_SERVICES_NAME, connectionInfo.getPrincipal());
+        }
+        LOGGER.info(String.format("CQS initialized with connection query service : %s",
+                config.get(QUERY_SERVICES_NAME)));
         this.connectionInfo = connectionInfo;
 
         // Without making a copy of the configuration we cons up, we lose some of our properties
@@ -789,6 +793,15 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         return latestMetaData;
     }
 
+    @Override
+    public int getConnectionCount(boolean isInternal) {
+        if (isInternal) {
+            return connectionLimiter.getInternalConnectionCount();
+        } else {
+            return connectionLimiter.getConnectionCount();
+        }
+    }
+
     @Override
     public void addTable(PTable table, long resolvedTime) throws SQLException {
         synchronized (latestMetaDataLock) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 7da182857d..87f0bd63fe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -809,4 +809,9 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
     public PMetaData getMetaDataCache() {
         return metaData;
     }
+
+    @Override
+    public int getConnectionCount(boolean isInternal) {
+        return 0;
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 72ee10c215..46edad7ce9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -416,4 +416,9 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
     public ConnectionLimiter getConnectionLimiter() {
         return getDelegate().getConnectionLimiter();
     }
+
+    @Override
+    public int getConnectionCount(boolean isInternal) {
+        return getDelegate().getConnectionCount(isInternal);
+    }
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index d300929ac7..af2c518a1c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -56,6 +56,7 @@ public interface QueryServices extends SQLCloseable {
             "phoenix.query.server.orderBy.spooling.enabled";
     public static final String HBASE_CLIENT_KEYTAB = "hbase.myclient.keytab";
     public static final String HBASE_CLIENT_PRINCIPAL = "hbase.myclient.principal";
+    String QUERY_SERVICES_NAME = "phoenix.query.services.name";
     public static final String SPOOL_DIRECTORY = "phoenix.spool.directory";
     public static final String AUTO_COMMIT_ATTRIB = "phoenix.connection.autoCommit";
     // consistency configuration setting
@@ -392,6 +393,17 @@ public interface QueryServices extends SQLCloseable {
     public static final String PHOENIX_HISTOGRAM_LATENCY_RANGES = "phoenix.histogram.latency.ranges";
     // The range of bins for size metrics for histogram.
     public static final String PHOENIX_HISTOGRAM_SIZE_RANGES = "phoenix.histogram.size.ranges";
+
+    // Connection Query Service Metrics Configs
+    String CONNECTION_QUERY_SERVICE_METRICS_ENABLED = "phoenix.conn.query.service.metrics.enabled";
+    String CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_CLASSNAME =
+            "phoenix.monitoring.connection.query.service.metricProvider.className";
+    String CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_ENABLED =
+            "phoenix.conn.query.service.metricsPublisher.enabled";
+    // The range of bins for Connection Query Service Metrics of histogram.
+    String CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES =
+            "phoenix.conn.query.service.histogram.size.ranges";
+
     // This config is used to move (copy and delete) the child links from the SYSTEM.CATALOG to SYSTEM.CHILD_LINK table.
     // As opposed to a copy and async (out of band) delete.
     public static final String MOVE_CHILD_LINKS_DURING_UPGRADE_ENABLED = "phoenix.move.child_link.during.upgrade";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 81a2a29004..b39fb788a6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -30,6 +30,10 @@ import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRI
 import static org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC;
 import static org.apache.phoenix.query.QueryServices.CONNECTION_ACTIVITY_LOGGING_ENABLED;
 import static org.apache.phoenix.query.QueryServices.CONNECTION_ACTIVITY_LOGGING_INTERVAL;
+import static org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES;
+import static org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_METRICS_ENABLED;
+import static org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_ENABLED;
+import static org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_CLASSNAME;
 import static org.apache.phoenix.query.QueryServices.COST_BASED_OPTIMIZER_ENABLED;
 import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB;
@@ -71,6 +75,7 @@ import static org.apache.phoenix.query.QueryServices.MIN_STATS_UPDATE_FREQ_MS_AT
 import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK;
 import static org.apache.phoenix.query.QueryServices.PHOENIX_ACLS_ENABLED;
+import static org.apache.phoenix.query.QueryServices.QUERY_SERVICES_NAME;
 import static org.apache.phoenix.query.QueryServices.QUEUE_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.REGIONSERVER_INFO_PORT_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.RENEW_LEASE_ENABLED;
@@ -322,6 +327,14 @@ public class QueryServicesOptions {
 
     public static final long DEFAULT_INDEX_POPULATION_SLEEP_TIME = 5000;
 
+    // Phoenix Connection Query Service configuration Defaults
+    public static final String DEFAULT_QUERY_SERVICES_NAME = "DEFAULT_CQSN";
+    public static final String DEFAULT_CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES =
+            "1, 10, 100, 500, 1000";
+    public static final boolean DEFAULT_IS_CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_ENABLED =
+            false;
+    public static final boolean DEFAULT_IS_CONNECTION_QUERY_SERVICE_METRICS_ENABLED = false;
+
     public static final boolean DEFAULT_RENEW_LEASE_ENABLED = true;
     public static final int DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS =
             DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD / 2;
@@ -495,7 +508,14 @@ public class QueryServicesOptions {
             .setIfUnset(CLIENT_METRICS_TAG, DEFAULT_CLIENT_METRICS_TAG)
             .setIfUnset(CLIENT_INDEX_ASYNC_THRESHOLD, DEFAULT_CLIENT_INDEX_ASYNC_THRESHOLD)
             .setIfUnset(PHOENIX_TTL_SERVER_SIDE_MASKING_ENABLED, DEFAULT_SERVER_SIDE_MASKING_ENABLED)
+            .setIfUnset(QUERY_SERVICES_NAME, DEFAULT_QUERY_SERVICES_NAME)
             .setIfUnset(INDEX_CREATE_DEFAULT_STATE, DEFAULT_CREATE_INDEX_STATE)
+            .setIfUnset(CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES,
+                    DEFAULT_CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES)
+            .setIfUnset(CONNECTION_QUERY_SERVICE_METRICS_ENABLED,
+                    DEFAULT_IS_CONNECTION_QUERY_SERVICE_METRICS_ENABLED)
+            .setIfUnset(CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_ENABLED,
+                    DEFAULT_IS_CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_ENABLED)
             .setIfUnset(SKIP_SYSTEM_TABLES_EXISTENCE_CHECK,
                 DEFAULT_SKIP_SYSTEM_TABLES_EXISTENCE_CHECK)
             .setIfUnset(MAX_IN_LIST_SKIP_SCAN_SIZE, DEFAULT_MAX_IN_LIST_SKIP_SCAN_SIZE)
@@ -754,6 +774,29 @@ public class QueryServicesOptions {
         return config.getBoolean(METRIC_PUBLISHER_ENABLED, DEFAULT_IS_METRIC_PUBLISHER_ENABLED);
     }
 
+    public boolean isConnectionQueryServiceMetricsEnabled() {
+        return config.getBoolean(CONNECTION_QUERY_SERVICE_METRICS_ENABLED,
+                DEFAULT_IS_CONNECTION_QUERY_SERVICE_METRICS_ENABLED);
+    }
+
+    public boolean isConnectionQueryServiceMetricsPublisherEnabled() {
+        return config.getBoolean(CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_ENABLED,
+                DEFAULT_IS_CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_ENABLED);
+    }
+
+    public String getQueryServicesName() {
+        return config.get(QUERY_SERVICES_NAME, DEFAULT_QUERY_SERVICES_NAME);
+    }
+
+    public void setConnectionQueryServiceMetricsEnabled() {
+        set(CONNECTION_QUERY_SERVICE_METRICS_ENABLED, true);
+    }
+
+    public String getConnectionQueryServiceMetricsPublisherClass() {
+        return config.get(CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_CLASSNAME,
+                DEFAULT_METRIC_PUBLISHER_CLASS_NAME);
+    }
+
     @VisibleForTesting
     public void setAllowedListForTableLevelMetrics(String tableNameList){
         set(ALLOWED_LIST_FOR_TABLE_LEVEL_METRICS,tableNameList);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index d4448a786c..f92a92c02b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -67,12 +67,14 @@ import org.apache.phoenix.jdbc.PhoenixMonitoredConnection;
 import org.apache.phoenix.jdbc.PhoenixMonitoredResultSet;
 import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
 import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric;
 import org.apache.phoenix.monitoring.GlobalClientMetrics;
 import org.apache.phoenix.monitoring.GlobalMetric;
 import org.apache.phoenix.monitoring.HistogramDistribution;
 import org.apache.phoenix.monitoring.MetricType;
 import org.apache.phoenix.monitoring.PhoenixTableMetric;
 import org.apache.phoenix.monitoring.TableMetricsManager;
+import org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesMetricsManager;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.AmbiguousColumnException;
@@ -1465,6 +1467,22 @@ public class PhoenixRuntime {
         return TableMetricsManager.getSizeHistogramsForAllTables();
     }
 
+    public static Map<String, List<HistogramDistribution>> getAllConnectionQueryServicesHistograms() {
+        return ConnectionQueryServicesMetricsManager.getHistogramsForAllConnectionQueryServices();
+    }
+
+    public static Map<String, List<ConnectionQueryServicesMetric>> getAllConnectionQueryServicesCounters() {
+        return ConnectionQueryServicesMetricsManager.getAllConnectionQueryServicesMetrics();
+    }
+
+    /**
+     * This is only used in testcases to reset the connection query services Metrics data
+     */
+    @VisibleForTesting
+    public static void clearAllConnectionQueryServiceMetrics() {
+        ConnectionQueryServicesMetricsManager.clearAllConnectionQueryServiceMetrics();
+    }
+
     /**
      * This is only used in testcases to reset the tableLevel Metrics data
      */
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesHistogramTest.java b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesHistogramTest.java
new file mode 100644
index 0000000000..5b3b107df7
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesHistogramTest.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring.connectionqueryservice;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.query.QueryServices;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ConnectionQueryServicesHistogramTest {
+
+    @Test
+    public void testConnectionQueryServiceHistogramRangeOverride() {
+        String histoName = "PhoenixInternalOpenConn";
+        Configuration conf = new Configuration();
+        conf.set(QueryServices.CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES, "2, 5, 8");
+        ConnectionQueryServicesHistogram histogram = new ConnectionQueryServicesHistogram(histoName,
+                "histogram for Number of open internal phoenix connections", conf);
+        Assert.assertEquals(histoName, histogram.getName());
+        long[] ranges = histogram.getRanges();
+        Assert.assertNotNull(ranges);
+        long[] expectRanges = {2,5,8};
+        Assert.assertArrayEquals(expectRanges, ranges);
+    }
+
+    @Test
+    public void testEveryRangeInDefaultRange() {
+        //1, 3, 7, 9, 15, 30, 120, 600
+        Configuration conf = new Configuration();
+        String histoName = "PhoenixInternalOpenConn";
+        conf.unset(QueryServices.CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES);
+        ConnectionQueryServicesHistogram histogram = new ConnectionQueryServicesHistogram(histoName,
+                "histogram for Number of open internal phoenix connections", conf);
+        Assert.assertEquals(histoName, histogram.getName());
+        Assert.assertEquals(ConnectionQueryServicesHistogram.DEFAULT_RANGE, histogram.getRanges());
+
+        histogram.add(1);
+        histogram.add(3);
+        histogram.add(7);
+        histogram.add(9);
+        histogram.add(15);
+        histogram.add(30);
+        histogram.add(120);
+        histogram.add(600);
+
+        Map<String, Long> distribution = histogram.getRangeHistogramDistribution().getRangeDistributionMap();
+        Map<String, Long> expectedMap = new HashMap<>();
+        expectedMap.put("0,1", 1l);
+        expectedMap.put("1,10", 3l);
+        expectedMap.put("10,100", 2l);
+        expectedMap.put("100,500", 1l);
+        expectedMap.put("500,1000", 1l);
+        Assert.assertEquals(expectedMap, distribution);
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsHistogramsTest.java b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsHistogramsTest.java
new file mode 100644
index 0000000000..f9fa36375d
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsHistogramsTest.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring.connectionqueryservice;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ConnectionQueryServicesMetricsHistogramsTest {
+    @Test
+    public void testConnectionQueryServiceMetricsHistograms() {
+        String connectionQueryServiceName = "USE_CASE_1";
+        Configuration conf = new Configuration();
+        ConnectionQueryServicesMetricsHistograms
+                connectionQueryServiceMetricsHistograms = new ConnectionQueryServicesMetricsHistograms(connectionQueryServiceName, conf);
+        Assert.assertEquals(connectionQueryServiceName, connectionQueryServiceMetricsHistograms.getConnectionQueryServicesName());
+        Assert.assertNotNull(connectionQueryServiceMetricsHistograms.getConnectionQueryServicesOpenConnHisto());
+        Assert.assertNotNull(connectionQueryServiceMetricsHistograms.getConnectionQueryServicesInternalOpenConnHisto());
+
+        Assert.assertEquals(2, connectionQueryServiceMetricsHistograms.getConnectionQueryServicesHistogramsDistribution().size());
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsManagerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsManagerTest.java
new file mode 100644
index 0000000000..b2073433c4
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsManagerTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring.connectionqueryservice;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
+import static org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesNameMetricsTest.connectionQueryServiceNames;
+import static org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesNameMetricsTest.openInternalPhoenixConnCounter;
+import static org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesNameMetricsTest.openPhoenixConnCounter;
+import static org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesNameMetricsTest.phoenixConnThrottledCounter;
+import static org.junit.Assert.assertTrue;
+
+public class ConnectionQueryServicesMetricsManagerTest {
+    public boolean verifyMetricsReset(){
+        Map<String, List<ConnectionQueryServicesMetric>> map =
+                ConnectionQueryServicesMetricsManager.getAllConnectionQueryServicesMetrics();
+        return map != null && map.isEmpty();
+    }
+
+    public boolean verifyConnectionQueryServiceNamesExists(String connectionQueryServiceName){
+        Map<String,List<ConnectionQueryServicesMetric>>map =
+                ConnectionQueryServicesMetricsManager.getAllConnectionQueryServicesMetrics();
+        return map != null && map.containsKey(connectionQueryServiceName);
+    }
+
+    @Test
+    public void testConnectionQueryServiceMetricsForUpdateMetricsMethod() {
+
+        QueryServicesOptions options = QueryServicesOptions.withDefaults();
+        options.setConnectionQueryServiceMetricsEnabled();
+        ConnectionQueryServicesMetricsManager connectionQueryServicesMetricsManager =
+                new ConnectionQueryServicesMetricsManager(options);
+        ConnectionQueryServicesMetricsManager.setInstance(connectionQueryServicesMetricsManager);
+
+        ConnectionQueryServicesNameMetricsTest
+                testData = new ConnectionQueryServicesNameMetricsTest();
+        testData.populateMetrics();
+        for(int i = 0; i < connectionQueryServiceNames.length; i++) {
+            ConnectionQueryServicesMetricsManager.updateMetrics(connectionQueryServiceNames[i],
+                    OPEN_PHOENIX_CONNECTIONS_COUNTER, openPhoenixConnCounter[i]);
+            ConnectionQueryServicesMetricsManager.updateMetrics(connectionQueryServiceNames[i],
+                    OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER, openInternalPhoenixConnCounter[i]);
+            ConnectionQueryServicesMetricsManager.updateMetrics(connectionQueryServiceNames[i],
+                    PHOENIX_CONNECTIONS_THROTTLED_COUNTER, phoenixConnThrottledCounter[i]);
+        }
+        testData.verfiyCountOfConnectionQueryServices(connectionQueryServiceNames.length);
+        ConnectionQueryServicesMetricsManager.clearAllConnectionQueryServiceMetrics();
+        assertTrue(verifyMetricsReset());
+    }
+
+    @Test
+    public void testHistogramMetricsForOpenPhoenixConnectionCounter() {
+        String connectionQueryServiceName = "USE_CASE_1";
+        Configuration conf = new Configuration();
+        conf.set(QueryServices.CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES, "3, 6, 9");
+
+        QueryServicesOptions mockOptions = Mockito.mock(QueryServicesOptions.class);
+        Mockito.doReturn(true).when(mockOptions)
+                .isConnectionQueryServiceMetricsEnabled();
+        Mockito.doReturn(conf).when(mockOptions).getConfiguration();
+        ConnectionQueryServicesMetricsManager connectionQueryServicesMetricsManager =
+                new ConnectionQueryServicesMetricsManager(mockOptions);
+        ConnectionQueryServicesMetricsManager.setInstance(connectionQueryServicesMetricsManager);
+        for (int i=0; i<9; i++) {
+            updateMetricsAndHistogram(i+1, connectionQueryServiceName);
+        }
+
+
+        // Generate distribution map from histogram snapshots.
+        ConnectionQueryServicesHistogram connectionQueryServicesHistogram =
+                ConnectionQueryServicesMetricsManager.getConnectionQueryServiceOpenConnectionHistogram(connectionQueryServiceName);
+
+        Map<String, Long> openPhoenixConnMap = connectionQueryServicesHistogram.getRangeHistogramDistribution().getRangeDistributionMap();
+        for (Long count: openPhoenixConnMap.values()) {
+            Assert.assertEquals(new Long(3), count);
+        }
+    }
+
+    private void updateMetricsAndHistogram (long counter, String connectionQueryServiceName) {
+        ConnectionQueryServicesMetricsManager.updateMetrics(connectionQueryServiceName,
+                OPEN_PHOENIX_CONNECTIONS_COUNTER, counter);
+        ConnectionQueryServicesMetricsManager.updateConnectionQueryServiceOpenConnectionHistogram(counter,
+                connectionQueryServiceName);
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsTest.java b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsTest.java
new file mode 100644
index 0000000000..d05a6824b3
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring.connectionqueryservice;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric;
+import org.apache.phoenix.monitoring.MetricType;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+import static org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesNameMetricsTest.openInternalPhoenixConnCounter;
+import static org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesNameMetricsTest.openPhoenixConnCounter;
+import static org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesNameMetricsTest.connectionQueryServiceNames;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ConnectionQueryServicesMetricsTest {
+    static Map<String, ConnectionQueryServicesMetrics> phoenixConnectionQueryServiceSet =
+            new HashMap<>();
+
+    public boolean verifyConnectionQueryServiceName() {
+
+        if (phoenixConnectionQueryServiceSet.isEmpty()) {
+            return false;
+        }
+        for (String connectionQueryServiceName : connectionQueryServiceNames) {
+            ConnectionQueryServicesMetrics instance =
+                    phoenixConnectionQueryServiceSet.get(connectionQueryServiceName);
+            if (!instance.getConnectionQueryServiceName().equals(connectionQueryServiceName)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public void verifyMetricsFromPhoenixConnectionQueryServiceMetrics() {
+        assertFalse(phoenixConnectionQueryServiceSet.isEmpty());
+        for (int i = 0; i < connectionQueryServiceNames.length; i++) {
+            ConnectionQueryServicesMetrics instance =
+                    phoenixConnectionQueryServiceSet.get(connectionQueryServiceNames[i]);
+            assertEquals(instance.getConnectionQueryServiceName(), connectionQueryServiceNames[i]);
+            List<ConnectionQueryServicesMetric> metricList = instance.getAllMetrics();
+            for (ConnectionQueryServicesMetric metric : metricList) {
+
+                if (metric.getMetricType()
+                        .equals(MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER)) {
+                    assertEquals(openInternalPhoenixConnCounter[i], metric.getValue());
+                }
+                if (metric.getMetricType().equals(MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER)) {
+                    assertEquals(openPhoenixConnCounter[i], metric.getValue());
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testPhoenixConnectionQueryServiceMetricsForPhoenixConnectionQueryServiceName() {
+        Configuration conf = new Configuration();
+        for (int i = 0; i < connectionQueryServiceNames.length; i++) {
+            ConnectionQueryServicesMetrics instance =
+                    new ConnectionQueryServicesMetrics(connectionQueryServiceNames[i], conf);
+            phoenixConnectionQueryServiceSet.put(connectionQueryServiceNames[i], instance);
+        }
+        assertTrue(verifyConnectionQueryServiceName());
+    }
+
+    /**
+     * This test is for changeMetricValue() Method and getMetricMap()
+     */
+    @Test
+    public void testPhoenixConnectionQueryServiceMetrics() {
+        Configuration conf = new Configuration();
+        for (int i = 0; i < connectionQueryServiceNames.length; i++) {
+            ConnectionQueryServicesMetrics instance =
+                    new ConnectionQueryServicesMetrics(connectionQueryServiceNames[i], conf);
+            phoenixConnectionQueryServiceSet.put(connectionQueryServiceNames[i], instance);
+
+            instance.setMetricValue(MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER,
+                    openInternalPhoenixConnCounter[i]);
+            instance.setMetricValue(
+                    MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER, openPhoenixConnCounter[i]);
+        }
+        verifyMetricsFromPhoenixConnectionQueryServiceMetrics();
+        phoenixConnectionQueryServiceSet.clear();
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesNameMetricsTest.java b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesNameMetricsTest.java
new file mode 100644
index 0000000000..6a1d26e05f
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesNameMetricsTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring.connectionqueryservice;
+
+import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric;
+import org.apache.phoenix.monitoring.MetricType;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This class is used primarily to populate data and
+ * verification methods
+ */
+
+public class ConnectionQueryServicesNameMetricsTest {
+
+    public static final String[] connectionQueryServiceNames =
+            { "USE_CASE_1", "USE_CASE_2", "USE_CASE_3" };
+    public static Map<String, Map<MetricType, Long>>[] connectionQueryServiceNameMetricMap =
+            new Map[connectionQueryServiceNames.length];
+    public static final long[] openPhoenixConnCounter = { 1, 1, 1 };
+    public static final long[] openInternalPhoenixConnCounter = { 1, 1, 1 };
+    public static final long[] phoenixConnThrottledCounter = { 1, 2, 3 };
+
+
+    public void populateMetrics() {
+        for (int i = 0; i < connectionQueryServiceNameMetricMap.length; i++) {
+            connectionQueryServiceNameMetricMap[i] = new HashMap<>();
+        }
+        for (int i = 0; i < connectionQueryServiceNames.length; i++) {
+            Map<MetricType, Long> metrics = new HashMap<>();
+            metrics.put(OPEN_PHOENIX_CONNECTIONS_COUNTER, openPhoenixConnCounter[i]);
+            metrics.put(
+                    OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER, openInternalPhoenixConnCounter[i]);
+            metrics.put(PHOENIX_CONNECTIONS_THROTTLED_COUNTER, phoenixConnThrottledCounter[i]);
+
+            connectionQueryServiceNameMetricMap[i].put(connectionQueryServiceNames[i], metrics);
+        }
+    }
+
+    public void verfiyCountOfConnectionQueryServices(int noOfConnectionQueryServiceName) {
+        Map<String, List<ConnectionQueryServicesMetric>> map =
+                ConnectionQueryServicesMetricsManager.getAllConnectionQueryServicesMetrics();
+        assertFalse(map == null || map.isEmpty());
+        for (int i = 0; i < noOfConnectionQueryServiceName; i++) {
+            assertTrue(map.containsKey(connectionQueryServiceNames[i]));
+            List<ConnectionQueryServicesMetric> connectionQueryServiceNameMetric =
+                    map.get(connectionQueryServiceNames[i]);
+            for (ConnectionQueryServicesMetric metric : connectionQueryServiceNameMetric) {
+                if (metric.getMetricType().equals(OPEN_PHOENIX_CONNECTIONS_COUNTER)) {
+                    assertEquals(openPhoenixConnCounter[i], metric.getValue());
+                }
+                if (metric.getMetricType().equals(OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER)) {
+                    assertEquals(openInternalPhoenixConnCounter[i], metric.getValue());
+                }
+                if (metric.getMetricType().equals(PHOENIX_CONNECTIONS_THROTTLED_COUNTER)) {
+                    assertEquals(phoenixConnThrottledCounter[i], metric.getValue());
+                }
+            }
+        }
+    }
+
+}