Posted to commits@phoenix.apache.org by tk...@apache.org on 2023/07/12 17:00:17 UTC

[phoenix] branch PHOENIX-6883-feature updated: PHOENIX-6943 Add MetadataCache on each region server. (#1637)

This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-6883-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-6883-feature by this push:
     new 68693129d1 PHOENIX-6943 Add MetadataCache on each region server. (#1637)
68693129d1 is described below

commit 68693129d1e7d2aad90858aabca9dc3897fc93ef
Author: Rushabh Shah <sh...@gmail.com>
AuthorDate: Wed Jul 12 10:00:12 2023 -0700

    PHOENIX-6943 Add MetadataCache on each region server. (#1637)
    
    * PHOENIX-6943 Add MetadataCache on each region server.
    
    * PHOENIX-6943 Addressing checkstyle warnings
    
    * PHOENIX-6943 Addressing review comments
    
    * PHOENIX-6943 Rename test related method
    
    ---------
    
    Co-authored-by: Rushabh Shah <ru...@rushabh-ltmflld.internal.salesforce.com>
---
 .../apache/phoenix/cache/ServerMetadataCache.java  | 162 +++++++++++++++++
 .../phoenix/cache/ServerMetadataCacheTest.java     | 201 +++++++++++++++++++++
 .../org/apache/phoenix/jdbc/PhoenixTestDriver.java |   4 +-
 .../java/org/apache/phoenix/query/BaseTest.java    |   3 +
 4 files changed, 369 insertions(+), 1 deletion(-)
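
At a glance: ServerMetadataCache is a per-region-server singleton, keyed on
<tenantID, schema name, table name>, that serves LAST_DDL_TIMESTAMP lookups
and falls back to SYSCAT on a cache miss. A minimal sketch of a caller,
assuming a hypothetical server-side hook (the surrounding wiring below is an
assumption for illustration; this commit only adds getInstance and
getLastDDLTimestampForTable):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.cache.ServerMetadataCache;

    public class LastDdlTimestampLookupSketch {
        // Hypothetical caller: a region server component that needs the last
        // DDL timestamp to decide whether a client's metadata is stale.
        static long lastDdlTimestamp(Configuration conf, String tenantId,
                String schemaName, String tableName) throws IOException {
            ServerMetadataCache cache = ServerMetadataCache.getInstance(conf);
            // Null tenant and schema are allowed; the lookup then resolves
            // the table through a global (non-tenant) connection.
            return cache.getLastDDLTimestampForTable(
                    tenantId == null ? null : Bytes.toBytes(tenantId),
                    schemaName == null ? null : Bytes.toBytes(schemaName),
                    Bytes.toBytes(tableName));
        }
    }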

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java
new file mode 100644
index 0000000000..6f7d7365c1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.cache.Cache;
+import org.apache.phoenix.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.phoenix.thirdparty.com.google.common.cache.RemovalListener;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
+/**
+ * This manages the cache for all the objects (data tables, views, indexes) on each region server.
+ * Currently, it only stores LAST_DDL_TIMESTAMP in the cache.
+ */
+public class ServerMetadataCache {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ServerMetadataCache.class);
+    private static final String PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS =
+            "phoenix.coprocessor.regionserver.cache.ttl.ms";
+    // Keep the default cache expiry at 30 mins so that a stale entry
+    // lives in the cache for at most 30 mins.
+    private static final long DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS
+            = 30 * 60 * 1000L; // 30 mins
+    private static final String PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE
+            = "phoenix.coprocessor.regionserver.cache.size";
+    private static final long DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE = 10000L;
+    private static volatile ServerMetadataCache INSTANCE;
+    private Configuration conf;
+    // key is the combination of <tenantID, schema name, table name>, value is the lastDDLTimestamp
+    private final Cache<ImmutableBytesPtr, Long> lastDDLTimestampMap;
+    private Connection connectionForTesting;
+
+    /**
+     * Creates/gets an instance of ServerMetadataCache.
+     * @param conf configuration
+     * @return the singleton cache instance
+     */
+    public static ServerMetadataCache getInstance(Configuration conf) {
+        ServerMetadataCache result = INSTANCE;
+        if (result == null) {
+            synchronized (ServerMetadataCache.class) {
+                result = INSTANCE;
+                if (result == null) {
+                    INSTANCE = result = new ServerMetadataCache(conf);
+                }
+            }
+        }
+        return result;
+    }
+
+    private ServerMetadataCache(Configuration conf) {
+        this.conf = conf;
+        long maxTTL = conf.getLong(PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS,
+                DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS);
+        long maxSize = conf.getLong(PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE,
+                DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE);
+        lastDDLTimestampMap = CacheBuilder.newBuilder()
+                .removalListener((RemovalListener<ImmutableBytesPtr, Long>) notification -> {
+                    String key = notification.getKey().toString();
+                    LOGGER.debug("Expiring " + key + " because of "
+                            + notification.getCause().name());
+                })
+                // maximum number of entries this cache can hold.
+                .maximumSize(maxSize)
+                .expireAfterAccess(maxTTL, TimeUnit.MILLISECONDS)
+                .build();
+    }
+
+    /**
+     * Returns the last DDL timestamp for the given table.
+     * If it is not present in the cache, query the SYSCAT region server.
+     * @param tenantID tenant id
+     * @param schemaName schema name
+     * @param tableName table name
+     * @return last DDL timestamp
+     * @throws IOException if the timestamp cannot be loaded from SYSCAT
+     */
+    public long getLastDDLTimestampForTable(byte[] tenantID, byte[] schemaName, byte[] tableName)
+            throws IOException {
+        String fullTableNameStr = SchemaUtil.getTableName(schemaName, tableName);
+        byte[] tableKey = SchemaUtil.getTableKey(tenantID, schemaName, tableName);
+        ImmutableBytesPtr tableKeyPtr = new ImmutableBytesPtr(tableKey);
+        // Lookup in cache if present.
+        Long lastDDLTimestamp = lastDDLTimestampMap.getIfPresent(tableKeyPtr);
+        if (lastDDLTimestamp != null) {
+            LOGGER.trace("Retrieving last ddl timestamp value from cache for tableName: {}",
+                    fullTableNameStr);
+            return lastDDLTimestamp;
+        }
+
+        PTable table;
+        String tenantIDStr = Bytes.toString(tenantID);
+        if (tenantIDStr == null || tenantIDStr.isEmpty()) {
+            tenantIDStr = null;
+        }
+        Properties properties = new Properties();
+        if (tenantIDStr != null) {
+            properties.setProperty(TENANT_ID_ATTRIB, tenantIDStr);
+        }
+        try (Connection connection = getConnection(properties)) {
+            // Using PhoenixRuntime#getTableNoCache since we don't want to read a cached value.
+            table = PhoenixRuntime.getTableNoCache(connection, fullTableNameStr);
+            // TODO PhoenixRuntime#getTableNoCache can throw TableNotFoundException.
+            //  In that case, do we want to throw non retryable exception back to the client?
+            // Update cache with the latest DDL timestamp from SYSCAT server.
+            lastDDLTimestampMap.put(tableKeyPtr, table.getLastDDLTimestamp());
+        } catch (SQLException sqle) {
+            // Throw IOException back to the client and let the client retry depending on
+            // the configured retry policies.
+            LOGGER.warn("Exception while calling getTableNoCache for tenant id: {},"
+                    + " tableName: {}", tenantIDStr, fullTableNameStr, sqle);
+            throw new IOException(sqle);
+        }
+        return table.getLastDDLTimestamp();
+    }
+
+    // This is used by tests to override specific connection to use.
+    private Connection getConnection(Properties properties) throws SQLException {
+        return connectionForTesting != null ? connectionForTesting
+                : QueryUtil.getConnectionOnServer(properties, this.conf);
+    }
+
+    @VisibleForTesting
+    public void setConnectionForTesting(Connection connection) {
+        this.connectionForTesting = connection;
+    }
+
+    @VisibleForTesting
+    public static void resetCache() {
+        LOGGER.info("Resetting ServerMetadataCache");
+        INSTANCE = null;
+    }
+}
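
The TTL and maximum size used above are read from plain Configuration keys,
so they can be tuned in hbase-site.xml or programmatically. A minimal sketch
(the values here are illustrative, not recommendations):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.phoenix.cache.ServerMetadataCache;

    public class MetadataCacheConfigSketch {
        public static void main(String[] args) {
            Configuration conf = HBaseConfiguration.create();
            // Expire entries 10 minutes after last access (default: 30 min).
            conf.setLong("phoenix.coprocessor.regionserver.cache.ttl.ms",
                    10 * 60 * 1000L);
            // Cap the cache at 50,000 entries (default: 10,000).
            conf.setLong("phoenix.coprocessor.regionserver.cache.size", 50000L);
            // getInstance builds a process-wide singleton, so only the
            // Configuration passed on the first call takes effect.
            ServerMetadataCache cache = ServerMetadataCache.getInstance(conf);
        }
    }
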
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java
new file mode 100644
index 0000000000..26b226b8d0
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class ServerMetadataCacheTest extends BaseTest {
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @After
+    public void resetMetadataCache() {
+        ServerMetadataCache.resetCache();
+    }
+
+    /**
+     * Make sure the cache works for a base table.
+     * @throws Exception
+     */
+    @Test
+    public void testCacheForBaseTable() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableNameStr = generateUniqueName();
+        PTable pTable;
+        // use a spied ConnectionQueryServices so we can verify calls to getTable
+        ConnectionQueryServices spyCQS = Mockito.spy(driver.getConnectionQueryServices(getUrl(),
+                PropertiesUtil.deepCopy(TEST_PROPERTIES)));
+        try (Connection conn = spyCQS.connect(getUrl(), props)) {
+            conn.setAutoCommit(false);
+            String ddl = getCreateTableStmt(tableNameStr);
+            // Create a test table.
+            conn.createStatement().execute(ddl);
+            pTable = PhoenixRuntime.getTableNoCache(conn,
+                    tableNameStr); // --> First call to CQSI#getTable
+            ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+            // Override the connection to use in ServerMetadataCache
+            cache.setConnectionForTesting(conn);
+            byte[] tableName = Bytes.toBytes(tableNameStr);
+            long lastDDLTimestampFromCache = cache.getLastDDLTimestampForTable(
+                    null, null, tableName); // --> Second call to CQSI#getTable
+            // Make sure the lastDDLTimestamps are the same.
+            assertEquals(pTable.getLastDDLTimestamp().longValue(), lastDDLTimestampFromCache);
+            // Verify that we made 2 calls to CQSI#getTable.
+            verify(spyCQS, times(2)).getTable(
+                    any(), any(), eq(tableName), anyLong(), anyLong());
+            // Make the same call 2 times to make sure it returns from the cache.
+            cache.getLastDDLTimestampForTable(null, null, tableName);
+            cache.getLastDDLTimestampForTable(null, null, tableName);
+            // Both the above 2 calls were served from the cache.
+            verify(spyCQS, times(2)).getTable(
+                    any(), any(), eq(tableName), anyLong(), anyLong());
+        }
+    }
+
+    /**
+     * Make sure the cache works for a global view.
+     * @throws Exception
+     */
+    @Test
+    public void testCacheForGlobalView() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableNameStr = generateUniqueName();
+        PTable viewTable;
+        // use a spied ConnectionQueryServices so we can verify calls to getTable
+        ConnectionQueryServices spyCQS = Mockito.spy(driver.getConnectionQueryServices(getUrl(),
+                PropertiesUtil.deepCopy(TEST_PROPERTIES)));
+        try (Connection conn = spyCQS.connect(getUrl(), props)) {
+            conn.setAutoCommit(false);
+            String ddl = getCreateTableStmt(tableNameStr);
+            // Create a test table.
+            conn.createStatement().execute(ddl);
+            // Create view on table.
+            String whereClause = " WHERE COL1 = 1000";
+            String viewNameStr = generateUniqueName();
+            conn.createStatement().execute(getCreateViewStmt(viewNameStr, tableNameStr, whereClause));
+            viewTable = PhoenixRuntime.getTableNoCache(conn, viewNameStr);  // --> First call to CQSI#getTable
+            ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+            // Override the connection to use in ServerMetadataCache
+            cache.setConnectionForTesting(conn);
+
+            long lastDDLTimestampFromCache = cache.getLastDDLTimestampForTable(
+                    null, null, Bytes.toBytes(viewNameStr)); // --> Second call to CQSI#getTable
+
+            byte[] viewNameBytes = Bytes.toBytes(viewNameStr);
+            // Make sure the lastDDLTimestamps are the same.
+            assertEquals(viewTable.getLastDDLTimestamp().longValue(), lastDDLTimestampFromCache);
+            // Verify that we made 2 calls to CQSI#getTable.
+            verify(spyCQS, times(2)).getTable(
+                    any(), any(), eq(viewNameBytes), anyLong(), anyLong());
+            // Make the same call 2 times to make sure it returns from the cache.
+            cache.getLastDDLTimestampForTable(null, null, viewNameBytes);
+            cache.getLastDDLTimestampForTable(null, null, viewNameBytes);
+            verify(spyCQS, times(2)).getTable(
+                    any(), any(), eq(viewNameBytes), anyLong(), anyLong());
+        }
+    }
+
+    /**
+     * Make sure the cache works for a tenant view.
+     * @throws Exception
+     */
+    @Test
+    public void testCacheForTenantView() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableNameStr = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(false);
+            String ddl = getCreateTableStmt(tableNameStr);
+            // Create a test table.
+            conn.createStatement().execute(ddl);
+        }
+        String tenantId = "T_" + generateUniqueName();
+        Properties tenantProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        PTable tenantViewTable;
+        // Create view on table.
+        String whereClause = " WHERE COL1 = 1000";
+        String tenantViewNameStr = generateUniqueName();
+        ConnectionQueryServices spyCQS = Mockito.spy(driver.getConnectionQueryServices(getUrl(),
+                PropertiesUtil.deepCopy(TEST_PROPERTIES)));
+        try (Connection conn = spyCQS.connect(getUrl(), tenantProps)) {
+            conn.createStatement().execute(getCreateViewStmt(tenantViewNameStr,
+                    tableNameStr, whereClause));
+            tenantViewTable = PhoenixRuntime.getTableNoCache(conn,
+                    tenantViewNameStr);  // --> First call to CQSI#getTable
+            ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+            // Override the connection to use in ServerMetadataCache
+            cache.setConnectionForTesting(conn);
+            byte[] tenantIDBytes = Bytes.toBytes(tenantId);
+            long lastDDLTimestampFromCache = cache.getLastDDLTimestampForTable(tenantIDBytes,
+                    null, Bytes.toBytes(tenantViewNameStr)); // --> Second call to CQSI#getTable
+            assertEquals(tenantViewTable.getLastDDLTimestamp().longValue(),
+                    lastDDLTimestampFromCache);
+            byte[] tenantViewNameBytes = Bytes.toBytes(tenantViewNameStr);
+            // Verify that we made 2 calls to CQSI#getTable.
+            verify(spyCQS, times(2)).getTable(
+                    any(), any(), eq(tenantViewNameBytes), anyLong(), anyLong());
+            // Make the same call 2 times to make sure it returns from the cache.
+            cache.getLastDDLTimestampForTable(tenantIDBytes,
+                    null, tenantViewNameBytes);
+            cache.getLastDDLTimestampForTable(tenantIDBytes,
+                    null, tenantViewNameBytes);
+            verify(spyCQS, times(2)).getTable(
+                    any(), any(), eq(tenantViewNameBytes), anyLong(), anyLong());
+        }
+    }
+
+    private String getCreateTableStmt(String tableName) {
+        return   "CREATE TABLE " + tableName +
+                "  (a_string varchar not null, col1 integer" +
+                "  CONSTRAINT pk PRIMARY KEY (a_string)) ";
+    }
+
+    private String getCreateViewStmt(String viewName, String fullTableName, String whereClause) {
+        String viewStmt = "CREATE VIEW " + viewName +
+                " AS SELECT * FROM " + fullTableName + whereClause;
+        return viewStmt;
+    }
+}
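
One detail worth noting in the tests above: ServerMetadataCache is a
process-wide singleton and the tests inject a live Connection through
setConnectionForTesting, so each test resets the singleton in @After.
A condensed sketch of that pattern for other suites (the class name here is
assumed):

    import org.apache.phoenix.cache.ServerMetadataCache;
    import org.junit.After;

    public abstract class MetadataCacheResettingTest {
        @After
        public void resetMetadataCache() {
            // Drop the singleton so the next test builds a fresh cache and
            // never reuses a Connection injected by an earlier test.
            ServerMetadataCache.resetCache();
        }
    }
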
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
index f9fa9f8acd..6f587e375d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
@@ -86,7 +86,9 @@ public class PhoenixTestDriver extends PhoenixEmbeddedDriver {
     @Override // public for testing
     public synchronized ConnectionQueryServices getConnectionQueryServices(String url, Properties info) throws SQLException {
         checkClosed();
-        if (connectionQueryServices != null) { return connectionQueryServices; }
+        if (connectionQueryServices != null) {
+            return connectionQueryServices;
+        }
         ConnectionInfo connInfo = ConnectionInfo.create(url);
         if (connInfo.isConnectionless()) {
             connectionQueryServices = new ConnectionlessQueryServicesImpl(queryServices, connInfo, info);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 30f73f77c8..0b4794520f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -138,6 +138,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.SystemExitRule;
+import org.apache.phoenix.cache.ServerMetadataCache;
 import org.apache.phoenix.compat.hbase.CompatUtil;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
@@ -492,6 +493,8 @@ public abstract class BaseTest {
             LOGGER.error("Exception caught when shutting down mini map reduce cluster", t);
         } finally {
             try {
+                // Clear ServerMetadataCache.
+                ServerMetadataCache.resetCache();
                 utility.shutdownMiniCluster();
             } catch (Throwable t) {
                 LOGGER.error("Exception caught when shutting down mini cluster", t);