You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sh...@apache.org on 2023/08/20 16:27:26 UTC

[phoenix] branch PHOENIX-6883-feature updated: PHOENIX-6988 Create new regionserver coprocessor named PhoenixRegionServerEndpoint in Phoenix (#1654)

This is an automated email from the ASF dual-hosted git repository.

shahrs87 pushed a commit to branch PHOENIX-6883-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-6883-feature by this push:
     new b58ea06ea0 PHOENIX-6988 Create new regionserver coprocessor named PhoenixRegionServerEndpoint in Phoenix (#1654)
b58ea06ea0 is described below

commit b58ea06ea0e34f1962c4abb732ed66ccb0aa9cce
Author: Rushabh Shah <sh...@apache.org>
AuthorDate: Sun Aug 20 09:27:20 2023 -0700

    PHOENIX-6988 Create new regionserver coprocessor named PhoenixRegionServerEndpoint in Phoenix (#1654)
---
 .../end2end/PhoenixRegionServerEndpointIT.java     | 195 +++++++++++++++++++++
 .../apache/phoenix/cache/ServerMetadataCache.java  |   9 +-
 .../coprocessor/PhoenixRegionServerEndpoint.java   |  76 ++++++++
 .../coprocessor/VerifyLastDDLTimestamp.java        |  71 ++++++++
 .../apache/phoenix/exception/SQLExceptionCode.java |   3 +
 .../exception/StaleMetadataCacheException.java     |  32 ++++
 .../java/org/apache/phoenix/util/ServerUtil.java   |   6 +-
 .../protobuf/RegionServerEndpointService.proto     |  42 +++++
 .../phoenix/cache/ServerMetadataCacheTest.java     |   6 +-
 9 files changed, 427 insertions(+), 13 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointIT.java
new file mode 100644
index 0000000000..c3898a2561
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointIT.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
+import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;
+import org.apache.phoenix.exception.StaleMetadataCacheException;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.ServerUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+@Category({NeedsOwnMiniClusterTest.class })
+public class PhoenixRegionServerEndpointIT extends BaseTest {
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(REGIONSERVER_COPROCESSOR_CONF_KEY,
+                PhoenixRegionServerEndpoint.class.getName());
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    // Tests that PhoenixRegionServerEndpoint validates the last ddl timestamp for base table.
+    @Test
+    public void testValidateLastDDLTimestampNoException() throws SQLException {
+        HRegionServer regionServer = utility.getMiniHBaseCluster().getRegionServer(0);
+        PhoenixRegionServerEndpoint coprocessor = getPhoenixRegionServerEndpoint(regionServer);
+        assertNotNull(coprocessor);
+        ServerRpcController controller = new ServerRpcController();
+        String tableNameStr = generateUniqueName();
+        Properties props = new Properties();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(false);
+            String ddl = getCreateTableStmt(tableNameStr);
+            // Create a test table.
+            conn.createStatement().execute(ddl);
+            conn.commit();
+
+            PTable pTable = PhoenixRuntime.getTable(conn, tableNameStr);
+            long lastDDLTimestamp = pTable.getLastDDLTimestamp();
+            RegionServerEndpointProtos.ValidateLastDDLTimestampRequest request = getRequest(
+                    HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
+                    tableNameStr.getBytes(StandardCharsets.UTF_8), lastDDLTimestamp);
+            // Call coprocessor#validateLastDDLTimestamp to validate
+            // client provided last ddl timestamp
+            coprocessor.validateLastDDLTimestamp(controller, request, null);
+            assertFalse(controller.failed());
+        }
+    }
+
+    // Tests that PhoenixRegionServerEndpoint throws StaleMetadataCacheException if client
+    // provided last ddl timestamp is less than server maintained last ddl timestamp.
+    @Test
+    public void testValidateLastDDLTimestampWithException() throws SQLException {
+        HRegionServer regionServer = utility.getMiniHBaseCluster().getRegionServer(0);
+        PhoenixRegionServerEndpoint coprocessor = getPhoenixRegionServerEndpoint(regionServer);
+        assertNotNull(coprocessor);
+        ServerRpcController controller = new ServerRpcController();
+        String tableNameStr = generateUniqueName();
+        Properties props = new Properties();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(false);
+            String ddl = getCreateTableStmt(tableNameStr);
+            // Create a test table.
+            conn.createStatement().execute(ddl);
+            conn.commit();
+
+            PTable pTable = PhoenixRuntime.getTable(conn, tableNameStr);
+            long lastDDLTimestamp = pTable.getLastDDLTimestamp();
+            RegionServerEndpointProtos.ValidateLastDDLTimestampRequest request = getRequest(
+                    HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
+                    tableNameStr.getBytes(StandardCharsets.UTF_8), lastDDLTimestamp - 1);
+            // Call coprocessor#validateLastDDLTimestamp to validate client provided
+            // last ddl timestamp and make sure it throws a StaleMetadataCacheException
+            coprocessor.validateLastDDLTimestamp(controller, request, null);
+            assertTrue(controller.failed());
+            Exception exception = controller.getFailedOn();
+            Exception parsedException = ServerUtil.parseRemoteException(exception);
+            assertTrue(parsedException instanceof StaleMetadataCacheException);
+        }
+    }
+
+    // Tests that PhoenixRegionServerEndpoint validates the last ddl timestamp for tenant owned
+    // views
+    @Test
+    public void testValidateLastDDLTimestampWithTenantID() throws SQLException {
+        HRegionServer regionServer = utility.getMiniHBaseCluster().getRegionServer(0);
+        PhoenixRegionServerEndpoint coprocessor = getPhoenixRegionServerEndpoint(regionServer);
+        assertNotNull(coprocessor);
+        ServerRpcController controller = new ServerRpcController();
+        String tableNameStr = generateUniqueName();
+        Properties props = new Properties();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(false);
+            String ddl = getCreateTableStmt(tableNameStr);
+            // Create a test table.
+            conn.createStatement().execute(ddl);
+        }
+        String tenantId = "T_" + generateUniqueName();
+        Properties tenantProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        PTable tenantViewTable;
+        // Create view on table.
+        String whereClause = " WHERE COL1 = 1000";
+        String tenantViewNameStr = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), tenantProps)) {
+            conn.createStatement().execute(getCreateViewStmt(tenantViewNameStr,
+                    tableNameStr, whereClause));
+            tenantViewTable = PhoenixRuntime.getTableNoCache(conn, tenantViewNameStr);
+
+            byte[] tenantIDBytes = Bytes.toBytes(tenantId);
+            byte[] tenantViewNameBytes = Bytes.toBytes(tenantViewNameStr);
+            long lastDDLTimestamp = tenantViewTable.getLastDDLTimestamp();
+            RegionServerEndpointProtos.ValidateLastDDLTimestampRequest request = getRequest(
+                    tenantIDBytes, HConstants.EMPTY_BYTE_ARRAY, tenantViewNameBytes,
+                    lastDDLTimestamp);
+            // Call coprocessor#validateLastDDLTimestamp to validate client provided
+            // last ddl timestamp for tenant owned views.
+            coprocessor.validateLastDDLTimestamp(controller, request, null);
+            assertFalse(controller.failed());
+        }
+    }
+
+    private String getCreateTableStmt(String tableName) {
+        return   "CREATE TABLE " + tableName +
+                "  (a_string varchar not null, col1 integer" +
+                "  CONSTRAINT pk PRIMARY KEY (a_string)) ";
+    }
+
+    private String getCreateViewStmt(String viewName, String fullTableName, String whereClause) {
+        String viewStmt =  "CREATE VIEW " + viewName +
+                " AS SELECT * FROM "+ fullTableName + whereClause;
+        return  viewStmt;
+    }
+
+    private PhoenixRegionServerEndpoint getPhoenixRegionServerEndpoint(HRegionServer regionServer) {
+        PhoenixRegionServerEndpoint coproc = regionServer
+                .getRegionServerCoprocessorHost()
+                .findCoprocessor(PhoenixRegionServerEndpoint.class);
+        return coproc;
+    }
+
+    private RegionServerEndpointProtos.ValidateLastDDLTimestampRequest getRequest(byte[] tenantID,
+            byte[] schemaName, byte[] tableName, long lastDDLTimestamp) {
+        RegionServerEndpointProtos.ValidateLastDDLTimestampRequest.Builder requestBuilder
+                = RegionServerEndpointProtos.ValidateLastDDLTimestampRequest.newBuilder();
+        RegionServerEndpointProtos.LastDDLTimestampRequest.Builder innerBuilder
+                = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();
+        innerBuilder.setTenantId(ByteStringer.wrap(tenantID));
+        innerBuilder.setSchemaName(ByteStringer.wrap(schemaName));
+        innerBuilder.setTableName(ByteStringer.wrap(tableName));
+        innerBuilder.setLastDDLTimestamp(lastDDLTimestamp);
+        requestBuilder.addLastDDLTimestampRequests(innerBuilder);
+        return  requestBuilder.build();
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java
index 6f7d7365c1..8d3bfffffe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java
@@ -17,7 +17,6 @@
  */
 package org.apache.phoenix.cache;
 
-import java.io.IOException;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.Properties;
@@ -105,7 +104,7 @@ public class ServerMetadataCache {
      * @throws Exception
      */
     public long getLastDDLTimestampForTable(byte[] tenantID, byte[] schemaName, byte[] tableName)
-            throws IOException {
+            throws SQLException {
         String fullTableNameStr = SchemaUtil.getTableName(schemaName, tableName);
         byte[] tableKey = SchemaUtil.getTableKey(tenantID, schemaName, tableName);
         ImmutableBytesPtr tableKeyPtr = new ImmutableBytesPtr(tableKey);
@@ -133,12 +132,6 @@ public class ServerMetadataCache {
             //  In that case, do we want to throw non retryable exception back to the client?
             // Update cache with the latest DDL timestamp from SYSCAT server.
             lastDDLTimestampMap.put(tableKeyPtr, table.getLastDDLTimestamp());
-        } catch (SQLException sqle) {
-            // Throw IOException back to the client and let the client retry depending on
-            // the configured retry policies.
-            LOGGER.warn("Exception while calling getTableNoCache for tenant id: {},"
-                    + " tableName: {}", tenantIDStr, fullTableNameStr, sqle);
-            throw new IOException(sqle);
         }
         return table.getLastDDLTimestamp();
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
new file mode 100644
index 0000000000..4ea5c7e06c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;
+import org.apache.phoenix.protobuf.ProtobufUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the first implementation of a RegionServer coprocessor introduced by Phoenix.
+ */
+public class PhoenixRegionServerEndpoint
+        extends RegionServerEndpointProtos.RegionServerCoprocService
+        implements RegionServerCoprocessor {
+    private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixRegionServerEndpoint.class);
+    Configuration conf;
+
+    @Override
+    public void start(CoprocessorEnvironment env) throws IOException {
+        this.conf = env.getConfiguration();
+    }
+
+    @Override
+    public void validateLastDDLTimestamp(RpcController controller,
+            RegionServerEndpointProtos.ValidateLastDDLTimestampRequest request,
+            RpcCallback<RegionServerEndpointProtos.ValidateLastDDLTimestampResponse> done) {
+        for (RegionServerEndpointProtos.LastDDLTimestampRequest lastDDLTimestampRequest
+                : request.getLastDDLTimestampRequestsList()) {
+            byte[] tenantID = lastDDLTimestampRequest.getTenantId().toByteArray();
+            byte[] schemaName = lastDDLTimestampRequest.getSchemaName().toByteArray();
+            byte[] tableName = lastDDLTimestampRequest.getTableName().toByteArray();
+            long clientLastDDLTimestamp = lastDDLTimestampRequest.getLastDDLTimestamp();
+            String tenantIDStr = Bytes.toString(tenantID);
+            String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+            try {
+                LOGGER.debug("Verifying last ddl timestamp for tenantID: {}, tableName: {}",
+                        tenantIDStr, fullTableName);
+                VerifyLastDDLTimestamp.verifyLastDDLTimestamp(this.conf, tenantID, schemaName,
+                        tableName, clientLastDDLTimestamp);
+            } catch (Throwable t) {
+                String errorMsg = String.format("Verifying last ddl timestamp FAILED for "
+                        + "tenantID: %s,  fullTableName: %s", tenantIDStr, fullTableName);
+                LOGGER.error(errorMsg,  t);
+                IOException ioe = ServerUtil.createIOException(errorMsg, t);
+                ProtobufUtil.setControllerException(controller, ioe);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/VerifyLastDDLTimestamp.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/VerifyLastDDLTimestamp.java
new file mode 100644
index 0000000000..2f1554279e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/VerifyLastDDLTimestamp.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.cache.ServerMetadataCache;
+import org.apache.phoenix.exception.StaleMetadataCacheException;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Client provides last DDL timestamp of tables/views/indexes included in read/write operation.
+ * This verifies that client has the latest version of LAST_DDL_TIMESTAMP.
+ * If client's provided LAST_DDL_TIMESTAMP is less than what is present in SYSTEM.CATALOG
+ * then it throws StaleMetadataCacheException.
+ */
+public class VerifyLastDDLTimestamp {
+    private static final Logger LOGGER = LoggerFactory.getLogger(VerifyLastDDLTimestamp.class);
+
+    private VerifyLastDDLTimestamp() {
+        // Not to be instantiated.
+    }
+
+    /**
+     * Verify that LAST_DDL_TIMESTAMP provided by the client is up to date. If it is stale it will
+     * throw StaleMetadataCacheException.
+     *
+     * @param tenantID               tenant id
+     * @param schemaName             schema name
+     * @param tableName              table name
+     * @param clientLastDDLTimestamp last ddl timestamp provided by client
+     * @param conf                   configuration
+     * @throws SQLException         StaleMetadataCacheException if client provided timestamp
+     *                              is stale.
+     */
+    public static void verifyLastDDLTimestamp(Configuration conf, byte[] tenantID,
+            byte[] schemaName, byte[] tableName, long clientLastDDLTimestamp) throws SQLException {
+        ServerMetadataCache cache = ServerMetadataCache.getInstance(conf);
+        long lastDDLTimestamp = cache.getLastDDLTimestampForTable(tenantID, schemaName, tableName);
+        // Is it possible to have client last ddl timestamp greater than server side?
+        if (clientLastDDLTimestamp < lastDDLTimestamp) {
+            LOGGER.error("Stale metadata for LAST_DDL_TIMESTAMP for tenantID: {}, schema: {},"
+                            + " table: {}, client provided timestamp: {}, server timestamp: {}",
+                    Bytes.toString(tenantID), Bytes.toString(schemaName),
+                    Bytes.toString(tableName), clientLastDDLTimestamp, lastDDLTimestamp);
+            String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+            throw new StaleMetadataCacheException("Stale metadata cache for table name: "
+                    + fullTableName);
+        }
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 67dd2635a1..01b6d1a82d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -596,6 +596,9 @@ public enum SQLExceptionCode {
 
     CANNOT_TRANSFORM_TRANSACTIONAL_TABLE(914, "43M25", "Cannot transform a transactional table."),
 
+    STALE_METADATA_CACHE_EXCEPTION(915, "43M26", "Stale metadata cache exception",
+        info -> new StaleMetadataCacheException(info.getMessage())),
+
     //SQLCode for testing exceptions
     FAILED_KNOWINGLY_FOR_TEST(7777, "TEST", "Exception was thrown to test something");
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/StaleMetadataCacheException.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/StaleMetadataCacheException.java
new file mode 100644
index 0000000000..584e33e0b8
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/StaleMetadataCacheException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.exception;
+
+import java.sql.SQLException;
+
+/**
+ * Indicates metadata cache is stale.
+ */
+public class StaleMetadataCacheException extends SQLException {
+    private static final long serialVersionUID = 1L;
+    private static final SQLExceptionCode code = SQLExceptionCode.STALE_METADATA_CACHE_EXCEPTION;
+
+    public StaleMetadataCacheException(String  message) {
+        super(message, code.getSQLState(), code.getErrorCode());
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
index 015c6beaf3..9ecfa778cf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -170,7 +170,7 @@ public class ServerUtil {
         return parseRemoteException(t);
     }
 
-    private static SQLException parseRemoteException(Throwable t) {
+    public static SQLException parseRemoteException(Throwable t) {
         
         String message = t.getLocalizedMessage();
         if (message != null) {
@@ -179,7 +179,7 @@ public class ServerUtil {
             if (matcher.find()) {
                 int statusCode = Integer.parseInt(matcher.group(1));
                 SQLExceptionCode code = SQLExceptionCode.fromErrorCode(statusCode);
-                if(code.equals(SQLExceptionCode.HASH_JOIN_CACHE_NOT_FOUND)){
+                if (code.equals(SQLExceptionCode.HASH_JOIN_CACHE_NOT_FOUND)) {
                     Matcher m = HASH_JOIN_EXCEPTION_PATTERN.matcher(t.getLocalizedMessage());
                     if (m.find()) { return new HashJoinCacheNotFoundException(Long.parseLong(m.group(1))); }
                 }
@@ -206,7 +206,7 @@ public class ServerUtil {
             return conn.getTable(tableName);
         } catch (RuntimeException t) {
             // handle cases that an IOE is wrapped inside a RuntimeException like HTableInterface#createHTableInterface
-            if(t.getCause() instanceof IOException) {
+            if (t.getCause() instanceof IOException) {
                 throw (IOException)t.getCause();
             } else {
                 throw t;
diff --git a/phoenix-core/src/main/protobuf/RegionServerEndpointService.proto b/phoenix-core/src/main/protobuf/RegionServerEndpointService.proto
new file mode 100644
index 0000000000..16e1c2eff9
--- /dev/null
+++ b/phoenix-core/src/main/protobuf/RegionServerEndpointService.proto
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+option java_package = "org.apache.phoenix.coprocessor.generated";
+option java_outer_classname = "RegionServerEndpointProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+message ValidateLastDDLTimestampResponse {
+}
+
+message ValidateLastDDLTimestampRequest {
+  repeated LastDDLTimestampRequest lastDDLTimestampRequests = 1;
+}
+
+message LastDDLTimestampRequest {
+  // Will be HConstants.EMPTY_BYTE_ARRAY if tenantID or schema name is null.
+  required bytes tenantId = 1;
+  required bytes schemaName = 2;
+  required bytes tableName = 3;
+  required int64 lastDDLTimestamp = 4;
+}
+
+service RegionServerCoprocService {
+  rpc validateLastDDLTimestamp(ValidateLastDDLTimestampRequest)
+      returns (ValidateLastDDLTimestampResponse);
+}
\ No newline at end of file
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java
index 26b226b8d0..9b2dc872be 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java
@@ -18,7 +18,7 @@
 package org.apache.phoenix.cache;
 
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
@@ -28,6 +28,7 @@ import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
 
 import java.sql.Connection;
@@ -43,7 +44,8 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
-public class ServerMetadataCacheTest extends BaseTest {
+@Category(ParallelStatsDisabledIT.class)
+public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
     @BeforeClass
     public static synchronized void doSetup() throws Exception {
         Map<String, String> props = Maps.newHashMapWithExpectedSize(1);