You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@phoenix.apache.org by GitBox <gi...@apache.org> on 2021/03/29 23:47:11 UTC

[GitHub] [phoenix] ChinmaySKulkarni commented on a change in pull request #1160: Implement TableMetricsManager class and its associated functions for select, upsert and Delete Queries

ChinmaySKulkarni commented on a change in pull request #1160:
URL: https://github.com/apache/phoenix/pull/1160#discussion_r603669887



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
##########
@@ -1273,6 +1387,121 @@ public void doMutation() throws IOException {
         }
     }
 
+    /**
+     * Update metrics related to failed mutations
+     * @param failedMutationBatch the batch of mutations that failed
+     * @param tableName table that was to be mutated
+     * @param numFailedMutations total number of failed mutations
+     * @param isTransactional true if the table is transactional
+     */
+    @VisibleForTesting
+    public static MutationMetricQueue.MutationMetric updateMutationBatchFailureMetrics(
+            List<Mutation> failedMutationBatch,
+            String tableName,
+            long numFailedMutations,
+            boolean isTransactional) {
+        if (failedMutationBatch == null || failedMutationBatch.isEmpty() ||
+                Strings.isNullOrEmpty(tableName)) {
+            return MutationMetricQueue.MutationMetric.EMPTY_METRIC;
+        }
+        long numUpsertMutationsInBatch = 0L;
+        long numDeleteMutationsInBatch = 0L;
+
+        for (Mutation m : failedMutationBatch) {
+            if (m instanceof Put) {
+                numUpsertMutationsInBatch++;
+            } else if (m instanceof Delete) {
+                numDeleteMutationsInBatch++;
+            }
+        }
+        // Update the MUTATION_BATCH_FAILED_SIZE counter with the number of failed delete mutations
+        // in case we are dealing with all deletes for a non-transactional table, since there is a
+        // bug in sendMutations where we don't get the correct value for numFailedMutations when
+        // we don't use transactions
+        return new MutationMetricQueue.MutationMetric(0, 0, 0, 0, 0,
+                allDeletesMutations && !isTransactional ? numDeleteMutationsInBatch : numFailedMutations,
+                0, 0, 0, 0,
+                numUpsertMutationsInBatch,
+                allUpsertsMutations ? 1 : 0,
+                numDeleteMutationsInBatch,
+                allDeletesMutations ? 1 : 0);
+    }
+
+    /**
+     * Get mutation metrics that correspond to committed mutations only
+     * @param totalMutationBytesObject MutationBytes object corresponding to all the mutations we
+     *                                 attempted to commit including those that failed, those that
+     *                                 were already sent and those that were unsent
+     * @param unsentMutationBatchList list of mutation batches that are unsent
+     * @param numMutations total number of mutations
+     * @param numFailedMutations number of failed mutations in the most recent failed batch
+     * @param numFailedPhase3Mutations number of mutations failed in phase 3 of index commits
+     * @param mutationCommitTime time taken for committing all mutations
+     * @return mutation metric object just accounting for mutations that are already
+     * successfully committed
+     */
+    @VisibleForTesting
+    static MutationMetric getCommittedMutationsMetric(
+            MutationBytes totalMutationBytesObject, List<List<Mutation>> unsentMutationBatchList,
+            long numMutations, long numFailedMutations,
+            long numFailedPhase3Mutations, long mutationCommitTime) {
+        long committedUpsertMutationBytes = totalMutationBytesObject == null ? 0 :
+                totalMutationBytesObject.getUpsertMutationBytes();
+        long committedDeleteMutationBytes = totalMutationBytesObject == null ? 0 :
+                totalMutationBytesObject.getDeleteMutationBytes();
+        long committedUpsertMutationCounter = totalMutationBytesObject == null ? 0 :
+                totalMutationBytesObject.getUpsertMutationCounter();
+        long committedDeleteMutationCounter = totalMutationBytesObject == null ? 0 :
+                totalMutationBytesObject.getDeleteMutationCounter();
+        long committedTotalMutationBytes = totalMutationBytesObject == null ? 0 :
+                totalMutationBytesObject.getTotalMutationBytes();
+        long upsertMutationCommitTime = 0L;
+        long deleteMutationCommitTime = 0L;
+
+        if (totalMutationBytesObject != null && numFailedMutations != 0) {
+            List<Mutation> uncommittedMutationsList = new ArrayList<>();
+            for (List<Mutation> mutationBatch : unsentMutationBatchList) {
+                uncommittedMutationsList.addAll(mutationBatch);
+            }
+            // Calculate the uncommitted mutations
+            MutationBytes uncommittedMutationBytesObject =
+                    calculateMutationSize(uncommittedMutationsList, false);
+            committedUpsertMutationBytes -=
+                    uncommittedMutationBytesObject.getUpsertMutationBytes();
+            committedDeleteMutationBytes -=
+                    uncommittedMutationBytesObject.getDeleteMutationBytes();
+            committedUpsertMutationCounter -=
+                    uncommittedMutationBytesObject.getUpsertMutationCounter();
+            committedDeleteMutationCounter -=
+                    uncommittedMutationBytesObject.getDeleteMutationCounter();
+            committedTotalMutationBytes -=
+                    uncommittedMutationBytesObject.getTotalMutationBytes();
+        }
+
+        // TODO: For V1, we don't expect mixed upserts and deletes so this is fine,

Review comment:
       Remove this comment or modify it to explain what should be covered in the "V1" version of this feature

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
##########
@@ -0,0 +1,1241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.exception.PhoenixIOException;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.CommitException;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConfigurationFactory;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.ConnectionQueryServicesImpl;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesTestImpl;
+import org.apache.phoenix.util.DelayedOrFailingRegionServer;
+import org.apache.phoenix.util.EnvironmentEdge;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.InstanceResolver;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME;
+import static org.apache.phoenix.exception.SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY;
+import static org.apache.phoenix.exception.SQLExceptionCode.GET_TABLE_REGIONS_FAIL;
+import static org.apache.phoenix.exception.SQLExceptionCode.OPERATION_TIMED_OUT;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SCAN_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_FAILURE_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_BATCH_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_BATCH_FAILED_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_FAILED_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_MUTATION_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_SQL_QUERY_TIME;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_FAILED_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_POINTLOOKUP_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_POINTLOOKUP_TIMEOUT_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_SCAN_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_SCAN_TIMEOUT_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_TIMEOUT_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.RESULT_SET_TIME_MS;
+import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_AGGREGATE_FAILURE_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_AGGREGATE_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_FAILED_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_POINTLOOKUP_FAILED_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_POINTLOOKUP_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_SCAN_FAILED_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_SCAN_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_SQL_QUERY_TIME;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_AGGREGATE_FAILURE_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_AGGREGATE_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_BATCH_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_BATCH_FAILED_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_FAILED_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_MUTATION_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_SQL_QUERY_TIME;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.PhoenixMetricsIT.POINT_LOOKUP_SELECT_QUERY;
+import static org.apache.phoenix.monitoring.PhoenixMetricsIT.RANGE_SCAN_SELECT_QUERY;
+import static org.apache.phoenix.monitoring.PhoenixMetricsIT.createTableAndInsertValues;
+import static org.apache.phoenix.monitoring.PhoenixMetricsIT.doPointDeleteFromTable;
+import static org.apache.phoenix.monitoring.PhoenixMetricsIT.doDeleteAllFromTable;
+import static org.apache.phoenix.util.DelayedOrFailingRegionServer.INJECTED_EXCEPTION_STRING;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.clearTableLevelMetrics;
+import static org.apache.phoenix.util.PhoenixRuntime.getOverAllReadRequestMetricInfo;
+import static org.apache.phoenix.util.PhoenixRuntime.getPhoenixTableClientMetrics;
+import static org.apache.phoenix.util.PhoenixRuntime.getRequestReadMetricInfo;
+import static org.apache.phoenix.util.PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class PhoenixTableLevelMetricsIT extends BaseUniqueNamesOwnClusterIT {
+
+    private static final String
+            CREATE_TABLE_DDL =
+            "CREATE TABLE %s (K VARCHAR(%d) NOT NULL" + " PRIMARY KEY, V VARCHAR)";
+    private static final String UPSERT_DML = "UPSERT INTO %s VALUES (?, ?)";
+    private static final String KEY = "key";
+    private static final String VALUE = "value";
+    private static boolean failExecuteQueryAndClientSideDeletes;
+    private static long injectDelay;
+    private static HBaseTestingUtility hbaseTestUtil;
+
+    @BeforeClass public static void doSetup() throws Exception {
+        final Configuration conf = HBaseConfiguration.create();
+        conf.set(QueryServices.TABLE_LEVEL_METRICS_ENABLED, String.valueOf(true));
+        conf.set(QueryServices.METRIC_PUBLISHER_ENABLED, String.valueOf(true));
+        conf.set(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true));
+
+        InstanceResolver.clearSingletons();
+        // Override to get required config for static fields loaded that require HBase config
+        InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() {
+
+            @Override public Configuration getConfiguration() {
+                return conf;
+            }
+
+            @Override public Configuration getConfiguration(Configuration confToClone) {
+                Configuration copy = new Configuration(conf);
+                copy.addResource(confToClone);
+                return copy;
+            }
+        });
+        hbaseTestUtil = new HBaseTestingUtility();
+        hbaseTestUtil.startMiniCluster(1, 1, null, null, DelayedOrFailingRegionServer.class);
+        // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver
+        String zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
+        url = PhoenixRuntime.JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkQuorum;
+
+        // Add our own driver
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(BaseTest.DRIVER_CLASS_NAME_ATTRIB, PhoenixMetricsTestingDriver.class.getName());
+        initAndRegisterTestDriver(url, new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @AfterClass public static void tearDownMiniCluster() {
+        try {
+            if (hbaseTestUtil != null) {
+                hbaseTestUtil.shutdownMiniCluster();
+            }
+        } catch (Exception e) {
+            // ignore
+        }
+    }
+
+    /**
+     * Assert table-level metrics related to SELECT queries
+     *
+     * @param tableName                           table name
+     * @param isPointLookup                       true if it is a point lookup
+     * @param expectedSqlSuccessCt                expected number of successes related to query execution
+     * @param expectedSqlFailureCt                expected number of failures related to query execution
+     * @param expectedMinTimeElapsed              minimum expected time elapsed during query execution
+     * @param hasResultSetIterationStarted        true if we have actually started issuing the scan(s) and
+     *                                            iterating over results via ResultSet.next() calls
+     * @param expectedResultSetIterFailedCounter  expected number of failures related to rs.next()
+     * @param expectedResultSetIterTimeoutCounter expected number of timeouts related to rs.next()
+     * @param rs                                  current ResultSet which we can use to check table-level metric values against
+     *                                            the ReadMetricQueue and OverallQueryMetrics. Null indicates that rs iteration
+     *                                            has not started yet due to an exception in the executeMutation step itself
+     */
+    static void assertSelectQueryTableMetrics(final String tableName, final boolean isPointLookup,
+            final long expectedSelectAggregateSuccessCt,
+            final long expectedSelectAggregateFailureCt, final long expectedSqlSuccessCt,
+            final long expectedSqlFailureCt, final long expectedMinTimeElapsed,
+            final boolean hasResultSetIterationStarted,
+            final long expectedResultSetIterFailedCounter,
+            final long expectedResultSetIterTimeoutCounter, final ResultSet rs)
+            throws SQLException {
+        // The resultSet must be closed since we modify certain timing related metrics when calling rs.close()
+        if (hasResultSetIterationStarted) {
+            assertTrue(rs != null && rs.isClosed());
+        } else {
+            assertTrue(rs == null || rs.isBeforeFirst());
+        }
+        assertFalse(getPhoenixTableClientMetrics().isEmpty());
+        assertFalse(getPhoenixTableClientMetrics().get(tableName).isEmpty());
+        final long

Review comment:
       nit: Fix formatting (the declaration is split across lines right after `final long`)

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java
##########
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.monitoring;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.metrics.Gauge;
+import org.apache.hadoop.hbase.metrics.MetricRegistry;
+
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_FAILED_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_POINTLOOKUP_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_POINTLOOKUP_TIMEOUT_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_SCAN_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_SCAN_TIMEOUT_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.RESULT_SET_TIME_MS;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_FAILED_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_MUTATION_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_BATCH_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_BATCH_FAILED_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_MUTATION_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.TASK_END_TO_END_TIME;
+import static org.apache.phoenix.monitoring.MetricType.COUNT_ROWS_SCANNED;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_TIMEOUT_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_POINTLOOKUP_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_POINTLOOKUP_FAILED_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_SQL_QUERY_TIME;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_SCAN_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_SCAN_FAILED_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_FAILED_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_SQL_QUERY_TIME;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_BATCH_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_BATCH_FAILED_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_FAILED_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_SQL_QUERY_TIME;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_AGGREGATE_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_AGGREGATE_FAILURE_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_FAILURE_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_AGGREGATE_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_AGGREGATE_FAILURE_SQL_COUNTER;
+
+/**
+ * This is used by the TableMetricsManager class to store the instance of the
+ * metrics object associated with a tableName.
+ */
+public class TableClientMetrics {
+
+    public enum TableMetrics {
+        TABLE_MUTATION_BATCH_FAILED_SIZE(MUTATION_BATCH_FAILED_SIZE), TABLE_MUTATION_BATCH_SIZE(
+                MUTATION_BATCH_SIZE), TABLE_MUTATION_BYTES(
+                MUTATION_BYTES), TABLE_UPSERT_MUTATION_BYTES(
+                UPSERT_MUTATION_BYTES), TABLE_UPSERT_MUTATION_SQL_COUNTER(
+                UPSERT_MUTATION_SQL_COUNTER), TABLE_DELETE_MUTATION_BYTES(
+                DELETE_MUTATION_BYTES), TABLE_DELETE_MUTATION_SQL_COUNTER(
+                DELETE_MUTATION_SQL_COUNTER), TABLE_MUTATION_SQL_COUNTER(
+                MUTATION_SQL_COUNTER), TABLE_MUTATION_COMMIT_TIME(
+                MUTATION_COMMIT_TIME), TABLE_UPSERT_SQL_COUNTER(
+                UPSERT_SQL_COUNTER), TABLE_UPSERT_SQL_QUERY_TIME(
+                UPSERT_SQL_QUERY_TIME), TABLE_SUCCESS_UPSERT_SQL_COUNTER(
+                UPSERT_SUCCESS_SQL_COUNTER), TABLE_FAILED_UPSERT_SQL_COUNTER(
+                UPSERT_FAILED_SQL_COUNTER), TABLE_UPSERT_BATCH_FAILED_SIZE(
+                UPSERT_BATCH_FAILED_SIZE), TABLE_UPSERT_BATCH_FAILED_COUNTER(
+                UPSERT_BATCH_FAILED_COUNTER), TABLE_DELETE_SQL_COUNTER(
+                DELETE_SQL_COUNTER), TABLE_DELETE_SQL_QUERY_TIME(
+                DELETE_SQL_QUERY_TIME), TABLE_SUCCESS_DELETE_SQL_COUNTER(
+                DELETE_SUCCESS_SQL_COUNTER), TABLE_FAILED_DELETE_SQL_COUNTER(
+                DELETE_FAILED_SQL_COUNTER), TABLE_DELETE_BATCH_FAILED_SIZE(
+                DELETE_BATCH_FAILED_SIZE), TABLE_DELETE_BATCH_FAILED_COUNTER(
+                DELETE_BATCH_FAILED_COUNTER), TABLE_UPSERT_COMMIT_TIME(
+                UPSERT_COMMIT_TIME), TABLE_DELETE_COMMIT_TIME(
+                DELETE_COMMIT_TIME), TABLE_TASK_END_TO_END_TIME(
+                TASK_END_TO_END_TIME), TABLE_COUNT_ROWS_SCANNED(
+                COUNT_ROWS_SCANNED), TABLE_QUERY_FAILED_COUNTER(
+                QUERY_FAILED_COUNTER), TABLE_QUERY_POINTLOOKUP_FAILED_COUNTER(
+                QUERY_POINTLOOKUP_FAILED_COUNTER), TABLE_QUERY_SCAN_FAILED_COUNTER(
+                QUERY_SCAN_FAILED_COUNTER), TABLE_QUERY_TIMEOUT_COUNTER(
+                QUERY_TIMEOUT_COUNTER), TABLE_QUERY_POINTLOOKUP_TIMEOUT_COUNTER(
+                QUERY_POINTLOOKUP_TIMEOUT_COUNTER), TABLE_QUERY_SCAN_TIMEOUT_COUNTER(
+                QUERY_SCAN_TIMEOUT_COUNTER), TABLE_SELECT_QUERY_RESULT_SET_MS(
+                RESULT_SET_TIME_MS), TABLE_SCANBYTES(SCAN_BYTES), TABLE_SELECT_SQL_COUNTER(
+                SELECT_SQL_COUNTER), TABLE_SELECT_SQL_QUERY_TIME(
+                SELECT_SQL_QUERY_TIME), TABLE_SUCCESS_SELECT_SQL_COUNTER(
+                SELECT_SUCCESS_SQL_COUNTER), TABLE_FAILED_SELECT_SQL_COUNTER(
+                SELECT_FAILED_SQL_COUNTER), TABLE_SELECT_POINTLOOKUP_COUNTER_SUCCESS(
+                SELECT_POINTLOOKUP_SUCCESS_SQL_COUNTER), TABLE_SELECT_POINTLOOKUP_COUNTER_FAILED(
+                SELECT_POINTLOOKUP_FAILED_SQL_COUNTER), TABLE_SELECT_SCAN_COUNTER_SUCCESS(
+                SELECT_SCAN_SUCCESS_SQL_COUNTER), TABLE_SELECT_SCAN_COUNTER_FAILED(
+                SELECT_SCAN_FAILED_SQL_COUNTER), TABLE_UPSERT_AGGREGATE_SUCCESS_SQL_COUNTER(
+                UPSERT_AGGREGATE_SUCCESS_SQL_COUNTER), TABLE_UPSERT_AGGREGATE_FAILURE_SQL_COUNTER(
+                UPSERT_AGGREGATE_FAILURE_SQL_COUNTER), TABLE_DELETE_AGGREGATE_SUCCESS_SQL_COUNTER(
+                DELETE_AGGREGATE_SUCCESS_SQL_COUNTER), TABLE_DELETE_AGGREGATE_FAILURE_SQL_COUNTER(
+                DELETE_AGGREGATE_FAILURE_SQL_COUNTER), TABLE_SELECT_AGGREGATE_SUCCESS_SQL_COUNTER(
+                SELECT_AGGREGATE_SUCCESS_SQL_COUNTER), TABLE_SELECT_AGGREGATE_FAILURE_SQL_COUNTER(
+                SELECT_AGGREGATE_FAILURE_SQL_COUNTER);
+
+        private MetricType metricType;
+        private PhoenixTableMetric metric;
+
+        TableMetrics(MetricType metricType) {
+            this.metricType = metricType;
+        }
+    }
+
+    private final String tableName;
+    private Map<MetricType, PhoenixTableMetric> metricRegister;

Review comment:
       `metricRegister` can probably be `final`

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/monitoring/TableMetricsManager.java
##########
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.monitoring;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Central place where we keep track of all the Table Level metrics. Register each tableMetrics and
+ * store the instance of it associated with TableName in a map
+ * This class exposes the following functions as static methods to help catch all exceptions
+ * 1.clearTableLevelMetricsMethod
+ * 2.getTableMetricsMethod
+ * 3.pushMetricsFromConnInstanceMethod
+ * 4.updateMetricsMethod
+ */
+
+public class TableMetricsManager {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TableMetricsManager.class);
+    private static boolean isTableLevelMetricsEnabled;
+    private static boolean isMetricPublisherEnabled;
+    private static final Set<String> allowedListOfTableNames = new HashSet<>();
+    private static ConcurrentMap<String, TableClientMetrics> tableClientMetricsMapping = null;
+    // Singleton object
+    protected static TableMetricsManager tableMetricsManager = null;
+    private static MetricPublisherSupplierFactory mPublisher = null;
+    private static QueryServicesOptions options = null;
+
+    public TableMetricsManager(QueryServicesOptions ops) {
+        options = ops;
+        isTableLevelMetricsEnabled = options.isTableLevelMetricsEnabled();
+        LOGGER.info(String.format("Phoenix Table metrics enabled status: %s",
+                isTableLevelMetricsEnabled));
+        tableClientMetricsMapping = new ConcurrentHashMap<>();
+
+        String tableNamesList = options.getAllowedListTableNames();
+        if (tableNamesList != null && !tableNamesList.isEmpty()) {
+            for (String tableName : tableNamesList.split(",")) {
+                allowedListOfTableNames.add(tableName);
+            }
+        }
+        isMetricPublisherEnabled = options.isMetricPublisherEnabled();
+        LOGGER.info(String.format("Phoenix table level metrics publisher enabled status %s",
+                isMetricPublisherEnabled));
+    }
+
+    public TableMetricsManager() {
+    }
+
+    @VisibleForTesting public static void setInstance(TableMetricsManager metricsManager) {
+        tableMetricsManager = metricsManager;
+    }
+
+    /**
+     * Method to provide instance of TableMetricsManager(Create if needed in thread safe manner)
+     *
+     * @return
+     */
+    private static TableMetricsManager getInstance() {
+
+        if (tableMetricsManager == null) {
+            synchronized (TableMetricsManager.class) {
+                if (tableMetricsManager == null) {

Review comment:
       For double-checked locking to work properly, `tableMetricsManager` will need to be declared `volatile`.

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/monitoring/TableMetricsManager.java
##########
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.monitoring;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Central place where we keep track of all the Table Level metrics. Register each tableMetrics and
+ * store the instance of it associated with TableName in a map
+ * This class exposes following functions as static methods to help catch all execptions
+ * 1.clearTableLevelMetricsMethod
+ * 2.getTableMetricsMethod
+ * 3.pushMetricsFromConnInstanceMethod
+ * 4.updateMetricsMethod
+ */
+
+public class TableMetricsManager {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TableMetricsManager.class);
+    private static boolean isTableLevelMetricsEnabled;
+    private static boolean isMetricPublisherEnabled;
+    private static final Set<String> allowedListOfTableNames = new HashSet<>();
+    private static ConcurrentMap<String, TableClientMetrics> tableClientMetricsMapping = null;
+    // Singleton object
+    protected static TableMetricsManager tableMetricsManager = null;
+    private static MetricPublisherSupplierFactory mPublisher = null;
+    private static QueryServicesOptions options = null;
+
+    public TableMetricsManager(QueryServicesOptions ops) {
+        options = ops;
+        isTableLevelMetricsEnabled = options.isTableLevelMetricsEnabled();
+        LOGGER.info(String.format("Phoenix Table metrics enabled status: %s",
+                isTableLevelMetricsEnabled));
+        tableClientMetricsMapping = new ConcurrentHashMap<>();
+
+        String tableNamesList = options.getAllowedListTableNames();
+        if (tableNamesList != null && !tableNamesList.isEmpty()) {
+            for (String tableName : tableNamesList.split(",")) {
+                allowedListOfTableNames.add(tableName);
+            }
+        }
+        isMetricPublisherEnabled = options.isMetricPublisherEnabled();
+        LOGGER.info(String.format("Phoenix table level metrics publisher enabled status %s",
+                isMetricPublisherEnabled));
+    }
+
+    public TableMetricsManager() {
+    }
+
+    @VisibleForTesting public static void setInstance(TableMetricsManager metricsManager) {
+        tableMetricsManager = metricsManager;
+    }
+
+    /**
+     * Method to provide instance of TableMetricsManager(Create if needed in thread safe manner)
+     *
+     * @return
+     */
+    private static TableMetricsManager getInstance() {
+
+        if (tableMetricsManager == null) {
+            synchronized (TableMetricsManager.class) {
+                if (tableMetricsManager == null) {

Review comment:
       The same applies to the other variables of this class: they need to be either `volatile` or, if possible, `final`.

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java
##########
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.monitoring;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.metrics.Gauge;
+import org.apache.hadoop.hbase.metrics.MetricRegistry;
+
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_FAILED_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_POINTLOOKUP_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_POINTLOOKUP_TIMEOUT_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_SCAN_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_SCAN_TIMEOUT_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.RESULT_SET_TIME_MS;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_FAILED_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_MUTATION_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_BATCH_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_BATCH_FAILED_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_MUTATION_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.TASK_END_TO_END_TIME;
+import static org.apache.phoenix.monitoring.MetricType.COUNT_ROWS_SCANNED;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_TIMEOUT_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_POINTLOOKUP_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_POINTLOOKUP_FAILED_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_SQL_QUERY_TIME;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_SCAN_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_SCAN_FAILED_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_FAILED_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_SQL_QUERY_TIME;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_BATCH_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_BATCH_FAILED_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_FAILED_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_SQL_QUERY_TIME;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_AGGREGATE_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_AGGREGATE_FAILURE_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_FAILURE_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_AGGREGATE_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_AGGREGATE_FAILURE_SQL_COUNTER;
+
+/**
+ * This is used by TableMetricsManager class to store instance of
+ * object associated with a tableName.
+ */
+public class TableClientMetrics {
+
+    public enum TableMetrics {
+        TABLE_MUTATION_BATCH_FAILED_SIZE(MUTATION_BATCH_FAILED_SIZE), TABLE_MUTATION_BATCH_SIZE(
+                MUTATION_BATCH_SIZE), TABLE_MUTATION_BYTES(
+                MUTATION_BYTES), TABLE_UPSERT_MUTATION_BYTES(
+                UPSERT_MUTATION_BYTES), TABLE_UPSERT_MUTATION_SQL_COUNTER(
+                UPSERT_MUTATION_SQL_COUNTER), TABLE_DELETE_MUTATION_BYTES(
+                DELETE_MUTATION_BYTES), TABLE_DELETE_MUTATION_SQL_COUNTER(
+                DELETE_MUTATION_SQL_COUNTER), TABLE_MUTATION_SQL_COUNTER(
+                MUTATION_SQL_COUNTER), TABLE_MUTATION_COMMIT_TIME(
+                MUTATION_COMMIT_TIME), TABLE_UPSERT_SQL_COUNTER(
+                UPSERT_SQL_COUNTER), TABLE_UPSERT_SQL_QUERY_TIME(
+                UPSERT_SQL_QUERY_TIME), TABLE_SUCCESS_UPSERT_SQL_COUNTER(
+                UPSERT_SUCCESS_SQL_COUNTER), TABLE_FAILED_UPSERT_SQL_COUNTER(
+                UPSERT_FAILED_SQL_COUNTER), TABLE_UPSERT_BATCH_FAILED_SIZE(
+                UPSERT_BATCH_FAILED_SIZE), TABLE_UPSERT_BATCH_FAILED_COUNTER(
+                UPSERT_BATCH_FAILED_COUNTER), TABLE_DELETE_SQL_COUNTER(
+                DELETE_SQL_COUNTER), TABLE_DELETE_SQL_QUERY_TIME(
+                DELETE_SQL_QUERY_TIME), TABLE_SUCCESS_DELETE_SQL_COUNTER(
+                DELETE_SUCCESS_SQL_COUNTER), TABLE_FAILED_DELETE_SQL_COUNTER(
+                DELETE_FAILED_SQL_COUNTER), TABLE_DELETE_BATCH_FAILED_SIZE(
+                DELETE_BATCH_FAILED_SIZE), TABLE_DELETE_BATCH_FAILED_COUNTER(
+                DELETE_BATCH_FAILED_COUNTER), TABLE_UPSERT_COMMIT_TIME(
+                UPSERT_COMMIT_TIME), TABLE_DELETE_COMMIT_TIME(
+                DELETE_COMMIT_TIME), TABLE_TASK_END_TO_END_TIME(
+                TASK_END_TO_END_TIME), TABLE_COUNT_ROWS_SCANNED(
+                COUNT_ROWS_SCANNED), TABLE_QUERY_FAILED_COUNTER(
+                QUERY_FAILED_COUNTER), TABLE_QUERY_POINTLOOKUP_FAILED_COUNTER(
+                QUERY_POINTLOOKUP_FAILED_COUNTER), TABLE_QUERY_SCAN_FAILED_COUNTER(
+                QUERY_SCAN_FAILED_COUNTER), TABLE_QUERY_TIMEOUT_COUNTER(
+                QUERY_TIMEOUT_COUNTER), TABLE_QUERY_POINTLOOKUP_TIMEOUT_COUNTER(
+                QUERY_POINTLOOKUP_TIMEOUT_COUNTER), TABLE_QUERY_SCAN_TIMEOUT_COUNTER(
+                QUERY_SCAN_TIMEOUT_COUNTER), TABLE_SELECT_QUERY_RESULT_SET_MS(
+                RESULT_SET_TIME_MS), TABLE_SCANBYTES(SCAN_BYTES), TABLE_SELECT_SQL_COUNTER(
+                SELECT_SQL_COUNTER), TABLE_SELECT_SQL_QUERY_TIME(
+                SELECT_SQL_QUERY_TIME), TABLE_SUCCESS_SELECT_SQL_COUNTER(
+                SELECT_SUCCESS_SQL_COUNTER), TABLE_FAILED_SELECT_SQL_COUNTER(
+                SELECT_FAILED_SQL_COUNTER), TABLE_SELECT_POINTLOOKUP_COUNTER_SUCCESS(
+                SELECT_POINTLOOKUP_SUCCESS_SQL_COUNTER), TABLE_SELECT_POINTLOOKUP_COUNTER_FAILED(
+                SELECT_POINTLOOKUP_FAILED_SQL_COUNTER), TABLE_SELECT_SCAN_COUNTER_SUCCESS(
+                SELECT_SCAN_SUCCESS_SQL_COUNTER), TABLE_SELECT_SCAN_COUNTER_FAILED(
+                SELECT_SCAN_FAILED_SQL_COUNTER), TABLE_UPSERT_AGGREGATE_SUCCESS_SQL_COUNTER(
+                UPSERT_AGGREGATE_SUCCESS_SQL_COUNTER), TABLE_UPSERT_AGGREGATE_FAILURE_SQL_COUNTER(
+                UPSERT_AGGREGATE_FAILURE_SQL_COUNTER), TABLE_DELETE_AGGREGATE_SUCCESS_SQL_COUNTER(
+                DELETE_AGGREGATE_SUCCESS_SQL_COUNTER), TABLE_DELETE_AGGREGATE_FAILURE_SQL_COUNTER(
+                DELETE_AGGREGATE_FAILURE_SQL_COUNTER), TABLE_SELECT_AGGREGATE_SUCCESS_SQL_COUNTER(
+                SELECT_AGGREGATE_SUCCESS_SQL_COUNTER), TABLE_SELECT_AGGREGATE_FAILURE_SQL_COUNTER(
+                SELECT_AGGREGATE_FAILURE_SQL_COUNTER);
+
+        private MetricType metricType;

Review comment:
       can probably be `final`

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricServiceResolver.java
##########
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.monitoring;
+
+import com.google.common.base.Preconditions;
+import org.apache.phoenix.util.InstanceResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * This class helps resolve the metricpublisher supplier class at the run time.
+ * Based on the classString name passed, it will return the appropriate class Instance.
+ */
+public class MetricServiceResolver {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(MetricServiceResolver.class);
+
+    private MetricPublisherSupplierFactory metricSupplier = null;
+
+    public MetricPublisherSupplierFactory instantiate(String classString) {
+        Preconditions.checkNotNull(classString);
+        if (metricSupplier == null) {
+            try {
+                Class clazz = Class.forName(classString);
+                List<MetricPublisherSupplierFactory>
+                        factoryList =
+                        InstanceResolver.get(MetricPublisherSupplierFactory.class, null);
+                for (MetricPublisherSupplierFactory factory : factoryList) {
+                    if (clazz.isInstance(factory)) {
+                        metricSupplier = factory;
+                        LOGGER.info(String.format(
+                                "Sucessfully loaded class for MetricPublishFactory of type: %s",
+                                classString));
+                        break;
+                    }
+                }
+                if (metricSupplier == null) {
+                    String msg = String.format("Could not load/instantiate class %s", classString);
+                    LOGGER.error(msg);
+                }
+            } catch (ClassNotFoundException e) {
+                LOGGER.error(String.format("Could not load/instantiate class %s", classString), e);

Review comment:
       Should this not be bubbled up? What will happen if a ClassNotFoundException is swallowed and we continue?

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/monitoring/TableMetricsManager.java
##########
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.monitoring;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Central place where we keep track of all the Table Level metrics. Register each tableMetrics and
+ * store the instance of it associated with TableName in a map
+ * This class exposes following functions as static methods to help catch all execptions
+ * 1.clearTableLevelMetricsMethod
+ * 2.getTableMetricsMethod
+ * 3.pushMetricsFromConnInstanceMethod
+ * 4.updateMetricsMethod
+ */
+
+public class TableMetricsManager {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TableMetricsManager.class);
+    private static boolean isTableLevelMetricsEnabled;
+    private static boolean isMetricPublisherEnabled;
+    private static final Set<String> allowedListOfTableNames = new HashSet<>();
+    private static ConcurrentMap<String, TableClientMetrics> tableClientMetricsMapping = null;
+    // Singleton object
+    protected static TableMetricsManager tableMetricsManager = null;
+    private static MetricPublisherSupplierFactory mPublisher = null;
+    private static QueryServicesOptions options = null;
+
+    public TableMetricsManager(QueryServicesOptions ops) {

Review comment:
       Also, just to prevent object creation via reflection, you can throw a `RuntimeException` inside the constructor if `tableMetricsManager != null`.

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/monitoring/NoOpTableMetricsManager.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import org.apache.phoenix.query.QueryServicesOptions;
+
+import java.util.List;
+import java.util.Map;
+
+public class NoOpTableMetricsManager extends TableMetricsManager {

Review comment:
       Can you add a comment about where/why this implementation is used?

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/monitoring/TableMetricsManager.java
##########
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.monitoring;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Central place where we keep track of all the Table Level metrics. Register each tableMetrics and
+ * store the instance of it associated with TableName in a map
+ * This class exposes following functions as static methods to help catch all execptions
+ * 1.clearTableLevelMetricsMethod
+ * 2.getTableMetricsMethod
+ * 3.pushMetricsFromConnInstanceMethod
+ * 4.updateMetricsMethod
+ */
+
+public class TableMetricsManager {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TableMetricsManager.class);
+    private static boolean isTableLevelMetricsEnabled;
+    private static boolean isMetricPublisherEnabled;
+    private static final Set<String> allowedListOfTableNames = new HashSet<>();
+    private static ConcurrentMap<String, TableClientMetrics> tableClientMetricsMapping = null;
+    // Singleton object
+    protected static TableMetricsManager tableMetricsManager = null;

Review comment:
       Can this not be private?

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java
##########
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.monitoring;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.metrics.Gauge;
+import org.apache.hadoop.hbase.metrics.MetricRegistry;
+
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_FAILED_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_POINTLOOKUP_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_POINTLOOKUP_TIMEOUT_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_SCAN_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_SCAN_TIMEOUT_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.RESULT_SET_TIME_MS;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_FAILED_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_MUTATION_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_BATCH_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_BATCH_FAILED_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_MUTATION_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.TASK_END_TO_END_TIME;
+import static org.apache.phoenix.monitoring.MetricType.COUNT_ROWS_SCANNED;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_TIMEOUT_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_POINTLOOKUP_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_POINTLOOKUP_FAILED_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_SQL_QUERY_TIME;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_SCAN_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_SCAN_FAILED_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_FAILED_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_SQL_QUERY_TIME;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_BATCH_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_BATCH_FAILED_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_FAILED_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_SQL_QUERY_TIME;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_AGGREGATE_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.UPSERT_AGGREGATE_FAILURE_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_FAILURE_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_AGGREGATE_SUCCESS_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_AGGREGATE_FAILURE_SQL_COUNTER;
+
+/**
+ * This is used by TableMetricsManager class to store instance of
+ * object associated with a tableName.
+ */
+public class TableClientMetrics {
+
+    public enum TableMetrics {
+        TABLE_MUTATION_BATCH_FAILED_SIZE(MUTATION_BATCH_FAILED_SIZE), TABLE_MUTATION_BATCH_SIZE(
+                MUTATION_BATCH_SIZE), TABLE_MUTATION_BYTES(
+                MUTATION_BYTES), TABLE_UPSERT_MUTATION_BYTES(
+                UPSERT_MUTATION_BYTES), TABLE_UPSERT_MUTATION_SQL_COUNTER(
+                UPSERT_MUTATION_SQL_COUNTER), TABLE_DELETE_MUTATION_BYTES(
+                DELETE_MUTATION_BYTES), TABLE_DELETE_MUTATION_SQL_COUNTER(
+                DELETE_MUTATION_SQL_COUNTER), TABLE_MUTATION_SQL_COUNTER(
+                MUTATION_SQL_COUNTER), TABLE_MUTATION_COMMIT_TIME(
+                MUTATION_COMMIT_TIME), TABLE_UPSERT_SQL_COUNTER(
+                UPSERT_SQL_COUNTER), TABLE_UPSERT_SQL_QUERY_TIME(
+                UPSERT_SQL_QUERY_TIME), TABLE_SUCCESS_UPSERT_SQL_COUNTER(
+                UPSERT_SUCCESS_SQL_COUNTER), TABLE_FAILED_UPSERT_SQL_COUNTER(
+                UPSERT_FAILED_SQL_COUNTER), TABLE_UPSERT_BATCH_FAILED_SIZE(
+                UPSERT_BATCH_FAILED_SIZE), TABLE_UPSERT_BATCH_FAILED_COUNTER(
+                UPSERT_BATCH_FAILED_COUNTER), TABLE_DELETE_SQL_COUNTER(
+                DELETE_SQL_COUNTER), TABLE_DELETE_SQL_QUERY_TIME(
+                DELETE_SQL_QUERY_TIME), TABLE_SUCCESS_DELETE_SQL_COUNTER(
+                DELETE_SUCCESS_SQL_COUNTER), TABLE_FAILED_DELETE_SQL_COUNTER(
+                DELETE_FAILED_SQL_COUNTER), TABLE_DELETE_BATCH_FAILED_SIZE(
+                DELETE_BATCH_FAILED_SIZE), TABLE_DELETE_BATCH_FAILED_COUNTER(
+                DELETE_BATCH_FAILED_COUNTER), TABLE_UPSERT_COMMIT_TIME(
+                UPSERT_COMMIT_TIME), TABLE_DELETE_COMMIT_TIME(
+                DELETE_COMMIT_TIME), TABLE_TASK_END_TO_END_TIME(
+                TASK_END_TO_END_TIME), TABLE_COUNT_ROWS_SCANNED(
+                COUNT_ROWS_SCANNED), TABLE_QUERY_FAILED_COUNTER(
+                QUERY_FAILED_COUNTER), TABLE_QUERY_POINTLOOKUP_FAILED_COUNTER(
+                QUERY_POINTLOOKUP_FAILED_COUNTER), TABLE_QUERY_SCAN_FAILED_COUNTER(
+                QUERY_SCAN_FAILED_COUNTER), TABLE_QUERY_TIMEOUT_COUNTER(
+                QUERY_TIMEOUT_COUNTER), TABLE_QUERY_POINTLOOKUP_TIMEOUT_COUNTER(
+                QUERY_POINTLOOKUP_TIMEOUT_COUNTER), TABLE_QUERY_SCAN_TIMEOUT_COUNTER(
+                QUERY_SCAN_TIMEOUT_COUNTER), TABLE_SELECT_QUERY_RESULT_SET_MS(
+                RESULT_SET_TIME_MS), TABLE_SCANBYTES(SCAN_BYTES), TABLE_SELECT_SQL_COUNTER(
+                SELECT_SQL_COUNTER), TABLE_SELECT_SQL_QUERY_TIME(
+                SELECT_SQL_QUERY_TIME), TABLE_SUCCESS_SELECT_SQL_COUNTER(
+                SELECT_SUCCESS_SQL_COUNTER), TABLE_FAILED_SELECT_SQL_COUNTER(
+                SELECT_FAILED_SQL_COUNTER), TABLE_SELECT_POINTLOOKUP_COUNTER_SUCCESS(
+                SELECT_POINTLOOKUP_SUCCESS_SQL_COUNTER), TABLE_SELECT_POINTLOOKUP_COUNTER_FAILED(
+                SELECT_POINTLOOKUP_FAILED_SQL_COUNTER), TABLE_SELECT_SCAN_COUNTER_SUCCESS(
+                SELECT_SCAN_SUCCESS_SQL_COUNTER), TABLE_SELECT_SCAN_COUNTER_FAILED(
+                SELECT_SCAN_FAILED_SQL_COUNTER), TABLE_UPSERT_AGGREGATE_SUCCESS_SQL_COUNTER(
+                UPSERT_AGGREGATE_SUCCESS_SQL_COUNTER), TABLE_UPSERT_AGGREGATE_FAILURE_SQL_COUNTER(
+                UPSERT_AGGREGATE_FAILURE_SQL_COUNTER), TABLE_DELETE_AGGREGATE_SUCCESS_SQL_COUNTER(
+                DELETE_AGGREGATE_SUCCESS_SQL_COUNTER), TABLE_DELETE_AGGREGATE_FAILURE_SQL_COUNTER(
+                DELETE_AGGREGATE_FAILURE_SQL_COUNTER), TABLE_SELECT_AGGREGATE_SUCCESS_SQL_COUNTER(
+                SELECT_AGGREGATE_SUCCESS_SQL_COUNTER), TABLE_SELECT_AGGREGATE_FAILURE_SQL_COUNTER(
+                SELECT_AGGREGATE_FAILURE_SQL_COUNTER);
+
+        private MetricType metricType;
+        private PhoenixTableMetric metric;

Review comment:
       Is this field unused?

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/monitoring/JmxMetricProvider.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import org.apache.hadoop.hbase.metrics.Gauge;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.metrics.MetricRegistries;
+import org.apache.hadoop.hbase.metrics.MetricRegistry;
+import org.apache.hadoop.hbase.metrics.MetricRegistryInfo;
+import org.apache.phoenix.query.QueryServicesOptions;
+
+import java.util.Map;
+
+public class JmxMetricProvider implements MetricPublisherSupplierFactory {

Review comment:
       Add a class-level comment

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/monitoring/TableMetricsManager.java
##########
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.monitoring;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Central place where we keep track of all the Table Level metrics. Register each tableMetrics and
+ * store the instance of it associated with TableName in a map
+ * This class exposes the following functions as static methods to help catch all exceptions
+ * 1.clearTableLevelMetricsMethod
+ * 2.getTableMetricsMethod
+ * 3.pushMetricsFromConnInstanceMethod
+ * 4.updateMetricsMethod
+ */
+
+public class TableMetricsManager {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TableMetricsManager.class);
+    private static boolean isTableLevelMetricsEnabled;
+    private static boolean isMetricPublisherEnabled;
+    private static final Set<String> allowedListOfTableNames = new HashSet<>();
+    private static ConcurrentMap<String, TableClientMetrics> tableClientMetricsMapping = null;
+    // Singleton object
+    protected static TableMetricsManager tableMetricsManager = null;
+    private static MetricPublisherSupplierFactory mPublisher = null;
+    private static QueryServicesOptions options = null;
+
+    public TableMetricsManager(QueryServicesOptions ops) {

Review comment:
       Since you want this to be a singleton, it is better to make all constructors private.

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/monitoring/TableMetricsManager.java
##########
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.monitoring;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Central place where we keep track of all the Table Level metrics. Register each tableMetrics and
+ * store the instance of it associated with TableName in a map
+ * This class exposes the following functions as static methods to help catch all exceptions
+ * 1.clearTableLevelMetricsMethod
+ * 2.getTableMetricsMethod
+ * 3.pushMetricsFromConnInstanceMethod
+ * 4.updateMetricsMethod
+ */
+
+public class TableMetricsManager {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TableMetricsManager.class);
+    private static boolean isTableLevelMetricsEnabled;
+    private static boolean isMetricPublisherEnabled;
+    private static final Set<String> allowedListOfTableNames = new HashSet<>();
+    private static ConcurrentMap<String, TableClientMetrics> tableClientMetricsMapping = null;
+    // Singleton object
+    protected static TableMetricsManager tableMetricsManager = null;
+    private static MetricPublisherSupplierFactory mPublisher = null;
+    private static QueryServicesOptions options = null;
+
+    /**
+     * Builds the manager from the supplied options: records whether table-level metrics and
+     * the metric publisher are enabled, creates the per-table metrics map and loads the
+     * allow-list of table names from the comma-separated option value.
+     * All of this state is held in static fields, so every construction replaces the
+     * shared flags and map and adds to the shared allow-list.
+     *
+     * @param ops query service options the metrics configuration is read from
+     */
+    public TableMetricsManager(QueryServicesOptions ops) {
+        options = ops;
+        isTableLevelMetricsEnabled = options.isTableLevelMetricsEnabled();
+        LOGGER.info(String.format("Phoenix Table metrics enabled status: %s",
+                isTableLevelMetricsEnabled));
+        tableClientMetricsMapping = new ConcurrentHashMap<>();
+
+        String tableNamesList = options.getAllowedListTableNames();
+        if (tableNamesList != null && !tableNamesList.isEmpty()) {
+            for (String tableName : tableNamesList.split(",")) {
+                // Tolerate "T1, T2" style values: surrounding whitespace would otherwise
+                // make the allow-list lookup miss, and blank entries name no table.
+                String trimmedName = tableName.trim();
+                if (!trimmedName.isEmpty()) {
+                    allowedListOfTableNames.add(trimmedName);
+                }
+            }
+        }
+        isMetricPublisherEnabled = options.isMetricPublisherEnabled();
+        LOGGER.info(String.format("Phoenix table level metrics publisher enabled status %s",
+                isMetricPublisherEnabled));
+    }
+
+    /**
+     * Creates a manager without reading any options; none of the static configuration
+     * fields are touched.
+     * NOTE(review): presumably kept for subclasses and tests - confirm.
+     */
+    public TableMetricsManager() {
+        // Intentionally empty.
+    }
+
+    /**
+     * Replaces the shared manager instance with the given one (test hook).
+     *
+     * @param metricsManager instance to install as the singleton
+     */
+    @VisibleForTesting
+    public static void setInstance(TableMetricsManager metricsManager) {
+        TableMetricsManager.tableMetricsManager = metricsManager;
+    }
+
+    /**
+     * Method to provide instance of TableMetricsManager(Create if needed in thread safe manner)
+     *
+     * @return
+     */
+    private static TableMetricsManager getInstance() {
+
+        if (tableMetricsManager == null) {
+            synchronized (TableMetricsManager.class) {
+                if (tableMetricsManager == null) {

Review comment:
       Also, consider using a local variable to point to the volatile field for performance reasons (See https://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java for more details)

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/monitoring/TableMetricsManager.java
##########
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.monitoring;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Central place where we keep track of all the Table Level metrics. Register each tableMetrics and
+ * store the instance of it associated with TableName in a map
+ * This class exposes the following functions as static methods to help catch all exceptions
+ * 1.clearTableLevelMetricsMethod
+ * 2.getTableMetricsMethod
+ * 3.pushMetricsFromConnInstanceMethod
+ * 4.updateMetricsMethod
+ */
+
+public class TableMetricsManager {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TableMetricsManager.class);
+    private static boolean isTableLevelMetricsEnabled;

Review comment:
       This field and maybe some others can be made `final`

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/monitoring/TableMetricsManager.java
##########
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.monitoring;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Central place where we keep track of all the Table Level metrics. Register each tableMetrics and
+ * store the instance of it associated with TableName in a map
+ * This class exposes the following functions as static methods to help catch all exceptions
+ * 1.clearTableLevelMetricsMethod
+ * 2.getTableMetricsMethod
+ * 3.pushMetricsFromConnInstanceMethod
+ * 4.updateMetricsMethod
+ */
+
+public class TableMetricsManager {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TableMetricsManager.class);
+    private static boolean isTableLevelMetricsEnabled;
+    private static boolean isMetricPublisherEnabled;
+    private static final Set<String> allowedListOfTableNames = new HashSet<>();
+    private static ConcurrentMap<String, TableClientMetrics> tableClientMetricsMapping = null;
+    // Singleton object
+    protected static TableMetricsManager tableMetricsManager = null;
+    private static MetricPublisherSupplierFactory mPublisher = null;
+    private static QueryServicesOptions options = null;
+
+    /**
+     * Builds the manager from the supplied options: records whether table-level metrics and
+     * the metric publisher are enabled, creates the per-table metrics map and loads the
+     * allow-list of table names from the comma-separated option value.
+     * All of this state is held in static fields, so every construction replaces the
+     * shared flags and map and adds to the shared allow-list.
+     *
+     * @param ops query service options the metrics configuration is read from
+     */
+    public TableMetricsManager(QueryServicesOptions ops) {
+        options = ops;
+        isTableLevelMetricsEnabled = options.isTableLevelMetricsEnabled();
+        LOGGER.info(String.format("Phoenix Table metrics enabled status: %s",
+                isTableLevelMetricsEnabled));
+        tableClientMetricsMapping = new ConcurrentHashMap<>();
+
+        String tableNamesList = options.getAllowedListTableNames();
+        if (tableNamesList != null && !tableNamesList.isEmpty()) {
+            for (String tableName : tableNamesList.split(",")) {
+                // Tolerate "T1, T2" style values: surrounding whitespace would otherwise
+                // make the allow-list lookup miss, and blank entries name no table.
+                String trimmedName = tableName.trim();
+                if (!trimmedName.isEmpty()) {
+                    allowedListOfTableNames.add(trimmedName);
+                }
+            }
+        }
+        isMetricPublisherEnabled = options.isMetricPublisherEnabled();
+        LOGGER.info(String.format("Phoenix table level metrics publisher enabled status %s",
+                isMetricPublisherEnabled));
+    }
+
+    /**
+     * Creates a manager without reading any options; none of the static configuration
+     * fields are touched.
+     * NOTE(review): presumably kept for subclasses and tests - confirm.
+     */
+    public TableMetricsManager() {
+        // Intentionally empty.
+    }
+
+    /**
+     * Replaces the shared manager instance with the given one (test hook).
+     *
+     * @param metricsManager instance to install as the singleton
+     */
+    @VisibleForTesting
+    public static void setInstance(TableMetricsManager metricsManager) {
+        TableMetricsManager.tableMetricsManager = metricsManager;
+    }
+
+    /**
+     * Returns the shared TableMetricsManager, creating it on first use while holding the
+     * class lock. When table-level metrics are disabled in the default options the no-op
+     * manager is installed instead. When the metric publisher is enabled, the configured
+     * publisher class is instantiated and registered; any failure there is logged and
+     * does not prevent the manager from being returned.
+     * NOTE(review): the instance field is not declared volatile, so the unsynchronized
+     * first check relies on a racy read.
+     *
+     * @return the shared manager instance
+     */
+    private static TableMetricsManager getInstance() {
+
+        if (tableMetricsManager != null) {
+            return tableMetricsManager;
+        }
+        synchronized (TableMetricsManager.class) {
+            // Another thread may have finished initialization while we waited for the lock.
+            if (tableMetricsManager != null) {
+                return tableMetricsManager;
+            }
+            QueryServicesOptions defaultOptions = QueryServicesOptions.withDefaults();
+            if (!defaultOptions.isTableLevelMetricsEnabled()) {
+                tableMetricsManager = NoOpTableMetricsManager.noOpsTableMetricManager;
+                return tableMetricsManager;
+            }
+            tableMetricsManager = new TableMetricsManager(defaultOptions);
+            LOGGER.info("Phoenix Table metrics created object for metrics manager");
+            if (!isMetricPublisherEnabled) {
+                return tableMetricsManager;
+            }
+            String publisherClassName = defaultOptions.getMetricPublisherClass();
+            if (publisherClassName == null) {
+                LOGGER.error(
+                        "Phoenix table level metrics publisher className cannot be null");
+                return tableMetricsManager;
+            }
+            MetricServiceResolver resolver = new MetricServiceResolver();
+            LOGGER.info(String.format(
+                    "Phoenix table level metrics publisher className %s",
+                    publisherClassName));
+            try {
+                mPublisher = resolver.instantiate(publisherClassName);
+                mPublisher.registerMetricProvider();
+            } catch (Throwable e) {
+                // Publisher problems must not break metric collection itself.
+                LOGGER.error("The exception from metric publish Function", e);
+            }
+            return tableMetricsManager;
+        }
+    }
+
+    /**
+     * This function is provided as hook to publish the tableLevel Metrics to
+     * LocalStore(tablePhoenixMapping).
+     *
+     * @param map of tableName to pair of (MetricType, Metric Value)
+     */
+    public void pushMetricsFromConnInstance(Map<String, Map<MetricType, Long>> map) {
+
+        if (map == null) {
+            LOGGER.debug("Phoenix table level metrics input map cannott be null");
+            return;
+        }
+
+        long startTime = EnvironmentEdgeManager.currentTime();
+        for (Map.Entry<String, Map<MetricType, Long>> tableEntry : map.entrySet()) {
+            for (Map.Entry<MetricType, Long> metricEntry : tableEntry.getValue().entrySet()) {
+                updateMetrics(tableEntry.getKey(), metricEntry.getKey(), metricEntry.getValue());
+            }
+        }
+
+        LOGGER.debug(String.format(
+                "Phoenix table level metrics completed updating metrics from conn instance, timetaken:\t%d",
+                +EnvironmentEdgeManager.currentTime() - startTime));
+    }
+
+    /**
+     * Adds a single metric value to the local store entry of the given table.
+     * Nothing is recorded when no metrics object is available for the table
+     * (the lookup returned null).
+     *
+     * @param tableName name of the table the metric belongs to
+     * @param type      metric type to update
+     * @param value     value passed to the table's metric for that type
+     */
+    public void updateMetrics(String tableName, MetricType type, long value) {
+
+        long startTime = EnvironmentEdgeManager.currentTime();
+
+        TableClientMetrics tInstance = getTableClientMetrics(tableName);
+        if (tInstance == null) {
+            // Parameterized logging keeps this per-metric path free of string building
+            // when debug logging is off.
+            LOGGER.debug("Table level client metrics are disabled for table: {}", tableName);
+            return;
+        }
+        tInstance.changeMetricValue(type, value);
+
+        LOGGER.debug("Phoenix table level metrics completed updating metric"
+                        + " {} to value {}, timetaken = {}", type, value,
+                EnvironmentEdgeManager.currentTime() - startTime);
+    }
+
+    /**
+     * Get Table specific metrics object and create if not initialized(thread safe)
+     *
+     * @param tableName
+     * @return TableClientMetrics object
+     */
+    private TableClientMetrics getTableClientMetrics(String tableName) {
+
+        if (Strings.isNullOrEmpty(tableName)) {
+            LOGGER.debug("Phoenix Table metrics TableName cannot be null or empty");
+            return null;
+        }
+
+        if (!allowedListOfTableNames.isEmpty() && !allowedListOfTableNames.contains(tableName)) {
+            return null;
+        }
+
+        TableClientMetrics tInstance;
+        tInstance = tableClientMetricsMapping.get(tableName);
+        if (tInstance == null) {

Review comment:
       Similar comments apply here about correcting the double-checked locking.

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/monitoring/JmxMetricProvider.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import org.apache.hadoop.hbase.metrics.Gauge;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.metrics.MetricRegistries;
+import org.apache.hadoop.hbase.metrics.MetricRegistry;
+import org.apache.hadoop.hbase.metrics.MetricRegistryInfo;
+import org.apache.phoenix.query.QueryServicesOptions;
+
+import java.util.Map;
+
+public class JmxMetricProvider implements MetricPublisherSupplierFactory {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(JmxMetricProvider.class);
+    private MetricRegistry metricRegistry;
+
+    /**
+     * Creates the table-level metric registry and registers it with the
+     * GlobalMetricRegistriesAdapter under the client metric tag taken from the default
+     * query service options.
+     */
+    @Override
+    public void registerMetricProvider() {
+        this.metricRegistry = createMetricRegistry();
+        GlobalMetricRegistriesAdapter.getInstance()
+                .registerMetricRegistry(
+                        this.metricRegistry,
+                        QueryServicesOptions.withDefaults().getClientMetricTag());
+    }
+
+    /**
+     * Currently a no-op: the registry created by registerMetricProvider() is left
+     * registered.
+     */
+    @Override
+    public void unregisterMetricProvider() {
+        // Intentionally empty.
+    }
+
+    /**
+     * Creates the "PHOENIX-TableLevel" registry through the global metric registries.
+     *
+     * @return the registry created for the Phoenix table-level client metrics
+     */
+    private MetricRegistry createMetricRegistry() {
+        LOGGER.info("Creating Metric Registry for Phoenix Table Level Metrics");
+        return MetricRegistries.global().create(
+                new MetricRegistryInfo("PHOENIX-TableLevel", "Phoenix Client Metrics",
+                        "phoenixTableLevel", "Phoenix,sub=CLIENT", true));
+    }
+
+    /**
+     * Gauge view over a single PhoenixTableMetric: each read returns the metric's
+     * current value.
+     */
+    private static class PhoenixMetricGauge implements Gauge<Long> {
+        private final PhoenixTableMetric delegate;
+
+        /**
+         * @param metric table metric whose value this gauge reports
+         */
+        public PhoenixMetricGauge(PhoenixTableMetric metric) {
+            delegate = metric;
+        }
+
+        @Override
+        public Long getValue() {
+            return delegate.getValue();
+        }
+    }
+
+    private String getMetricNameFromMetricType(MetricType type, String tableName) {
+        return tableName + "_table_" + type;
+    }
+
+    /**
+     * Registers one gauge per metric of the given table in the metric registry, named
+     * via getMetricNameFromMetricType.
+     *
+     * @param tInstance per-table metrics whose entries are exposed as gauges
+     */
+    @Override
+    public void registerMetrics(TableClientMetrics tInstance) {
+        for (Map.Entry<MetricType, PhoenixTableMetric> metricEntry
+                : tInstance.getMetricRegistry().entrySet()) {
+            String gaugeName =
+                    getMetricNameFromMetricType(metricEntry.getKey(), tInstance.getTableName());
+            PhoenixMetricGauge gauge = new PhoenixMetricGauge(metricEntry.getValue());
+            metricRegistry.register(gaugeName, gauge);
+        }
+    }
+
+    @Override public void unRegisterMetrics(TableClientMetrics tInstance) {

Review comment:
       Don't we actually want to deregister the metrics stored for `tInstance` from the `metricRegistry` here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org