You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sh...@apache.org on 2023/12/22 16:44:46 UTC

(phoenix) branch PHOENIX-6883-feature updated: PHOENIX-7115 Create separate handler thread pool for invalidating server metadata cache (#1748)

This is an automated email from the ASF dual-hosted git repository.

shahrs87 pushed a commit to branch PHOENIX-6883-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-6883-feature by this push:
     new 83134d9731 PHOENIX-7115 Create separate handler thread pool for invalidating server metadata cache (#1748)
83134d9731 is described below

commit 83134d9731ab12072d23573d2c38252489d306bb
Author: Rushabh Shah <sh...@apache.org>
AuthorDate: Fri Dec 22 08:44:40 2023 -0800

    PHOENIX-7115 Create separate handler thread pool for invalidating server metadata cache (#1748)
---
 .../FailingPhoenixRegionServerEndpoint.java        |   4 +-
 .../phoenix/end2end/InvalidateMetadataCacheIT.java |   4 +-
 .../hadoop/hbase/ipc/PhoenixRpcScheduler.java      |  37 +++-
 .../hbase/ipc/PhoenixRpcSchedulerFactory.java      |  14 +-
 ...java => InvalidateMetadataCacheController.java} |  36 ++--
 ... InvalidateMetadataCacheControllerFactory.java} |  26 +--
 .../controller/ServerSideRPCControllerFactory.java |   3 -
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  | 195 +----------------
 .../phoenix/query/ConnectionQueryServices.java     |   4 +
 .../phoenix/query/ConnectionQueryServicesImpl.java | 236 +++++++++++++++++++--
 .../query/ConnectionlessQueryServicesImpl.java     |   7 +
 .../query/DelegateConnectionQueryServices.java     |   7 +
 .../org/apache/phoenix/query/QueryServices.java    |  10 +-
 .../apache/phoenix/query/QueryServicesOptions.java |   2 +
 ...dulerTest.java => PhoenixRpcSchedulerTest.java} |  61 ++++--
 .../phoenix/cache/ServerMetadataCacheTest.java     |  26 ++-
 16 files changed, 405 insertions(+), 267 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/FailingPhoenixRegionServerEndpoint.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FailingPhoenixRegionServerEndpoint.java
index 7e40cfe76f..5f33610c29 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/FailingPhoenixRegionServerEndpoint.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FailingPhoenixRegionServerEndpoint.java
@@ -27,8 +27,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
-import static org.apache.phoenix.coprocessor.MetaDataEndpointImpl.PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS;
-import static org.apache.phoenix.coprocessor.MetaDataEndpointImpl.PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS_DEFAULT;
+import static org.apache.phoenix.query.QueryServices.PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS;
+import static org.apache.phoenix.query.QueryServices.PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS_DEFAULT;
 
 public class FailingPhoenixRegionServerEndpoint extends PhoenixRegionServerEndpoint {
     private static final Logger LOGGER = LoggerFactory.getLogger(FailingPhoenixRegionServerEndpoint.class);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InvalidateMetadataCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InvalidateMetadataCacheIT.java
index c0727b9d68..b8d1d55732 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InvalidateMetadataCacheIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InvalidateMetadataCacheIT.java
@@ -34,8 +34,8 @@ import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY;
-import static org.apache.phoenix.coprocessor.MetaDataEndpointImpl.PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS;
-import static org.apache.phoenix.coprocessor.MetaDataEndpointImpl.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED;
+import static org.apache.phoenix.query.QueryServices.PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS;
+import static org.apache.phoenix.query.QueryServices.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.fail;
 
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
index ea6a5c9719..0a04ad4787 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
@@ -27,12 +27,13 @@ import org.apache.phoenix.query.QueryServicesOptions;
 
 import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 
+import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_INVALIDATE_CACHE_HANDLER_COUNT;
+
 /**
  * {@link RpcScheduler} that first checks to see if this is an index or metadata update before passing off the
  * call to the delegate {@link RpcScheduler}.
  */
 public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
-
     // copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4
     private static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "ipc.server.callqueue.handler.factor";
     private static final String CALLQUEUE_LENGTH_CONF_KEY = "ipc.server.max.callqueue.length";
@@ -41,28 +42,44 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
     private int indexPriority;
     private int metadataPriority;
     private int serverSidePriority;
+    private int invalidateMetadataCachePriority;
     private RpcExecutor indexCallExecutor;
     private RpcExecutor metadataCallExecutor;
     private RpcExecutor serverSideCallExecutor;
+    // Executor for invalidating server side metadata cache RPCs.
+    private RpcExecutor invalidateMetadataCacheCallExecutor;
     private int port;
     
 
-    public PhoenixRpcScheduler(Configuration conf, RpcScheduler delegate, int indexPriority, int metadataPriority, int serversidePriority, PriorityFunction priorityFunction, Abortable abortable) {
+    public PhoenixRpcScheduler(Configuration conf, RpcScheduler delegate, int indexPriority,
+                               int metadataPriority, int serversidePriority,
+                               int invalidateMetadataCachePriority,
+                               PriorityFunction priorityFunction, Abortable abortable) {
         // copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4
     	int indexHandlerCount = conf.getInt(QueryServices.INDEX_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT);
         int metadataHandlerCount = conf.getInt(QueryServices.METADATA_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_METADATA_HANDLER_COUNT);
         int serverSideHandlerCount = conf.getInt(QueryServices.SERVER_SIDE_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_SERVERSIDE_HANDLER_COUNT);
+        int invalidateMetadataCacheHandlerCount = conf.getInt(
+                QueryServices.INVALIDATE_CACHE_HANDLER_COUNT_ATTRIB,
+                DEFAULT_INVALIDATE_CACHE_HANDLER_COUNT);
         int maxIndexQueueLength =  conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, indexHandlerCount*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
         int maxMetadataQueueLength =  conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, metadataHandlerCount*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
         int maxServerSideQueueLength =  conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, serverSideHandlerCount*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+        int maxInvalidateMetadataCacheQueueLength =  conf.getInt(CALLQUEUE_LENGTH_CONF_KEY,
+                invalidateMetadataCacheHandlerCount * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+
 
         this.indexPriority = indexPriority;
         this.metadataPriority = metadataPriority;
         this.serverSidePriority = serversidePriority;
+        this.invalidateMetadataCachePriority = invalidateMetadataCachePriority;
         this.delegate = delegate;
         this.indexCallExecutor = new BalancedQueueRpcExecutor("Index", indexHandlerCount, maxIndexQueueLength, priorityFunction,conf,abortable);
         this.metadataCallExecutor = new BalancedQueueRpcExecutor("Metadata", metadataHandlerCount, maxMetadataQueueLength, priorityFunction,conf,abortable);
         this.serverSideCallExecutor = new BalancedQueueRpcExecutor("ServerSide", serverSideHandlerCount, maxServerSideQueueLength, priorityFunction,conf,abortable);
+        this.invalidateMetadataCacheCallExecutor = new BalancedQueueRpcExecutor(
+                "InvalidateMetadataCache", invalidateMetadataCacheHandlerCount,
+                maxInvalidateMetadataCacheQueueLength, priorityFunction, conf, abortable);
     }
 
     @Override
@@ -77,6 +94,7 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
         indexCallExecutor.start(port);
         metadataCallExecutor.start(port);
         serverSideCallExecutor.start(port);
+        invalidateMetadataCacheCallExecutor.start(port);
     }
 
     @Override
@@ -85,6 +103,7 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
         indexCallExecutor.stop();
         metadataCallExecutor.stop();
         serverSideCallExecutor.stop();
+        invalidateMetadataCacheCallExecutor.stop();
     }
 
     @Override
@@ -97,6 +116,8 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
             return metadataCallExecutor.dispatch(callTask);
         } else if (serverSidePriority == priority) {
             return serverSideCallExecutor.dispatch(callTask);
+        } else if (invalidateMetadataCachePriority == priority) {
+            return invalidateMetadataCacheCallExecutor.dispatch(callTask);
         } else {
             return delegate.dispatch(callTask);
         }
@@ -114,7 +135,8 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
         return this.delegate.getGeneralQueueLength()
                 + this.indexCallExecutor.getQueueLength()
                 + this.metadataCallExecutor.getQueueLength()
-                + this.serverSideCallExecutor.getQueueLength();
+                + this.serverSideCallExecutor.getQueueLength()
+                + this.invalidateMetadataCacheCallExecutor.getQueueLength();
     }
 
     @Override
@@ -132,7 +154,8 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
         return this.delegate.getActiveRpcHandlerCount()
                 + this.indexCallExecutor.getActiveHandlerCount()
                 + this.metadataCallExecutor.getActiveHandlerCount()
-                + this.serverSideCallExecutor.getActiveHandlerCount();
+                + this.serverSideCallExecutor.getActiveHandlerCount()
+                + this.invalidateMetadataCacheCallExecutor.getActiveHandlerCount();
     }
 
     @Override
@@ -155,6 +178,11 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
         this.metadataCallExecutor = executor;
     }
 
+    @VisibleForTesting
+    public void setInvalidateMetadataCacheExecutorForTesting(RpcExecutor executor) {
+        this.invalidateMetadataCacheCallExecutor = executor;
+    }
+
     @VisibleForTesting
     public void setServerSideExecutorForTesting(RpcExecutor executor) {
         this.serverSideCallExecutor = executor;
@@ -229,5 +257,4 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
     public int getActiveReplicationRpcHandlerCount() {
         return this.delegate.getActiveReplicationRpcHandlerCount();
     }
-
 }
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java
index 74adf01573..d57828c260 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java
@@ -63,12 +63,16 @@ public class PhoenixRpcSchedulerFactory implements RpcSchedulerFactory {
         validatePriority(serverSidePriority);
 
         // validate index and metadata priorities are not the same
-        Preconditions.checkArgument(indexPriority != metadataPriority, "Index and Metadata priority must not be same "+ indexPriority);
+        Preconditions.checkArgument(indexPriority != metadataPriority,
+                "Index and Metadata priority must not be same " + indexPriority);
         LOGGER.info("Using custom Phoenix Index RPC Handling with index rpc priority "
                 + indexPriority + " and metadata rpc priority " + metadataPriority);
 
-        PhoenixRpcScheduler scheduler =
-                new PhoenixRpcScheduler(conf, delegate, indexPriority, metadataPriority, serverSidePriority, priorityFunction,abortable);
+        int invalidateCachePriority = getInvalidateMetadataCachePriority(conf);
+        validatePriority(invalidateCachePriority);
+        PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, delegate, indexPriority,
+                metadataPriority, serverSidePriority, invalidateCachePriority, priorityFunction,
+                abortable);
         return scheduler;
     }
 
@@ -97,4 +101,8 @@ public class PhoenixRpcSchedulerFactory implements RpcSchedulerFactory {
         return conf.getInt(QueryServices.SERVER_SIDE_PRIOIRTY_ATTRIB, QueryServicesOptions.DEFAULT_SERVER_SIDE_PRIORITY);
     }
 
+    public static int getInvalidateMetadataCachePriority(Configuration conf) {
+        return conf.getInt(QueryServices.INVALIDATE_METADATA_CACHE_PRIORITY_ATTRIB,
+                QueryServicesOptions.DEFAULT_INVALIDATE_METADATA_CACHE_PRIORITY);
+    }
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InvalidateMetadataCacheController.java
similarity index 51%
copy from phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java
copy to phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InvalidateMetadataCacheController.java
index ba7fb6339d..3c9eda613d 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InvalidateMetadataCacheController.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,24 +18,34 @@
 package org.apache.hadoop.hbase.ipc.controller;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
 
 /**
- * {@link RpcControllerFactory} that should only be used when
- * making server-server remote RPCs to the region servers hosting Phoenix SYSTEM tables.
+ * Controller used to invalidate server side metadata cache RPCs.
  */
-public class ServerSideRPCControllerFactory  {
+public class InvalidateMetadataCacheController extends DelegatingHBaseRpcController {
+    private int priority;
+
+    public InvalidateMetadataCacheController(HBaseRpcController delegate, Configuration conf) {
+        super(delegate);
+        this.priority = PhoenixRpcSchedulerFactory.getInvalidateMetadataCachePriority(conf);
+    }
 
-    private static final Logger LOG = LoggerFactory.getLogger(ServerSideRPCControllerFactory.class);
-    protected final Configuration conf;
+    @Override
+    public void setPriority(int priority) {
+        this.priority = priority;
+    }
 
-    public ServerSideRPCControllerFactory(Configuration conf) {
-        this.conf = conf;
+    @Override
+    public void setPriority(TableName tn) {
+        // Nothing
     }
 
-    public ServerToServerRpcController newController() {
-        return new ServerToServerRpcControllerImpl(this.conf);
+    @Override
+    public int getPriority() {
+        return this.priority;
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InvalidateMetadataCacheControllerFactory.java
similarity index 61%
copy from phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java
copy to phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InvalidateMetadataCacheControllerFactory.java
index ba7fb6339d..ee6b3b24ff 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InvalidateMetadataCacheControllerFactory.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,24 +18,24 @@
 package org.apache.hadoop.hbase.ipc.controller;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
- * {@link RpcControllerFactory} that should only be used when
- * making server-server remote RPCs to the region servers hosting Phoenix SYSTEM tables.
+ * Factory to instantiate InvalidateMetadataCacheControllers
  */
-public class ServerSideRPCControllerFactory  {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ServerSideRPCControllerFactory.class);
-    protected final Configuration conf;
+public class InvalidateMetadataCacheControllerFactory extends RpcControllerFactory {
+    public InvalidateMetadataCacheControllerFactory(Configuration conf) {
+        super(conf);
+    }
 
-    public ServerSideRPCControllerFactory(Configuration conf) {
-        this.conf = conf;
+    @Override
+    public HBaseRpcController newController() {
+        HBaseRpcController delegate = super.newController();
+        return getController(delegate);
     }
 
-    public ServerToServerRpcController newController() {
-        return new ServerToServerRpcControllerImpl(this.conf);
+    private HBaseRpcController getController(HBaseRpcController delegate) {
+        return new InvalidateMetadataCacheController(delegate, conf);
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java
index ba7fb6339d..a1a97cf6ce 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java
@@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.ipc.controller;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * {@link RpcControllerFactory} that should only be used when
@@ -28,7 +26,6 @@ import org.slf4j.LoggerFactory;
  */
 public class ServerSideRPCControllerFactory  {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ServerSideRPCControllerFactory.class);
     protected final Configuration conf;
 
     public ServerSideRPCControllerFactory(Configuration conf) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index c9aa4acb73..28d561e465 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -101,23 +101,15 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
@@ -133,11 +125,9 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagUtil;
-import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -154,12 +144,10 @@ import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.RpcCall;
 import org.apache.hadoop.hbase.ipc.RpcUtil;
-import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.Region.RowLock;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.VersionInfo;
@@ -187,9 +175,6 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRequest
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
-import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;
-import org.apache.phoenix.coprocessor.metrics.MetricsMetadataCachingSource;
-import org.apache.phoenix.coprocessor.metrics.MetricsPhoenixCoprocessorSourceFactory;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.expression.Expression;
@@ -268,7 +253,6 @@ import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixKeyValueUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.PhoenixStopWatch;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
@@ -285,7 +269,6 @@ import com.google.protobuf.ByteString;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
 
 /**
  * Endpoint co-processor through which all Phoenix metadata mutations flow.
@@ -328,12 +311,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements RegionCopr
     // Column to track tables that have been upgraded based on PHOENIX-2067
     public static final String ROW_KEY_ORDER_OPTIMIZABLE = "ROW_KEY_ORDER_OPTIMIZABLE";
     public static final byte[] ROW_KEY_ORDER_OPTIMIZABLE_BYTES = Bytes.toBytes(ROW_KEY_ORDER_OPTIMIZABLE);
-    public static final String PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS =
-            "phoenix.metadata.cache.invalidation.timeoutMs";
-    // Default to 10 seconds.
-    public static final long PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS_DEFAULT = 10 * 1000;
-    public static final String PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED =
-            "phoenix.metadata.invalidate.cache.enabled";
+
     // KeyValues for Table
     private static final Cell TABLE_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
         TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES);
@@ -612,8 +590,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
     private boolean allowSplittableSystemCatalogRollback;
 
     private MetricsMetadataSource metricsSource;
-    private MetricsMetadataCachingSource metricsMetadataCachingSource;
-    private long metadataCacheInvalidationTimeoutMs;
+
     public static void setFailConcurrentMutateAddColumnOneTimeForTesting(boolean fail) {
         failConcurrentMutateAddColumnOneTimeForTesting = fail;
     }
@@ -649,16 +626,11 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 new ReadOnlyProps(config.iterator()));
         this.allowSplittableSystemCatalogRollback = config.getBoolean(QueryServices.ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK,
                 QueryServicesOptions.DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK);
-        this.metadataCacheInvalidationTimeoutMs = config.getLong(
-                PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS,
-                PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS_DEFAULT);
         LOGGER.info("Starting Tracing-Metrics Systems");
         // Start the phoenix trace collection
         Tracing.addTraceMetricsSource();
         Metrics.ensureConfigured();
         metricsSource = MetricsMetadataSourceFactory.getMetadataMetricsSource();
-        metricsMetadataCachingSource
-                = MetricsPhoenixCoprocessorSourceFactory.getInstance().getMetadataCachingSource();
     }
 
     @Override
@@ -3483,170 +3455,17 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         }
     }
 
-    /**
-     * Invalidate metadata cache from all region servers for the given list of
-     * InvalidateServerMetadataCacheRequest.
-     * @throws Throwable
-     */
     private void invalidateServerMetadataCache(List<InvalidateServerMetadataCacheRequest> requests)
             throws Throwable {
-        Configuration conf = env.getConfiguration();
-        boolean invalidateCacheEnabled = conf.getBoolean(PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED,
-                false);
-        if (!invalidateCacheEnabled) {
-            LOGGER.info("Skip invalidating server metadata cache since conf property"
-                    + " phoenix.metadata.invalidate.cache.enabled is set to false");
-            return;
-        }
-        metricsMetadataCachingSource.incrementMetadataCacheInvalidationOperationsCount();
         Properties properties = new Properties();
         // Skip checking of system table existence since the system tables should have created
         // by now.
         properties.setProperty(SKIP_SYSTEM_TABLES_EXISTENCE_CHECK, "true");
         try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(properties,
-                env.getConfiguration()).unwrap(PhoenixConnection.class);
-             Admin admin = connection.getQueryServices().getAdmin()) {
-            // This will incur an extra RPC to the master. This RPC is required since we want to
-            // get current list of regionservers.
-            Collection<ServerName> serverNames = admin.getRegionServers(true);
-            PhoenixStopWatch stopWatch = new PhoenixStopWatch().start();
-            try {
-                invalidateServerMetadataCacheWithRetries(admin, serverNames, requests, false);
-                metricsMetadataCachingSource.incrementMetadataCacheInvalidationSuccessCount();
-            } catch (Throwable t) {
-                metricsMetadataCachingSource.incrementMetadataCacheInvalidationFailureCount();
-                throw t;
-            } finally {
-                metricsMetadataCachingSource
-                        .addMetadataCacheInvalidationTotalTime(stopWatch.stop().elapsedMillis());
-            }
-        }
-    }
-
-    /**
-     * Invalidate metadata cache on all regionservers with retries for the given list of
-     * InvalidateServerMetadataCacheRequest. Each InvalidateServerMetadataCacheRequest contains
-     * tenantID, schema name and table name.
-     * We retry once before failing the operation.
-     *
-     * @param admin
-     * @param serverNames
-     * @param invalidateCacheRequests
-     * @param isRetry
-     * @throws Throwable
-     */
-    private void invalidateServerMetadataCacheWithRetries(Admin admin,
-            Collection<ServerName> serverNames,
-            List<InvalidateServerMetadataCacheRequest> invalidateCacheRequests,
-            boolean isRetry) throws Throwable {
-        RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest protoRequest =
-                getRequest(invalidateCacheRequests);
-        // TODO Do I need my own executor or can I re-use QueryServices#Executor
-        //  since it is supposed to be used only for scans according to documentation?
-        List<CompletableFuture<Void>> futures = new ArrayList<>();
-        Map<Future, ServerName> map = new HashMap<>();
-        for (ServerName serverName : serverNames) {
-            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
-                try {
-                    PhoenixStopWatch innerWatch = new PhoenixStopWatch().start();
-                    // TODO Using the same as ServerCacheClient but need to think if we need some
-                    // special controller for invalidating cache since this is in the path of
-                    // DDL operations. We also need to think of we need separate RPC handler
-                    // threads for this?
-                    ServerRpcController controller = new ServerRpcController();
-                    for (InvalidateServerMetadataCacheRequest invalidateCacheRequest
-                            : invalidateCacheRequests) {
-                        LOGGER.info("Sending invalidate metadata cache for {}  to region server:"
-                                + " {}", invalidateCacheRequest.toString(), serverName);
-                    }
-                    RegionServerEndpointProtos.RegionServerEndpointService.BlockingInterface
-                            service = RegionServerEndpointProtos.RegionServerEndpointService
-                            .newBlockingStub(admin.coprocessorService(serverName));
-                    // The timeout for this particular request is managed by config parameter:
-                    // hbase.rpc.timeout. Even if the future times out, this runnable can be in
-                    // RUNNING state and will not be interrupted.
-                    service.invalidateServerMetadataCache(controller, protoRequest);
-                    long cacheInvalidationTime = innerWatch.stop().elapsedMillis();
-                    LOGGER.info("Invalidating metadata cache"
-                            + " on region server: {} completed successfully and it took {} ms",
-                            serverName, cacheInvalidationTime);
-                    metricsMetadataCachingSource
-                            .addMetadataCacheInvalidationRpcTime(cacheInvalidationTime);
-                } catch (ServiceException se) {
-                    LOGGER.error("Invalidating metadata cache failed for regionserver {}",
-                            serverName, se);
-                    IOException ioe = ServerUtil.parseServiceException(se);
-                    throw new CompletionException(ioe);
-                }
-            });
-            futures.add(future);
-            map.put(future, serverName);
-        }
-
-        // Here we create one master like future which tracks individual future
-        // for each region server.
-        CompletableFuture<Void> allFutures = CompletableFuture.allOf(
-                futures.toArray(new CompletableFuture[0]));
-        try {
-            allFutures.get(metadataCacheInvalidationTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (Throwable t) {
-            List<ServerName> failedServers = getFailedServers(futures, map);
-            LOGGER.error("Invalidating metadata cache for failed for region servers: {}",
-                    failedServers, t);
-            if (isRetry) {
-                // If this is a retry attempt then just fail the operation.
-                if (allFutures.isCompletedExceptionally()) {
-                    if (t instanceof ExecutionException) {
-                        t = t.getCause();
-                    }
-                }
-                throw t;
-            } else {
-                // This is the first attempt, we can retry once.
-                // Indicate that this is a retry attempt.
-                invalidateServerMetadataCacheWithRetries(admin, failedServers,
-                        invalidateCacheRequests, true);
-            }
-        }
-    }
-
-    /*
-        Get the list of regionservers that failed the invalidateCache rpc.
-     */
-    private List<ServerName> getFailedServers(List<CompletableFuture<Void>> futures,
-                                              Map<Future, ServerName> map) {
-        List<ServerName> failedServers = new ArrayList<>();
-        for (CompletableFuture completedFuture : futures) {
-            if (completedFuture.isDone() == false) {
-                // If this task is still running, cancel it and keep in retry list.
-                ServerName sn = map.get(completedFuture);
-                failedServers.add(sn);
-                // Even though we cancel this future but it doesn't interrupt the executing thread.
-                completedFuture.cancel(true);
-            } else if (completedFuture.isCompletedExceptionally()
-                    || completedFuture.isCancelled()) {
-                // This means task is done but completed with exception
-                // or was canceled. Add it to retry list.
-                ServerName sn = map.get(completedFuture);
-                failedServers.add(sn);
-            }
-        }
-        return failedServers;
-    }
-
-    private RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest getRequest(
-            List<InvalidateServerMetadataCacheRequest> requests) {
-        RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest.Builder builder =
-                RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest.newBuilder();
-        for (InvalidateServerMetadataCacheRequest request: requests) {
-            RegionServerEndpointProtos.InvalidateServerMetadataCache.Builder innerBuilder
-                    = RegionServerEndpointProtos.InvalidateServerMetadataCache.newBuilder();
-            innerBuilder.setTenantId(ByteStringer.wrap(request.getTenantId()));
-            innerBuilder.setSchemaName(ByteStringer.wrap(request.getSchemaName()));
-            innerBuilder.setTableName(ByteStringer.wrap(request.getTableName()));
-            builder.addInvalidateServerMetadataCacheRequests(innerBuilder.build());
+                        env.getConfiguration()).unwrap(PhoenixConnection.class)) {
+            ConnectionQueryServices queryServices = connection.getQueryServices();
+            queryServices.invalidateServerMetadataCache(requests);
         }
-        return builder.build();
     }
 
     /**
@@ -3830,7 +3649,9 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                     buildTable(key, cacheKey, region, clientTimeStamp, clientVersion);
             return table;
         } finally {
-            if (!wasLocked && rowLock != null) rowLock.release();
+            if (!wasLocked && rowLock != null) {
+                rowLock.release();
+            }
         }
     }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index e527023bc2..45315f89a3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.coprocessor.InvalidateServerMetadataCacheRequest;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
@@ -230,4 +231,7 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
     }
 
     int getConnectionCount(boolean isInternal);
+
+    void invalidateServerMetadataCache(List<InvalidateServerMetadataCacheRequest> requests)
+            throws Throwable;
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 3a83a878be..94f9d557fd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder.KEEP_
 import static org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder.MAX_VERSIONS;
 import static org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder.REPLICATION_SCOPE;
 import static org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder.TTL;
+import static org.apache.hadoop.hbase.ipc.RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY;
 import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP;
 import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_15_0;
 import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0;
@@ -109,11 +110,14 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
@@ -127,6 +131,7 @@ import java.util.regex.Pattern;
 import javax.annotation.concurrent.GuardedBy;
 
 import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
@@ -159,7 +164,9 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.ipc.controller.InvalidateMetadataCacheControllerFactory;
 import org.apache.hadoop.hbase.ipc.controller.ServerToServerRpcController;
 import org.apache.hadoop.hbase.ipc.controller.ServerSideRPCControllerFactory;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
@@ -176,6 +183,7 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.phoenix.compile.MutationPlan;
 import org.apache.phoenix.coprocessor.ChildLinkMetaDataEndpoint;
 import org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver;
+import org.apache.phoenix.coprocessor.InvalidateServerMetadataCacheRequest;
 import org.apache.phoenix.coprocessor.MetaDataEndpointImpl;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
@@ -211,6 +219,9 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRespons
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
+import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;
+import org.apache.phoenix.coprocessor.metrics.MetricsMetadataCachingSource;
+import org.apache.phoenix.coprocessor.metrics.MetricsPhoenixCoprocessorSourceFactory;
 import org.apache.phoenix.exception.InvalidRegionSplitPolicyException;
 import org.apache.phoenix.exception.PhoenixIOException;
 import org.apache.phoenix.exception.RetriableUpgradeException;
@@ -409,6 +420,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     // writes guarded by "liveRegionServersLock"
     private volatile List<ServerName> liveRegionServers;
     private final Object liveRegionServersLock = new Object();
+    // Writes guarded by invalidateMetadataCacheConnLock
+    private Connection invalidateMetadataCacheConnection = null;
+    private final Object invalidateMetadataCacheConnLock = new Object();
+    private MetricsMetadataCachingSource metricsMetadataCachingSource;
+    public static final String INVALIDATE_SERVER_METADATA_CACHE_EX_MESSAGE =
+            "Cannot invalidate server metadata cache on a non-server connection";
 
     private static interface FeatureSupported {
         boolean isSupported(ConnectionQueryServices services);
@@ -541,35 +558,61 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 e.printStackTrace();
             }
         }
-
+        nSequenceSaltBuckets = config.getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB,
+                QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
+        this.metricsMetadataCachingSource = MetricsPhoenixCoprocessorSourceFactory.getInstance()
+                .getMetadataCachingSource();
     }
 
-    private void openConnection() throws SQLException {
+    private Connection openConnection(Configuration conf) throws SQLException {
+        Connection localConnection;
         try {
-            this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
+            localConnection = HBaseFactoryProvider.getHConnectionFactory().createConnection(conf);
             GLOBAL_HCONNECTIONS_COUNTER.increment();
             LOGGER.info("HConnection established. Stacktrace for informational purposes: "
-                    + connection + " " +  LogUtil.getCallerStackTrace());
+                    + localConnection + " " +  LogUtil.getCallerStackTrace());
         } catch (IOException e) {
             throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
             .setRootCause(e).build().buildException();
         }
-        if (this.connection.isClosed()) { // TODO: why the heck doesn't this throw above?
+        if (localConnection.isClosed()) { // TODO: why the heck doesn't this throw above?
             throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION).build().buildException();
         }
+        return localConnection;
+    }
+
+    /**
+     *  We create a long-lived hbase connection to run invalidate cache RPCs. We override
+     *  CUSTOM_CONTROLLER_CONF_KEY to instantiate InvalidateMetadataCacheController which has
+     *  a special priority for invalidate metadata cache operations.
+     * @return hbase connection
+     * @throws SQLException SQLException
+     */
+    public Connection getInvalidateMetadataCacheConnection() throws SQLException {
+        if (invalidateMetadataCacheConnection != null) {
+            return invalidateMetadataCacheConnection;
+        }
+
+        synchronized (invalidateMetadataCacheConnLock) {
+            Configuration clonedConfiguration = PropertiesUtil.cloneConfig(this.config);
+            clonedConfiguration.setClass(CUSTOM_CONTROLLER_CONF_KEY,
+                    InvalidateMetadataCacheControllerFactory.class, RpcControllerFactory.class);
+            invalidateMetadataCacheConnection = openConnection(clonedConfiguration);
+        }
+        return invalidateMetadataCacheConnection;
     }
 
     /**
      * Close the HBase connection and decrement the counter.
      * @throws IOException throws IOException
      */
-    private void closeConnection() throws IOException {
+    private void closeConnection(Connection connection) throws IOException {
         if (connection != null) {
             connection.close();
             LOGGER.info("{} HConnection closed. Stacktrace for informational"
                 + " purposes: {}", connection, LogUtil.getCallerStackTrace());
+            GLOBAL_HCONNECTIONS_COUNTER.decrement();
         }
-        GLOBAL_HCONNECTIONS_COUNTER.decrement();
     }
 
     @Override
@@ -678,8 +721,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                         latestMetaDataLock.notifyAll();
                     }
                     try {
-                        // close the HBase connection
-                        closeConnection();
+                        // close HBase connections.
+                        closeConnection(this.connection);
+                        closeConnection(this.invalidateMetadataCacheConnection);
                     } finally {
                         if (renewLeaseExecutor != null) {
                             renewLeaseExecutor.shutdownNow();
@@ -3529,7 +3573,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                         try {
                             GLOBAL_QUERY_SERVICES_COUNTER.increment();
                             LOGGER.info("An instance of ConnectionQueryServices was created.");
-                            openConnection();
+                            connection = openConnection(config);
                             hConnectionEstablished = true;
                             boolean lastDDLTimestampValidationEnabled
                                 = getProps().getBoolean(
@@ -3546,9 +3590,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                 success = true;
                                 return null;
                             }
-                            nSequenceSaltBuckets = ConnectionQueryServicesImpl.this.props.getInt(
-                                    QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB,
-                                    QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
                             boolean isDoNotUpgradePropSet = UpgradeUtil.isNoUpgradeSet(props);
                             Properties scnProps = PropertiesUtil.deepCopy(props);
                             scnProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
@@ -3639,7 +3680,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                             }
                             try {
                                 if (!success && hConnectionEstablished) {
-                                    closeConnection();
+                                    closeConnection(connection);
+                                    closeConnection(invalidateMetadataCacheConnection);
                                 }
                             } catch (IOException e) {
                                 SQLException ex = new SQLException(e);
@@ -4459,7 +4501,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             } else {
                 nSequenceSaltBuckets = getSaltBuckets(e);
             }
-
             updateSystemSequenceWithCacheOnWriteProps(metaConnection);
         }
         return metaConnection;
@@ -6232,4 +6273,169 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     public List<LinkedBlockingQueue<WeakReference<PhoenixConnection>>> getCachedConnections() {
       return connectionQueues;
     }
+
+    /**
+     * Invalidate metadata cache from all region servers for the given list of
+     * InvalidateServerMetadataCacheRequest.
+     * @throws Throwable
+     */
+    public void invalidateServerMetadataCache(List<InvalidateServerMetadataCacheRequest> requests)
+            throws Throwable {
+        boolean invalidateCacheEnabled =
+                config.getBoolean(PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED, false);
+        if (!invalidateCacheEnabled) {
+            LOGGER.info("Skip invalidating server metadata cache since conf property"
+                    + " phoenix.metadata.invalidate.cache.enabled is set to false");
+            return;
+        }
+        if (!QueryUtil.isServerConnection(props)) {
+            LOGGER.warn(INVALIDATE_SERVER_METADATA_CACHE_EX_MESSAGE);
+            throw new Exception(INVALIDATE_SERVER_METADATA_CACHE_EX_MESSAGE);
+        }
+
+        metricsMetadataCachingSource.incrementMetadataCacheInvalidationOperationsCount();
+        Admin admin = getInvalidateMetadataCacheConnection().getAdmin();
+        // This will incur an extra RPC to the master. This RPC is required since we want to
+        // get current list of regionservers.
+        Collection<ServerName> serverNames = admin.getRegionServers(true);
+        PhoenixStopWatch stopWatch = new PhoenixStopWatch().start();
+        try {
+            invalidateServerMetadataCacheWithRetries(admin, serverNames, requests, false);
+            metricsMetadataCachingSource.incrementMetadataCacheInvalidationSuccessCount();
+        } catch (Throwable t) {
+            metricsMetadataCachingSource.incrementMetadataCacheInvalidationFailureCount();
+            throw t;
+        } finally {
+            metricsMetadataCachingSource
+                    .addMetadataCacheInvalidationTotalTime(stopWatch.stop().elapsedMillis());
+        }
+    }
+
+    /**
+     * Invalidate metadata cache on all regionservers with retries for the given list of
+     * InvalidateServerMetadataCacheRequest. Each InvalidateServerMetadataCacheRequest contains
+     * tenantID, schema name and table name.
+     * We retry once before failing the operation.
+     *
+     * @param admin
+     * @param serverNames
+     * @param invalidateCacheRequests
+     * @param isRetry
+     * @throws Throwable
+     */
+    private void invalidateServerMetadataCacheWithRetries(Admin admin,
+            Collection<ServerName> serverNames,
+            List<InvalidateServerMetadataCacheRequest> invalidateCacheRequests,
+            boolean isRetry) throws Throwable {
+        RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest protoRequest =
+                getRequest(invalidateCacheRequests);
+        // TODO Do I need my own executor or can I re-use QueryServices#Executor
+        //  since it is supposed to be used only for scans according to documentation?
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        Map<Future, ServerName> map = new HashMap<>();
+        for (ServerName serverName : serverNames) {
+            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+                try {
+                    PhoenixStopWatch innerWatch = new PhoenixStopWatch().start();
+                    for (InvalidateServerMetadataCacheRequest invalidateCacheRequest
+                            : invalidateCacheRequests) {
+                        LOGGER.info("Sending invalidate metadata cache for {}  to region server:"
+                                + " {}", invalidateCacheRequest.toString(), serverName);
+                    }
+
+                    RegionServerEndpointProtos.RegionServerEndpointService.BlockingInterface
+                            service = RegionServerEndpointProtos.RegionServerEndpointService
+                            .newBlockingStub(admin.coprocessorService(serverName));
+                    // The timeout for this particular request is managed by config parameter:
+                    // hbase.rpc.timeout. Even if the future times out, this runnable can be in
+                    // RUNNING state and will not be interrupted.
+                    // We use the controller set in hbase connection.
+                    service.invalidateServerMetadataCache(null, protoRequest);
+                    long cacheInvalidationTime = innerWatch.stop().elapsedMillis();
+                    LOGGER.info("Invalidating metadata cache"
+                                    + " on region server: {} completed successfully and it took {} ms",
+                            serverName, cacheInvalidationTime);
+                    metricsMetadataCachingSource
+                            .addMetadataCacheInvalidationRpcTime(cacheInvalidationTime);
+                } catch (ServiceException se) {
+                    LOGGER.error("Invalidating metadata cache failed for regionserver {}",
+                            serverName, se);
+                    IOException ioe = ServerUtil.parseServiceException(se);
+                    throw new CompletionException(ioe);
+                }
+            });
+            futures.add(future);
+            map.put(future, serverName);
+        }
+        // Here we create one master like future which tracks individual future
+        // for each region server.
+        CompletableFuture<Void> allFutures = CompletableFuture.allOf(
+                futures.toArray(new CompletableFuture[0]));
+        long metadataCacheInvalidationTimeoutMs = config.getLong(
+                PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS,
+                PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS_DEFAULT);
+        try {
+            allFutures.get(metadataCacheInvalidationTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (Throwable t) {
+            List<ServerName> failedServers = getFailedServers(futures, map);
+            LOGGER.error("Invalidating metadata cache for failed for region servers: {}",
+                    failedServers, t);
+            if (isRetry) {
+                // If this is a retry attempt then just fail the operation.
+                if (allFutures.isCompletedExceptionally()) {
+                    if (t instanceof ExecutionException) {
+                        t = t.getCause();
+                    }
+                }
+                throw t;
+            } else {
+                // This is the first attempt, we can retry once.
+                // Indicate that this is a retry attempt.
+                invalidateServerMetadataCacheWithRetries(admin, failedServers,
+                        invalidateCacheRequests, true);
+            }
+        }
+    }
+
+    /**
+     *  Get the list of regionservers that failed the invalidateCache rpc.
+     * @param futures futtures
+     * @param map map of future to server names
+     * @return the list of servers that failed the invalidateCache RPC.
+     */
+    private List<ServerName> getFailedServers(List<CompletableFuture<Void>> futures,
+                                              Map<Future, ServerName> map) {
+        List<ServerName> failedServers = new ArrayList<>();
+        for (CompletableFuture completedFuture : futures) {
+            if (!completedFuture.isDone()) {
+                // If this task is still running, cancel it and keep in retry list.
+                ServerName sn = map.get(completedFuture);
+                failedServers.add(sn);
+                // Even though we cancel this future but it doesn't interrupt the executing thread.
+                completedFuture.cancel(true);
+            } else if (completedFuture.isCompletedExceptionally()
+                    || completedFuture.isCancelled()) {
+                // This means task is done but completed with exception
+                // or was canceled. Add it to retry list.
+                ServerName sn = map.get(completedFuture);
+                failedServers.add(sn);
+            }
+        }
+        return failedServers;
+    }
+
+    private RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest getRequest(
+            List<InvalidateServerMetadataCacheRequest> requests) {
+        RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest.Builder builder =
+                RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest.newBuilder();
+        for (InvalidateServerMetadataCacheRequest request: requests) {
+            RegionServerEndpointProtos.InvalidateServerMetadataCache.Builder innerBuilder
+                    = RegionServerEndpointProtos.InvalidateServerMetadataCache.newBuilder();
+            innerBuilder.setTenantId(ByteStringer.wrap(request.getTenantId()));
+            innerBuilder.setSchemaName(ByteStringer.wrap(request.getSchemaName()));
+            innerBuilder.setTableName(ByteStringer.wrap(request.getTableName()));
+            builder.addInvalidateServerMetadataCacheRequests(innerBuilder.build());
+        }
+        return builder.build();
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index dfe2a22874..62b6514f41 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.coprocessor.InvalidateServerMetadataCacheRequest;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
@@ -824,4 +825,10 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
     public int getConnectionCount(boolean isInternal) {
         return 0;
     }
+
+    @Override
+    public void invalidateServerMetadataCache(List<InvalidateServerMetadataCacheRequest> requests)
+            throws Throwable {
+        // No-op
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 0278345948..674f0c5d5e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.coprocessor.InvalidateServerMetadataCacheRequest;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
@@ -432,4 +433,10 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
     public int getConnectionCount(boolean isInternal) {
         return getDelegate().getConnectionCount(isInternal);
     }
+
+    @Override
+    public void invalidateServerMetadataCache(List<InvalidateServerMetadataCacheRequest> requests)
+            throws Throwable {
+        getDelegate().invalidateServerMetadataCache(requests);
+    }
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 0316e18df9..71496f3630 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -186,6 +186,9 @@ public interface QueryServices extends SQLCloseable {
     public static final String INDEX_PRIOIRTY_ATTRIB = "phoenix.index.rpc.priority";
     public static final String METADATA_PRIOIRTY_ATTRIB = "phoenix.metadata.rpc.priority";
     public static final String SERVER_SIDE_PRIOIRTY_ATTRIB = "phoenix.serverside.rpc.priority";
+    String INVALIDATE_METADATA_CACHE_PRIORITY_ATTRIB =
+            "phoenix.invalidate.metadata.cache.rpc.priority";
+
     public static final String ALLOW_LOCAL_INDEX_ATTRIB = "phoenix.index.allowLocalIndex";
 
     // Retries when doing server side writes to SYSTEM.CATALOG
@@ -251,6 +254,7 @@ public interface QueryServices extends SQLCloseable {
     public static final String INDEX_HANDLER_COUNT_ATTRIB = "phoenix.rpc.index.handler.count";
     public static final String METADATA_HANDLER_COUNT_ATTRIB = "phoenix.rpc.metadata.handler.count";
     public static final String SERVER_SIDE_HANDLER_COUNT_ATTRIB = "phoenix.rpc.serverside.handler.count";
+    String INVALIDATE_CACHE_HANDLER_COUNT_ATTRIB = "phoenix.rpc.invalidate.cache.handler.count";
 
     public static final String FORCE_ROW_KEY_ORDER_ATTRIB = "phoenix.query.force.rowkeyorder";
     public static final String ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB = "phoenix.functions.allowUserDefinedFunctions";
@@ -438,7 +442,11 @@ public interface QueryServices extends SQLCloseable {
      *  Parameter to disable the server merges for hinted uncovered indexes
      */
     String SERVER_MERGE_FOR_UNCOVERED_INDEX = "phoenix.query.global.server.merge.enable";
-
+    String PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS =
+            "phoenix.metadata.cache.invalidation.timeoutMs";
+    // Default to 10 seconds.
+    long PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS_DEFAULT = 10 * 1000;
+    String PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED = "phoenix.metadata.invalidate.cache.enabled";
     /**
      * Param to determine whether client can disable validation to figure out if any of the
      * descendent views extend primary key of their parents. Since this is a bit of
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index dafeadf08a..673f60d69d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -239,10 +239,12 @@ public class QueryServicesOptions {
     public static final int DEFAULT_SERVER_SIDE_PRIORITY = 500;
     public static final int DEFAULT_INDEX_PRIORITY = 1000;
     public static final int DEFAULT_METADATA_PRIORITY = 2000;
+    public static final int DEFAULT_INVALIDATE_METADATA_CACHE_PRIORITY = 3000;
     public static final boolean DEFAULT_ALLOW_LOCAL_INDEX = true;
     public static final int DEFAULT_INDEX_HANDLER_COUNT = 30;
     public static final int DEFAULT_METADATA_HANDLER_COUNT = 30;
     public static final int DEFAULT_SERVERSIDE_HANDLER_COUNT = 30;
+    public static final int DEFAULT_INVALIDATE_CACHE_HANDLER_COUNT = 10;
     public static final int DEFAULT_SYSTEM_MAX_VERSIONS = 1;
     public static final boolean DEFAULT_SYSTEM_KEEP_DELETED_CELLS = false;
 
diff --git a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerTest.java
similarity index 75%
rename from phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
rename to phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerTest.java
index 15b83e41b5..aa629c3118 100644
--- a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
+++ b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerTest.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.ipc;
 
 import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.when;
 
 import java.net.InetSocketAddress;
@@ -38,16 +39,9 @@ import org.mockito.Mockito;
 
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 
-/**
- * Test that the rpc scheduler schedules index writes to the index handler queue and sends
- * everything else to the standard queues
- */
-public class PhoenixIndexRpcSchedulerTest {
-
+public class PhoenixRpcSchedulerTest {
     private static final Configuration conf = HBaseConfiguration.create();
     private static final InetSocketAddress isa = new InetSocketAddress("localhost", 0);
-
-    
     private class AbortServer implements Abortable {
       private boolean aborted = false;
 
@@ -62,25 +56,29 @@ public class PhoenixIndexRpcSchedulerTest {
       }
     }
 
+    /**
+     * Test that the rpc scheduler schedules index writes to the index handler queue and sends
+     * everything else to the standard queues
+     */
     @Test
     public void testIndexPriorityWritesToIndexHandler() throws Exception {
         RpcScheduler mock = Mockito.mock(RpcScheduler.class);
         PriorityFunction qosFunction = Mockito.mock(PriorityFunction.class);
         Abortable abortable = new AbortServer();
-        PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250, 225, qosFunction,abortable);
+        PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250, 225, 230, qosFunction,abortable);
         BalancedQueueRpcExecutor executor = new BalancedQueueRpcExecutor("test-queue", 1, 1,qosFunction,conf,abortable);
         scheduler.setIndexExecutorForTesting(executor);
         dispatchCallWithPriority(scheduler, 200);
         List<BlockingQueue<CallRunner>> queues = executor.getQueues();
         assertEquals(1, queues.size());
         BlockingQueue<CallRunner> queue = queues.get(0);
-        queue.poll(20, TimeUnit.SECONDS);
+        assertNotNull(queue.poll(5, TimeUnit.SECONDS));
 
         // try again, this time we tweak the ranges we support
-        scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110, 105, qosFunction,abortable);
+        scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110, 105, 115, qosFunction,abortable);
         scheduler.setIndexExecutorForTesting(executor);
         dispatchCallWithPriority(scheduler, 101);
-        queue.poll(20, TimeUnit.SECONDS);
+        assertNotNull(queue.poll(5, TimeUnit.SECONDS));
 
         Mockito.verify(mock, Mockito.times(2)).init(Mockito.any(Context.class));
         scheduler.stop();
@@ -92,7 +90,7 @@ public class PhoenixIndexRpcSchedulerTest {
         RpcScheduler mock = Mockito.mock(RpcScheduler.class);
         PriorityFunction qosFunction = Mockito.mock(PriorityFunction.class);
         Abortable abortable = new AbortServer();
-        PhoenixRpcScheduler scheduler1 = new PhoenixRpcScheduler(conf, mock, 200, 250, 100, qosFunction,abortable);
+        PhoenixRpcScheduler scheduler1 = new PhoenixRpcScheduler(conf, mock, 200, 250, 100, 300, qosFunction,abortable);
         RpcExecutor executor1  = scheduler1.getServerSideExecutorForTesting();
         for (int c = 0; c < 10; c++) {
             dispatchCallWithPriority(scheduler1, 100);
@@ -103,7 +101,7 @@ public class PhoenixIndexRpcSchedulerTest {
             if (queue1.size() > 0) {
                 numDispatches1 += queue1.size();
                 for (int i = 0; i < queue1.size(); i++) {
-                    queue1.poll(20, TimeUnit.SECONDS);
+                    assertNotNull(queue1.poll(5, TimeUnit.SECONDS));
                 }
             }
         }
@@ -111,7 +109,7 @@ public class PhoenixIndexRpcSchedulerTest {
         scheduler1.stop();
 
         // try again, with the incorrect executor
-        PhoenixRpcScheduler scheduler2 = new PhoenixRpcScheduler(conf, mock, 101, 110, 50, qosFunction,abortable);
+        PhoenixRpcScheduler scheduler2 = new PhoenixRpcScheduler(conf, mock, 101, 110, 50, 25,  qosFunction,abortable);
         RpcExecutor executor2  = scheduler2.getIndexExecutorForTesting();
         dispatchCallWithPriority(scheduler2, 50);
         List<BlockingQueue<CallRunner>> queues2 = executor2.getQueues();
@@ -119,7 +117,7 @@ public class PhoenixIndexRpcSchedulerTest {
         for (BlockingQueue<CallRunner> queue2 : queues2) {
             if (queue2.size() > 0) {
                 numDispatches2++;
-                queue2.poll(20, TimeUnit.SECONDS);
+                assertNotNull(queue2.poll(5, TimeUnit.SECONDS));
             }
         }
         assertEquals(0, numDispatches2);
@@ -140,12 +138,12 @@ public class PhoenixIndexRpcSchedulerTest {
         PriorityFunction qosFunction = Mockito.mock(PriorityFunction.class);
         Abortable abortable = new AbortServer();
         RpcScheduler mock = Mockito.mock(RpcScheduler.class);
-        PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250, 225, qosFunction,abortable);
+        PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250, 225, 275, qosFunction,abortable);
         dispatchCallWithPriority(scheduler, 100);
         dispatchCallWithPriority(scheduler, 251);
 
         // try again, this time we tweak the ranges we support
-        scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110, 105, qosFunction,abortable);
+        scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110, 105, 115,  qosFunction,abortable);
         dispatchCallWithPriority(scheduler, 200);
         dispatchCallWithPriority(scheduler, 111);
 
@@ -154,6 +152,33 @@ public class PhoenixIndexRpcSchedulerTest {
         scheduler.stop();
     }
 
+    /**
+     * Test that the rpc scheduler schedules invalidate metadata cache RPC to
+     * the invalidate metadata cache executor.
+     */
+    @Test
+    public void testInvalidateMetadataCacheExecutor() throws Exception {
+        RpcScheduler mock = Mockito.mock(RpcScheduler.class);
+        PriorityFunction qosFunction = Mockito.mock(PriorityFunction.class);
+        Abortable abortable = new AbortServer();
+        // Set invalidate metadata cache priority to 230.
+        int invalidateMetadataCacheCallPriority = 230;
+        PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock,
+                200, 250, 225, invalidateMetadataCacheCallPriority, qosFunction,abortable);
+        BalancedQueueRpcExecutor executor = new BalancedQueueRpcExecutor("test-queue",
+                1, 1, qosFunction, conf, abortable);
+        scheduler.setInvalidateMetadataCacheExecutorForTesting(executor);
+        dispatchCallWithPriority(scheduler, invalidateMetadataCacheCallPriority);
+        List<BlockingQueue<CallRunner>> queues = executor.getQueues();
+        assertEquals(1, queues.size());
+        BlockingQueue<CallRunner> queue = queues.get(0);
+        assertEquals(1, queue.size());
+        assertNotNull(queue.poll(5, TimeUnit.SECONDS));
+        Mockito.verify(mock, Mockito.times(1)).init(Mockito.any(RpcScheduler.Context.class));
+        scheduler.stop();
+        executor.stop();
+    }
+
     private void dispatchCallWithPriority(RpcScheduler scheduler, int priority) throws Exception {
         CallRunner task = Mockito.mock(CallRunner.class);
         RequestHeader header = RequestHeader.newBuilder().setPriority(priority).build();
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java
index 3ba83ff2ee..11f85172ff 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.metrics.MetricsMetadataCachingSource;
 import org.apache.phoenix.coprocessor.metrics.MetricsPhoenixCoprocessorSourceFactory;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.monitoring.GlobalClientMetrics;
@@ -46,8 +47,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -58,12 +57,14 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 
-import static org.apache.phoenix.coprocessor.MetaDataEndpointImpl.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED;
+import static org.apache.phoenix.query.ConnectionQueryServicesImpl.INVALIDATE_SERVER_METADATA_CACHE_EX_MESSAGE;
+import static org.apache.phoenix.query.QueryServices.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
@@ -75,7 +76,6 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
 
     private final Random RANDOM = new Random(42);
     private final long NEVER = (long) ConnectionProperty.UPDATE_CACHE_FREQUENCY.getValue("NEVER");
-    private static final Logger LOGGER = LoggerFactory.getLogger(ServerMetadataCacheTest.class);
 
     @BeforeClass
     public static synchronized void doSetup() throws Exception {
@@ -1422,8 +1422,24 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         }
     }
 
-    //Helper methods
+    /*
+        Tests that invalidate server metadata cache fails on a non server connection.
+     */
+    @Test
+    public void testInvalidateMetadataCacheOnNonServerConnection() {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (PhoenixConnection conn = DriverManager.getConnection(getUrl(), props)
+                .unwrap(PhoenixConnection.class)) {
+            ConnectionQueryServices cqs = conn.getQueryServices();
+            cqs.invalidateServerMetadataCache(null);
+            fail("Shouldn't come here");
+        } catch (Throwable t) {
+            assertNotNull(t);
+            assertTrue(t.getMessage().contains(INVALIDATE_SERVER_METADATA_CACHE_EX_MESSAGE));
+        }
+    }
 
+    //Helper methods
     private long getLastDDLTimestamp(String tableName) throws SQLException {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         // Need to use different connection than what is used for creating table or indexes.