Posted to commits@phoenix.apache.org by sh...@apache.org on 2024/03/08 05:57:50 UTC

(phoenix) branch PHOENIX-6883-feature updated: PHOENIX-7251 : Refactor server-side code to support multiple ServerMetadataCache for ITs which create multiple RSs or mini clusters (#1845)

This is an automated email from the ASF dual-hosted git repository.

shahrs87 pushed a commit to branch PHOENIX-6883-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-6883-feature by this push:
     new 7930acadba PHOENIX-7251 : Refactor server-side code to support multiple ServerMetadataCache for ITs which create multiple RSs or mini clusters (#1845)
7930acadba is described below

commit 7930acadbacbc31b3b51b542c92c1121434ae4d0
Author: palash <pa...@gmail.com>
AuthorDate: Thu Mar 7 21:57:45 2024 -0800

    PHOENIX-7251 : Refactor server-side code to support multiple ServerMetadataCache for ITs which create multiple RSs or mini clusters (#1845)
---
 .../apache/phoenix/cache/ServerMetadataCache.java  | 187 ++-------------------
 ...dataCache.java => ServerMetadataCacheImpl.java} |  64 ++-----
 .../coprocessor/PhoenixRegionServerEndpoint.java   |  10 +-
 .../coprocessor/VerifyLastDDLTimestamp.java        |  10 +-
 .../phoenix/end2end/BackwardCompatibilityIT.java   |   2 +-
 .../apache/phoenix/end2end/BasePermissionsIT.java  |   1 +
 .../MigrateSystemTablesToSystemNamespaceIT.java    |   1 +
 .../PartialResultServerConfigurationIT.java        |   1 +
 .../PhoenixRegionServerEndpointTestImpl.java       |  45 +++++
 .../end2end/ServerMetadataCacheTestImpl.java       |  89 ++++++++++
 .../end2end/index/DropIndexDuringUpsertIT.java     |   2 +
 .../end2end/index/IndexAsyncThresholdIT.java       |   2 +
 .../index/ReplicationWithWALAnnotationIT.java      |   9 +-
 .../execute/UpsertSelectOverlappingBatchesIT.java  |   2 +
 .../index/FailForUnsupportedHBaseVersionsIT.java   |   2 +
 .../jdbc/HighAvailabilityTestingUtility.java       |   2 +
 .../monitoring/PhoenixTableLevelMetricsIT.java     |   3 +
 .../ConnectionQueryServicesMetricsIT.java          |   3 +
 .../phoenix/query/MaxConcurrentConnectionsIT.java  |   2 +
 ...taCacheTest.java => ServerMetadataCacheIT.java} |  76 ++++++---
 .../hbase/index/write/TestWALRecoveryCaching.java  |   2 +
 .../java/org/apache/phoenix/query/BaseTest.java    |  13 +-
 22 files changed, 264 insertions(+), 264 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java b/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java
index 4e47a1382a..f251dc7123 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java
@@ -1,11 +1,10 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -17,177 +16,13 @@
  */
 package org.apache.phoenix.cache;
 
-import java.sql.Connection;
 import java.sql.SQLException;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.coprocessorclient.metrics.MetricsMetadataCachingSource;
-import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.phoenix.thirdparty.com.google.common.cache.Cache;
-import org.apache.phoenix.thirdparty.com.google.common.cache.CacheBuilder;
-import org.apache.phoenix.thirdparty.com.google.common.cache.RemovalListener;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.QueryUtil;
-import org.apache.phoenix.util.SchemaUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 /**
- * This manages the cache for all the objects(data table, views, indexes) on each region server.
- * Currently, it only stores LAST_DDL_TIMESTAMP in the cache.
+ * Interface for server side metadata cache hosted on each region server.
  */
-public class ServerMetadataCache {
-    private static final Logger LOGGER = LoggerFactory.getLogger(ServerMetadataCache.class);
-    private static final String PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS =
-            "phoenix.coprocessor.regionserver.cache.ttl.ms";
-    // Keeping default cache expiry for 30 mins since we won't have stale entry
-    // for more than 30 mins.
-    private static final long DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS
-            = 30 * 60 * 1000L; // 30 mins
-    private static final String PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE
-            = "phoenix.coprocessor.regionserver.cache.size";
-    private static final long DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE = 10000L;
-    private static volatile ServerMetadataCache INSTANCE;
-    private Configuration conf;
-    // key is the combination of <tenantID, schema name, table name>, value is the lastDDLTimestamp
-    private final Cache<ImmutableBytesPtr, Long> lastDDLTimestampMap;
-    private Connection connectionForTesting;
-    private MetricsMetadataCachingSource metricsSource;
-
-    /**
-     * Creates/gets an instance of ServerMetadataCache.
-     * @param conf configuration
-     * @return cache
-     */
-    public static ServerMetadataCache getInstance(Configuration conf) {
-        ServerMetadataCache result = INSTANCE;
-        if (result == null) {
-            synchronized (ServerMetadataCache.class) {
-                result = INSTANCE;
-                if (result == null) {
-                    INSTANCE = result = new ServerMetadataCache(conf);
-                }
-            }
-        }
-        return result;
-    }
-
-    private ServerMetadataCache(Configuration conf) {
-        this.conf = conf;
-        this.metricsSource = MetricsPhoenixCoprocessorSourceFactory
-                                .getInstance().getMetadataCachingSource();
-        long maxTTL = conf.getLong(PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS,
-                DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS);
-        long maxSize = conf.getLong(PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE,
-                DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE);
-        lastDDLTimestampMap = CacheBuilder.newBuilder()
-                .removalListener((RemovalListener<ImmutableBytesPtr, Long>) notification -> {
-                    String key = notification.getKey().toString();
-                    LOGGER.debug("Expiring " + key + " because of "
-                            + notification.getCause().name());
-                })
-                // maximum number of entries this cache can handle.
-                .maximumSize(maxSize)
-                .expireAfterAccess(maxTTL, TimeUnit.MILLISECONDS)
-                .build();
-    }
-
-    /**
-     * Returns the last DDL timestamp from the table.
-     * If not found in cache, then query SYSCAT regionserver.
-     * @param tenantID tenant id
-     * @param schemaName schema name
-     * @param tableName table name
-     * @return last DDL timestamp
-     * @throws Exception
-     */
-    public long getLastDDLTimestampForTable(byte[] tenantID, byte[] schemaName, byte[] tableName)
-            throws SQLException {
-        String fullTableNameStr = SchemaUtil.getTableName(schemaName, tableName);
-        byte[] tableKey = SchemaUtil.getTableKey(tenantID, schemaName, tableName);
-        ImmutableBytesPtr tableKeyPtr = new ImmutableBytesPtr(tableKey);
-        // Lookup in cache if present.
-        Long lastDDLTimestamp = lastDDLTimestampMap.getIfPresent(tableKeyPtr);
-        if (lastDDLTimestamp != null) {
-            metricsSource.incrementRegionServerMetadataCacheHitCount();
-            LOGGER.trace("Retrieving last ddl timestamp value from cache for tableName: {}",
-                    fullTableNameStr);
-            return lastDDLTimestamp;
-        }
-        metricsSource.incrementRegionServerMetadataCacheMissCount();
-        PTable table;
-        String tenantIDStr = Bytes.toString(tenantID);
-        if (tenantIDStr == null || tenantIDStr.isEmpty()) {
-            tenantIDStr = null;
-        }
-        Properties properties = new Properties();
-        if (tenantIDStr != null) {
-            properties.setProperty(TENANT_ID_ATTRIB, tenantIDStr);
-        }
-        try (Connection connection = getConnection(properties)) {
-            // Using PhoenixRuntime#getTableNoCache since we don't want to read cached value.
-            table = PhoenixRuntime.getTableNoCache(connection, fullTableNameStr);
-            // TODO PhoenixRuntime#getTableNoCache can throw TableNotFoundException.
-            //  In that case, do we want to throw non retryable exception back to the client?
-            // Update cache with the latest DDL timestamp from SYSCAT server.
-            lastDDLTimestampMap.put(tableKeyPtr, table.getLastDDLTimestamp());
-        }
-        return table.getLastDDLTimestamp();
-    }
-
-    /**
-     * Invalidate cache for the given tenantID, schema name and table name.
-     * Guava cache is thread safe so we don't have to synchronize it explicitly.
-     * @param tenantID tenantID
-     * @param schemaName schemaName
-     * @param tableName tableName
-     */
-    public void invalidate(byte[] tenantID, byte[] schemaName, byte[] tableName) {
-        String fullTableNameStr = SchemaUtil.getTableName(schemaName, tableName);
-        LOGGER.debug("Invalidating server metadata cache for tenantID: {}, full table: {}",
-                Bytes.toString(tenantID), fullTableNameStr);
-        byte[] tableKey = SchemaUtil.getTableKey(tenantID, schemaName, tableName);
-        ImmutableBytesPtr tableKeyPtr = new ImmutableBytesPtr(tableKey);
-        lastDDLTimestampMap.invalidate(tableKeyPtr);
-    }
-
-    /**
-     * This should be used only in the tests. DO NOT use this in production code.
-     */
-    @VisibleForTesting
-    public Long getLastDDLTimestampForTableFromCacheOnly(byte[] tenantID, byte[] schemaName,
-                                                         byte[] tableName) {
-        byte[] tableKey = SchemaUtil.getTableKey(tenantID, schemaName, tableName);
-        ImmutableBytesPtr tableKeyPtr = new ImmutableBytesPtr(tableKey);
-        return lastDDLTimestampMap.getIfPresent(tableKeyPtr);
-    }
-
-    // This is used by tests to override specific connection to use.
-    private Connection getConnection(Properties properties) throws SQLException {
-        return connectionForTesting != null ? connectionForTesting
-                : QueryUtil.getConnectionOnServer(properties, this.conf);
-    }
-
-    @VisibleForTesting
-    public void setConnectionForTesting(Connection connection) {
-        this.connectionForTesting = connection;
-    }
-
-    @VisibleForTesting
-    public static  void resetCache() {
-        LOGGER.info("Resetting ServerMetadataCache");
-        INSTANCE = null;
-    }
-
-    @VisibleForTesting
-    public static void setInstance(ServerMetadataCache cache) {
-        INSTANCE = cache;
-    }
+public interface ServerMetadataCache {
+    long getLastDDLTimestampForTable(byte[] tenantID, byte[] schemaName, byte[] tableName)
+            throws SQLException;
+    void invalidate(byte[] tenantID, byte[] schemaName, byte[] tableName);
 }
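
With the singleton plumbing moved out, ServerMetadataCache is reduced to the two
operations server-side callers actually need, so the production singleton and
per-regionserver test caches can be swapped freely behind
PhoenixRegionServerEndpoint. A minimal sketch of a consumer written against the
interface alone (the method name is illustrative, not from this commit):

    // Sketch: callers depend only on the two interface methods.
    static boolean isClientTimestampCurrent(ServerMetadataCache cache, byte[] tenantID,
            byte[] schemaName, byte[] tableName, long clientLastDDLTimestamp)
            throws SQLException {
        long serverTimestamp =
                cache.getLastDDLTimestampForTable(tenantID, schemaName, tableName);
        // Mirrors VerifyLastDDLTimestamp below: a client timestamp older than
        // the server's means the client's metadata is stale.
        return clientLastDDLTimestamp >= serverTimestamp;
    }
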
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java b/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCacheImpl.java
similarity index 78%
copy from phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java
copy to phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCacheImpl.java
index 4e47a1382a..bd2946992e 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCacheImpl.java
@@ -23,12 +23,12 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessorclient.metrics.MetricsMetadataCachingSource;
 import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.phoenix.thirdparty.com.google.common.cache.Cache;
 import org.apache.phoenix.thirdparty.com.google.common.cache.CacheBuilder;
 import org.apache.phoenix.thirdparty.com.google.common.cache.RemovalListener;
@@ -43,8 +43,12 @@ import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
  * This manages the cache for all the objects(data table, views, indexes) on each region server.
  * Currently, it only stores LAST_DDL_TIMESTAMP in the cache.
  */
-public class ServerMetadataCache {
-    private static final Logger LOGGER = LoggerFactory.getLogger(ServerMetadataCache.class);
+public class ServerMetadataCacheImpl implements ServerMetadataCache {
+
+    protected Configuration conf;
+    // key is the combination of <tenantID, schema name, table name>, value is the lastDDLTimestamp
+    protected final Cache<ImmutableBytesPtr, Long> lastDDLTimestampMap;
+    private static final Logger LOGGER = LoggerFactory.getLogger(ServerMetadataCacheImpl.class);
     private static final String PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS =
             "phoenix.coprocessor.regionserver.cache.ttl.ms";
     // Keeping default cache expiry for 30 mins since we won't have stale entry
@@ -54,33 +58,30 @@ public class ServerMetadataCache {
     private static final String PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE
             = "phoenix.coprocessor.regionserver.cache.size";
     private static final long DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE = 10000L;
-    private static volatile ServerMetadataCache INSTANCE;
-    private Configuration conf;
-    // key is the combination of <tenantID, schema name, table name>, value is the lastDDLTimestamp
-    private final Cache<ImmutableBytesPtr, Long> lastDDLTimestampMap;
-    private Connection connectionForTesting;
+    private static volatile ServerMetadataCacheImpl cacheInstance;
     private MetricsMetadataCachingSource metricsSource;
 
     /**
      * Creates/gets an instance of ServerMetadataCache.
+     *
      * @param conf configuration
      * @return cache
      */
-    public static ServerMetadataCache getInstance(Configuration conf) {
-        ServerMetadataCache result = INSTANCE;
+    public static ServerMetadataCacheImpl getInstance(Configuration conf) {
+        ServerMetadataCacheImpl result = cacheInstance;
         if (result == null) {
-            synchronized (ServerMetadataCache.class) {
-                result = INSTANCE;
+            synchronized (ServerMetadataCacheImpl.class) {
+                result = cacheInstance;
                 if (result == null) {
-                    INSTANCE = result = new ServerMetadataCache(conf);
+                    cacheInstance = result = new ServerMetadataCacheImpl(conf);
                 }
             }
         }
         return result;
     }
 
-    private ServerMetadataCache(Configuration conf) {
-        this.conf = conf;
+    public ServerMetadataCacheImpl(Configuration conf) {
+        this.conf = HBaseConfiguration.create(conf);
         this.metricsSource = MetricsPhoenixCoprocessorSourceFactory
                                 .getInstance().getMetadataCachingSource();
         long maxTTL = conf.getLong(PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS,
@@ -158,36 +159,7 @@ public class ServerMetadataCache {
         lastDDLTimestampMap.invalidate(tableKeyPtr);
     }
 
-    /**
-     * This should be used only in the tests. DO NOT use this in production code.
-     */
-    @VisibleForTesting
-    public Long getLastDDLTimestampForTableFromCacheOnly(byte[] tenantID, byte[] schemaName,
-                                                         byte[] tableName) {
-        byte[] tableKey = SchemaUtil.getTableKey(tenantID, schemaName, tableName);
-        ImmutableBytesPtr tableKeyPtr = new ImmutableBytesPtr(tableKey);
-        return lastDDLTimestampMap.getIfPresent(tableKeyPtr);
-    }
-
-    // This is used by tests to override specific connection to use.
-    private Connection getConnection(Properties properties) throws SQLException {
-        return connectionForTesting != null ? connectionForTesting
-                : QueryUtil.getConnectionOnServer(properties, this.conf);
-    }
-
-    @VisibleForTesting
-    public void setConnectionForTesting(Connection connection) {
-        this.connectionForTesting = connection;
-    }
-
-    @VisibleForTesting
-    public static  void resetCache() {
-        LOGGER.info("Resetting ServerMetadataCache");
-        INSTANCE = null;
-    }
-
-    @VisibleForTesting
-    public static void setInstance(ServerMetadataCache cache) {
-        INSTANCE = cache;
+    protected Connection getConnection(Properties properties) throws SQLException {
+        return QueryUtil.getConnectionOnServer(properties, this.conf);
     }
 }
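
ServerMetadataCacheImpl keeps the production behavior: a double-checked-locking
singleton over a Guava cache capped at phoenix.coprocessor.regionserver.cache.size
entries (default 10000) whose entries expire after
phoenix.coprocessor.regionserver.cache.ttl.ms of inactivity (default 30 minutes,
via expireAfterAccess). A sketch of tuning those knobs before the singleton is
first created; the override values are illustrative, not defaults changed here:

    Configuration conf = HBaseConfiguration.create();
    // Illustrative overrides: a 5-minute TTL and a 1000-entry cap.
    conf.setLong("phoenix.coprocessor.regionserver.cache.ttl.ms", 5 * 60 * 1000L);
    conf.setLong("phoenix.coprocessor.regionserver.cache.size", 1000L);
    ServerMetadataCache cache = ServerMetadataCacheImpl.getInstance(conf);
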
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
index d95ca1f9f0..3531e2f8a4 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.cache.ServerMetadataCache;
+import org.apache.phoenix.cache.ServerMetadataCacheImpl;
 import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;
 import org.apache.phoenix.coprocessorclient.metrics.MetricsMetadataCachingSource;
 import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory;
@@ -60,6 +61,7 @@ public class PhoenixRegionServerEndpoint
             RegionServerEndpointProtos.ValidateLastDDLTimestampRequest request,
             RpcCallback<RegionServerEndpointProtos.ValidateLastDDLTimestampResponse> done) {
         metricsSource.incrementValidateTimestampRequestCount();
+        ServerMetadataCache cache = getServerMetadataCache();
         for (RegionServerEndpointProtos.LastDDLTimestampRequest lastDDLTimestampRequest
                 : request.getLastDDLTimestampRequestsList()) {
             byte[] tenantID = lastDDLTimestampRequest.getTenantId().toByteArray();
@@ -71,7 +73,7 @@ public class PhoenixRegionServerEndpoint
             try {
                 LOGGER.debug("Verifying last ddl timestamp for tenantID: {}, tableName: {}",
                         tenantIDStr, fullTableName);
-                VerifyLastDDLTimestamp.verifyLastDDLTimestamp(this.conf, tenantID, schemaName,
+                VerifyLastDDLTimestamp.verifyLastDDLTimestamp(cache, tenantID, schemaName,
                         tableName, clientLastDDLTimestamp);
             } catch (Throwable t) {
                 String errorMsg = String.format("Verifying last ddl timestamp FAILED for "
@@ -101,7 +103,7 @@ public class PhoenixRegionServerEndpoint
             String tenantIDStr = Bytes.toString(tenantID);
             LOGGER.info("PhoenixRegionServerEndpoint invalidating the cache for tenantID: {},"
                     + " tableName: {}", tenantIDStr, fullTableName);
-            ServerMetadataCache cache = ServerMetadataCache.getInstance(conf);
+            ServerMetadataCache cache = getServerMetadataCache();
             cache.invalidate(tenantID, schemaName, tableName);
         }
     }
@@ -110,4 +112,8 @@ public class PhoenixRegionServerEndpoint
     public Iterable<Service> getServices() {
         return Collections.singletonList(this);
     }
+
+    public ServerMetadataCache getServerMetadataCache() {
+        return ServerMetadataCacheImpl.getInstance(conf);
+    }
 }
\ No newline at end of file
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/VerifyLastDDLTimestamp.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/VerifyLastDDLTimestamp.java
index 2f1554279e..4c5bf09704 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/VerifyLastDDLTimestamp.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/VerifyLastDDLTimestamp.java
@@ -17,10 +17,8 @@
  */
 package org.apache.phoenix.coprocessor;
 
-import java.io.IOException;
 import java.sql.SQLException;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.cache.ServerMetadataCache;
 import org.apache.phoenix.exception.StaleMetadataCacheException;
@@ -49,13 +47,13 @@ public class VerifyLastDDLTimestamp {
      * @param schemaName             schema name
      * @param tableName              table name
      * @param clientLastDDLTimestamp last ddl timestamp provided by client
-     * @param conf                   configuration
+     * @param cache                  ServerMetadataCache
      * @throws SQLException         StaleMetadataCacheException if client provided timestamp
      *                              is stale.
      */
-    public static void verifyLastDDLTimestamp(Configuration conf, byte[] tenantID,
-            byte[] schemaName, byte[] tableName, long clientLastDDLTimestamp) throws SQLException {
-        ServerMetadataCache cache = ServerMetadataCache.getInstance(conf);
+    public static void verifyLastDDLTimestamp(ServerMetadataCache cache, byte[] tenantID,
+                              byte[] schemaName, byte[] tableName, long clientLastDDLTimestamp)
+            throws SQLException {
         long lastDDLTimestamp = cache.getLastDDLTimestampForTable(tenantID, schemaName, tableName);
         // Is it possible to have client last ddl timestamp greater than server side?
         if (clientLastDDLTimestamp < lastDDLTimestamp) {
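
Taking the cache as a parameter, instead of resolving the singleton inside the
method, is the seam that lets tests substitute a per-regionserver or mocked
cache. A sketch of exercising it with Mockito (already used by the IT below),
assuming tenantID/schemaName/tableName byte arrays in scope and illustrative
timestamps:

    ServerMetadataCache mockCache = Mockito.mock(ServerMetadataCache.class);
    Mockito.when(mockCache.getLastDDLTimestampForTable(any(), any(), any()))
            .thenReturn(100L);
    // Client timestamp 50L is older than the server's 100L, so this should
    // throw StaleMetadataCacheException (a SQLException).
    VerifyLastDDLTimestamp.verifyLastDDLTimestamp(mockCache, tenantID, schemaName,
            tableName, 50L);
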
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java
index 9e94f4eb1c..9b258cead2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java
@@ -137,7 +137,7 @@ public class BackwardCompatibilityIT {
             DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
         } finally {
             hbaseTestUtil.shutdownMiniCluster();
-            ServerMetadataCache.resetCache();
+            ServerMetadataCacheTestImpl.resetCache();
         }
         System.setProperty("java.io.tmpdir", tmpDir);
         assertFalse("refCount leaked", refCountLeaked);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
index 397c0b4992..363f672537 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
@@ -167,6 +167,7 @@ public abstract class BasePermissionsIT extends BaseTest {
 
     static void initCluster(boolean isNamespaceMapped, boolean useCustomAccessController) throws Exception {
         if (null != testUtil) {
+            ServerMetadataCacheTestImpl.resetCache();
             testUtil.shutdownMiniCluster();
             testUtil = null;
         }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
index b75d0a3258..29fc6c526a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
@@ -98,6 +98,7 @@ public class MigrateSystemTablesToSystemNamespaceIT extends BaseTest {
         try {
             if (testUtil != null) {
                 boolean refCountLeaked = isAnyStoreRefCountLeaked();
+                ServerMetadataCacheTestImpl.resetCache();
                 testUtil.shutdownMiniCluster();
                 testUtil = null;
                 assertFalse("refCount leaked", refCountLeaked);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java
index d529f6ebfd..b636f56fa3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java
@@ -84,6 +84,7 @@ public class PartialResultServerConfigurationIT {
         try {
             DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
         } finally {
+            ServerMetadataCacheTestImpl.resetCache();
             hbaseTestUtil.shutdownMiniCluster();
         }
     }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointTestImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointTestImpl.java
new file mode 100644
index 0000000000..c7b6a9414a
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointTestImpl.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
+import org.apache.phoenix.cache.ServerMetadataCache;
+import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
+
+import java.io.IOException;
+
+/**
+ * PhoenixRegionServerEndpoint for integration tests.
+ * Uses {@link ServerMetadataCacheTestImpl} to support keeping multiple cache instances.
+ */
+public class PhoenixRegionServerEndpointTestImpl extends PhoenixRegionServerEndpoint {
+    protected ServerName serverName;
+
+    @Override
+    public void start(CoprocessorEnvironment env) throws IOException {
+        super.start(env);
+        this.serverName = ((RegionServerCoprocessorEnvironment)env).getServerName();
+    }
+
+    @Override
+    public ServerMetadataCache getServerMetadataCache() {
+        return ServerMetadataCacheTestImpl.getInstance(conf, serverName);
+    }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerMetadataCacheTestImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerMetadataCacheTestImpl.java
new file mode 100644
index 0000000000..c919f76db6
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerMetadataCacheTestImpl.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.phoenix.cache.ServerMetadataCacheImpl;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Implementation of {@link ServerMetadataCache} for Integration Tests.
+ * Supports keeping more than one instance of the cache, keyed on the regionserver ServerName.
+ *
+ * PhoenixRegionServerEndpoint is a region server coproc. There is a 1-1 correspondence between
+ * PhoenixRegionServerEndpoint and ServerMetadataCache. In ITs we can have multiple regionservers
+ * per cluster, so we need multiple instances of ServerMetadataCache in the same JVM. Tests using
+ * HighAvailabilityTestingUtility create 2 clusters, so we need one instance of
+ * ServerMetadataCache for each regionserver in each cluster.
+ */
+public class ServerMetadataCacheTestImpl extends ServerMetadataCacheImpl {
+    private static volatile Map<ServerName, ServerMetadataCacheTestImpl> INSTANCES = new HashMap<>();
+    private Connection connectionForTesting;
+
+    ServerMetadataCacheTestImpl(Configuration conf) {
+        super(conf);
+    }
+
+    public static ServerMetadataCacheTestImpl getInstance(Configuration conf, ServerName serverName) {
+        ServerMetadataCacheTestImpl result = INSTANCES.get(serverName);
+        if (result == null) {
+            synchronized (ServerMetadataCacheTestImpl.class) {
+                result = INSTANCES.get(serverName);
+                if (result == null) {
+                    result = new ServerMetadataCacheTestImpl(conf);
+                    INSTANCES.put(serverName, result);
+                }
+            }
+        }
+        return result;
+    }
+
+    public static void setInstance(ServerName serverName, ServerMetadataCacheTestImpl cache) {
+        INSTANCES.put(serverName, cache);
+    }
+
+    public Long getLastDDLTimestampForTableFromCacheOnly(byte[] tenantID, byte[] schemaName,
+                                                         byte[] tableName) {
+        byte[] tableKey = SchemaUtil.getTableKey(tenantID, schemaName, tableName);
+        ImmutableBytesPtr tableKeyPtr = new ImmutableBytesPtr(tableKey);
+        return lastDDLTimestampMap.getIfPresent(tableKeyPtr);
+    }
+
+    public void setConnectionForTesting(Connection connection) {
+        this.connectionForTesting = connection;
+    }
+
+    public static void resetCache() {
+        INSTANCES.clear();
+    }
+
+    @Override
+    protected Connection getConnection(Properties properties) throws SQLException {
+        return connectionForTesting != null ? connectionForTesting
+                : super.getConnection(properties);
+    }
+}
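
ServerMetadataCacheTestImpl swaps the single static INSTANCE for a
ServerName-keyed map, so every region server in a multi-RS or multi-cluster IT
resolves its own cache. A usage sketch matching the setup the ITs below use
(getUtility() and config come from BaseTest; the tenant, schema, and table
arguments are illustrative):

    ServerName serverName =
            getUtility().getHBaseCluster().getRegionServer(0).getServerName();
    ServerMetadataCacheTestImpl cache =
            ServerMetadataCacheTestImpl.getInstance(config, serverName);
    // Test-only read: inspect the cached timestamp without querying SYSCAT.
    Long cachedTimestamp = cache.getLastDDLTimestampForTableFromCacheOnly(
            tenantIDBytes, schemaNameBytes, Bytes.toBytes("T1"));
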
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropIndexDuringUpsertIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropIndexDuringUpsertIT.java
index c1527dd3f3..5e88f9bbac 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropIndexDuringUpsertIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropIndexDuringUpsertIT.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
 import org.apache.phoenix.jdbc.PhoenixTestDriver;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
@@ -101,6 +102,7 @@ public abstract class DropIndexDuringUpsertIT extends BaseTest {
             service.shutdownNow();
             destroyDriver(driver);
         } finally {
+            ServerMetadataCacheTestImpl.resetCache();
             util.shutdownMiniCluster();
         }
     }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexAsyncThresholdIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexAsyncThresholdIT.java
index 51f77fae71..aaf58d2e65 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexAsyncThresholdIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexAsyncThresholdIT.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.end2end.index;
 
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -123,6 +124,7 @@ public class IndexAsyncThresholdIT extends BaseTest {
         } catch (Throwable t) {
             logger.error("Exception caught when shutting down mini cluster", t);
         } finally {
+            ServerMetadataCacheTestImpl.resetCache();
             ConnectionFactory.shutdown();
         }
         assertFalse("refCount leaked", refCountLeaked);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReplicationWithWALAnnotationIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReplicationWithWALAnnotationIT.java
index ee4e387d4b..de24e4c385 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReplicationWithWALAnnotationIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReplicationWithWALAnnotationIT.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.phoenix.coprocessor.ReplicationSinkEndpoint;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.query.BaseTest;
@@ -126,8 +127,12 @@ public class ReplicationWithWALAnnotationIT extends BaseTest {
 
     @AfterClass
     public static void afterClass() throws Exception {
-        utility1.shutdownMiniCluster();
-        utility2.shutdownMiniCluster();
+        try {
+            utility1.shutdownMiniCluster();
+            utility2.shutdownMiniCluster();
+        } finally {
+            ServerMetadataCacheTestImpl.resetCache();
+        }
     }
 
     private static void setupConfigsAndStartCluster() throws Exception {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
index 4994a4b7d6..66c48566bb 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
@@ -88,6 +89,7 @@ public class UpsertSelectOverlappingBatchesIT extends BaseTest {
     @AfterClass
     public static synchronized void tearDownClass() throws Exception {
         SlowBatchRegionObserver.SLOW_MUTATE = false;
+        ServerMetadataCacheTestImpl.resetCache();
         getUtility().shutdownMiniCluster();
     }
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/FailForUnsupportedHBaseVersionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/FailForUnsupportedHBaseVersionsIT.java
index ab66e03955..9ea6771fed 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/FailForUnsupportedHBaseVersionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/FailForUnsupportedHBaseVersionsIT.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
 import org.apache.phoenix.hbase.index.covered.ColumnGroup;
 import org.apache.phoenix.hbase.index.covered.CoveredColumn;
 import org.apache.phoenix.hbase.index.covered.CoveredColumnIndexSpecifierBuilder;
@@ -160,6 +161,7 @@ public class FailForUnsupportedHBaseVersionsIT {
 
         } finally {
             // cleanup
+            ServerMetadataCacheTestImpl.resetCache();
             util.shutdownMiniCluster();
         }
     }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
index 1cb6c0ba79..bc67e4272a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.jdbc;
 
 import org.apache.hadoop.hbase.StartMiniClusterOption;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
 import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.commons.lang3.RandomUtils;
@@ -472,6 +473,7 @@ public class HighAvailabilityTestingUtility {
             admin1.close();
             admin2.close();
             try {
+                ServerMetadataCacheTestImpl.resetCache();
                 hbaseCluster1.shutdownMiniCluster();
                 hbaseCluster2.shutdownMiniCluster();
             } catch (Exception e) {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
index e070075a46..e77f292dca 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
 import org.apache.phoenix.exception.PhoenixIOException;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.CommitException;
@@ -189,6 +190,8 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
             }
         } catch (Exception e) {
             // ignore
+        } finally {
+            ServerMetadataCacheTestImpl.resetCache();
         }
     }
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsIT.java
index 454662b28a..0b3b6236a0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsIT.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
 import org.apache.phoenix.jdbc.ConnectionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDriver;
@@ -127,6 +128,8 @@ public class ConnectionQueryServicesMetricsIT extends BaseTest {
             }
         } catch (Exception e) {
             // ignore
+        } finally {
+            ServerMetadataCacheTestImpl.resetCache();
         }
     }
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java
index 5adeeb18e3..92bf8aefaa 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.query;
 
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDriver;
 import org.apache.phoenix.util.DelayedRegionServer;
@@ -73,6 +74,7 @@ public class MaxConcurrentConnectionsIT extends BaseTest {
     //Have to shutdown our special delayed region server
     @AfterClass
     public static void tearDown() throws Exception {
+        ServerMetadataCacheTestImpl.resetCache();
         hbaseTestUtil.shutdownMiniCluster();
     }
 
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java
similarity index 96%
rename from phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java
rename to phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java
index 963b97cc00..590ca7774a 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java
@@ -17,11 +17,15 @@
  */
 package org.apache.phoenix.cache;
 
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessorclient.metrics.MetricsMetadataCachingSource;
 import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory;
 import org.apache.phoenix.end2end.IndexToolIT;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.PhoenixRegionServerEndpointTestImpl;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
@@ -61,6 +65,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 
+import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY;
 import static org.apache.phoenix.query.ConnectionQueryServicesImpl.INVALIDATE_SERVER_METADATA_CACHE_EX_MESSAGE;
 import static org.apache.phoenix.query.QueryServices.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
@@ -75,12 +80,15 @@ import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+// End to end tests for metadata caching re-design.
 @Category(ParallelStatsDisabledIT.class)
-public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
+public class ServerMetadataCacheIT extends ParallelStatsDisabledIT {
 
     private final Random RANDOM = new Random(42);
     private final long NEVER = (long) ConnectionProperty.UPDATE_CACHE_FREQUENCY.getValue("NEVER");
 
+    private static ServerName serverName;
+
     @BeforeClass
     public static synchronized void doSetup() throws Exception {
         Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
@@ -91,6 +99,8 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
                 Long.toString(Long.MAX_VALUE));
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+        assertEquals(1, getUtility().getHBaseCluster().getNumLiveRegionServers());
+        serverName = getUtility().getHBaseCluster().getRegionServer(0).getServerName();
     }
 
     @Before
@@ -100,7 +110,23 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
 
     @After
     public void resetMetadataCache() {
-        ServerMetadataCache.resetCache();
+        ServerMetadataCacheTestImpl.resetCache();
+    }
+
+    /**
+     * Get the server metadata cache instance from the endpoint loaded on the region server.
+     */
+    private ServerMetadataCacheTestImpl getServerMetadataCache() {
+        String phoenixRegionServerEndpoint = config.get(REGIONSERVER_COPROCESSOR_CONF_KEY);
+        assertNotNull(phoenixRegionServerEndpoint);
+        RegionServerCoprocessor coproc = getUtility().getHBaseCluster()
+                .getRegionServer(0)
+                .getRegionServerCoprocessorHost()
+                .findCoprocessor(phoenixRegionServerEndpoint);
+        assertNotNull(coproc);
+        ServerMetadataCache cache = ((PhoenixRegionServerEndpointTestImpl)coproc).getServerMetadataCache();
+        assertNotNull(cache);
+        return (ServerMetadataCacheTestImpl)cache;
     }
 
     /**
@@ -121,7 +147,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
             createTable(conn, tableNameStr, NEVER);
             pTable = PhoenixRuntime.getTableNoCache(conn,
                     tableNameStr);// --> First call to CQSI#getTable
-            ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+            ServerMetadataCacheTestImpl cache = getServerMetadataCache();
             // Override the connection to use in ServerMetadataCache
             cache.setConnectionForTesting(conn);
             byte[] tableName = Bytes.toBytes(tableNameStr);
@@ -162,7 +188,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
             String viewNameStr = generateUniqueName();
             createViewWhereClause(conn, tableNameStr, viewNameStr, whereClause);
             viewTable = PhoenixRuntime.getTableNoCache(conn, viewNameStr);  // --> First call to CQSI#getTable
-            ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+            ServerMetadataCacheTestImpl cache = getServerMetadataCache();
             // Override the connection to use in ServerMetadataCache
             cache.setConnectionForTesting(conn);
 
@@ -209,7 +235,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
             createViewWhereClause(conn, tableNameStr, tenantViewNameStr, whereClause);
             tenantViewTable = PhoenixRuntime.getTableNoCache(conn,
                     tenantViewNameStr);  // --> First call to CQSI#getTable
-            ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+            ServerMetadataCacheTestImpl cache = getServerMetadataCache();
             // Override the connection to use in ServerMetadataCache
             cache.setConnectionForTesting(conn);
             byte[] tenantIDBytes = Bytes.toBytes(tenantId);
@@ -246,7 +272,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
             // Create a test table.
             createTable(conn, tableNameStr, NEVER);
             pTable = PhoenixRuntime.getTableNoCache(conn, tableNameStr);
-            ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+            ServerMetadataCacheTestImpl cache = getServerMetadataCache();
             // Override the connection to use in ServerMetadataCache
             cache.setConnectionForTesting(conn);
             byte[] tableName = Bytes.toBytes(tableNameStr);
@@ -276,7 +302,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
             // Create a test table.
             createTable(conn, fullTableName, NEVER);
             pTable = PhoenixRuntime.getTableNoCache(conn, fullTableName);
-            ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+            ServerMetadataCacheTestImpl cache = getServerMetadataCache();
             // Override the connection to use in ServerMetadataCache
             cache.setConnectionForTesting(conn);
             byte[] tableNameBytes = Bytes.toBytes(fullTableName);
@@ -313,7 +339,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         try (Connection conn = DriverManager.getConnection(getUrl(), tenantProps)) {
             createViewWhereClause(conn, tableNameStr, tenantViewNameStr, whereClause);
             tenantViewTable = PhoenixRuntime.getTableNoCache(conn, tenantViewNameStr);
-            ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+            ServerMetadataCacheTestImpl cache = getServerMetadataCache();
             // Override the connection to use in ServerMetadataCache
             cache.setConnectionForTesting(conn);
             byte[] tenantIDBytes = Bytes.toBytes(tenantId);
@@ -340,7 +366,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         String tableNameStr =  generateUniqueName();
         byte[] tableNameBytes = Bytes.toBytes(tableNameStr);
         PTable pTable;
-        ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+        ServerMetadataCacheTestImpl cache = getServerMetadataCache();
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             conn.setAutoCommit(false);
             // Create a test table.
@@ -377,7 +403,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         String tableNameStr =  generateUniqueName();
         byte[] tableNameBytes = Bytes.toBytes(tableNameStr);
         PTable pTable;
-        ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+        ServerMetadataCacheTestImpl cache = getServerMetadataCache();
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             conn.setAutoCommit(false);
             // Create a test table.
@@ -411,7 +437,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         String indexNameStr = "IND_" + generateUniqueName();
         byte[] indexNameBytes = Bytes.toBytes(indexNameStr);
         PTable indexTable;
-        ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+        ServerMetadataCacheTestImpl cache = getServerMetadataCache();
         try (Connection conn = DriverManager.getConnection(url, props)) {
             conn.setAutoCommit(false);
             // Create a test table.
@@ -457,7 +483,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         byte[] tableNameBytes = Bytes.toBytes(tableName);
         String indexName = generateUniqueName();
         byte[] indexNameBytes = Bytes.toBytes(indexName);
-        ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+        ServerMetadataCacheTestImpl cache = getServerMetadataCache();
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             conn.setAutoCommit(true);
             createTable(conn, tableName, NEVER);
@@ -507,7 +533,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         String globalViewIndexName = "GV_IDX_" + generateUniqueName();
         byte[] globalViewIndexNameBytes = Bytes.toBytes(globalViewIndexName);
 
-        ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+        ServerMetadataCacheTestImpl cache = getServerMetadataCache();
         try(Connection conn = DriverManager.getConnection(getUrl());
             Statement stmt = conn.createStatement()) {
             String whereClause = " WHERE v1 < 1000";
@@ -614,7 +640,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         String tableName = generateUniqueName();
         ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
         ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
-        ServerMetadataCache cache = null;
+        ServerMetadataCacheTestImpl cache = null;
 
         try (Connection conn1 = spyCqs1.connect(url1, props);
              Connection conn2 = spyCqs2.connect(url2, props)) {
@@ -624,11 +650,11 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
             upsert(conn1, tableName, true);
 
             // Instrument ServerMetadataCache to throw a SQLException once
-            cache = ServerMetadataCache.getInstance(config);
-            ServerMetadataCache spyCache = Mockito.spy(cache);
+            cache = getServerMetadataCache();
+            ServerMetadataCacheTestImpl spyCache = Mockito.spy(cache);
             Mockito.doThrow(new SQLException("FAIL")).doCallRealMethod().when(spyCache)
                     .getLastDDLTimestampForTable(any(), any(), eq(Bytes.toBytes(tableName)));
-            ServerMetadataCache.setInstance(spyCache);
+            ServerMetadataCacheTestImpl.setInstance(serverName, spyCache);
 
             // query using client-2 should succeed
             query(conn2, tableName);
@@ -652,7 +678,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
         ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
         int expectedNumCacheUpdates;
-        ServerMetadataCache cache = null;
+        ServerMetadataCacheTestImpl cache = null;
 
         try (Connection conn1 = spyCqs1.connect(url1, props);
              Connection conn2 = spyCqs2.connect(url2, props)) {
@@ -674,11 +700,11 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
             Mockito.reset(spyCqs2);
 
             // Instrument ServerMetadataCache to throw a SQLException once
-            cache = ServerMetadataCache.getInstance(config);
-            ServerMetadataCache spyCache = Mockito.spy(cache);
+            cache = getServerMetadataCache();
+            ServerMetadataCacheTestImpl spyCache = Mockito.spy(cache);
             Mockito.doThrow(new SQLException("FAIL")).doCallRealMethod().when(spyCache)
                     .getLastDDLTimestampForTable(any(), any(), eq(Bytes.toBytes(tableName)));
-            ServerMetadataCache.setInstance(spyCache);
+            ServerMetadataCacheTestImpl.setInstance(serverName, spyCache);
 
             // query using client-2 should succeed, one cache update
             query(conn2, tableName);
@@ -702,7 +728,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         String tableName = generateUniqueName();
         ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
         ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
-        ServerMetadataCache cache = null;
+        ServerMetadataCacheTestImpl cache = null;
 
         try (Connection conn1 = spyCqs1.connect(url1, props);
              Connection conn2 = spyCqs2.connect(url2, props)) {
@@ -712,12 +738,12 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
             upsert(conn1, tableName, true);
 
             // Instrument ServerMetadataCache to throw a SQLException twice
-            cache = ServerMetadataCache.getInstance(config);
-            ServerMetadataCache spyCache = Mockito.spy(cache);
+            cache = getServerMetadataCache();
+            ServerMetadataCacheTestImpl spyCache = Mockito.spy(cache);
             SQLException e = new SQLException("FAIL");
             Mockito.doThrow(e).when(spyCache)
                     .getLastDDLTimestampForTable(any(), any(), eq(Bytes.toBytes(tableName)));
-            ServerMetadataCache.setInstance(spyCache);
+            ServerMetadataCacheTestImpl.setInstance(serverName, spyCache);
 
             // query using client-2 should fail
             query(conn2, tableName);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
index 3fb7bffb5a..ebea1f1eff 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
 import org.apache.phoenix.hbase.index.IndexTableName;
 import org.apache.phoenix.hbase.index.IndexTestingUtils;
 import org.apache.phoenix.hbase.index.Indexer;
@@ -269,6 +270,7 @@ public class TestWALRecoveryCaching {
     scanner.close();
     index.close();
     primary.close();
+    ServerMetadataCacheTestImpl.resetCache();
     util.shutdownMiniCluster();
   }
 
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 527bcc6e73..341b808479 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -136,14 +136,14 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.SystemExitRule;
-import org.apache.phoenix.cache.ServerMetadataCache;
 import org.apache.phoenix.compat.hbase.CompatUtil;
-import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
 import org.apache.phoenix.end2end.ParallelStatsDisabledTest;
 import org.apache.phoenix.end2end.ParallelStatsEnabledIT;
 import org.apache.phoenix.end2end.ParallelStatsEnabledTest;
+import org.apache.phoenix.end2end.PhoenixRegionServerEndpointTestImpl;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -493,7 +493,7 @@ public abstract class BaseTest {
         } finally {
             try {
                 // Clear ServerMetadataCache.
-                ServerMetadataCache.resetCache();
+                ServerMetadataCacheTestImpl.resetCache();
                 utility.shutdownMiniCluster();
             } catch (Throwable t) {
                 LOGGER.error("Exception caught when shutting down mini cluster", t);
@@ -646,13 +646,14 @@ public abstract class BaseTest {
     }
 
     /*
-        Set property hbase.coprocessor.regionserver.classes to include PhoenixRegionServerEndpoint
-        by default, if some other regionserver coprocs are not already present.
+        Set property hbase.coprocessor.regionserver.classes to include the test
+        implementation of PhoenixRegionServerEndpoint by default, unless other
+        regionserver coprocs are already configured.
      */
     private static void setPhoenixRegionServerEndpoint(Configuration conf) {
         String value = conf.get(REGIONSERVER_COPROCESSOR_CONF_KEY);
         if (value == null) {
-            value = PhoenixRegionServerEndpoint.class.getName();
+            value = PhoenixRegionServerEndpointTestImpl.class.getName();
         }
         conf.set(REGIONSERVER_COPROCESSOR_CONF_KEY, value);
     }