You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by tk...@apache.org on 2023/07/12 17:00:17 UTC
[phoenix] branch PHOENIX-6883-feature updated: PHOENIX-6943 Add MetadataCache on each region server. (#1637)
This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-6883-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-6883-feature by this push:
new 68693129d1 PHOENIX-6943 Add MetadataCache on each region server. (#1637)
68693129d1 is described below
commit 68693129d1e7d2aad90858aabca9dc3897fc93ef
Author: Rushabh Shah <sh...@gmail.com>
AuthorDate: Wed Jul 12 10:00:12 2023 -0700
PHOENIX-6943 Add MetadataCache on each region server. (#1637)
* PHOENIX-6943 Add MetadataCache on each region server.
* PHOENIX-6943 Addressing checkstyle warnings
* PHOENIX-6943 Addressing review comments
* PHOENIX-6943 Rename test related method
---------
Co-authored-by: Rushabh Shah <ru...@rushabh-ltmflld.internal.salesforce.com>
---
.../apache/phoenix/cache/ServerMetadataCache.java | 162 +++++++++++++++++
.../phoenix/cache/ServerMetadataCacheTest.java | 201 +++++++++++++++++++++
.../org/apache/phoenix/jdbc/PhoenixTestDriver.java | 4 +-
.../java/org/apache/phoenix/query/BaseTest.java | 3 +
4 files changed, 369 insertions(+), 1 deletion(-)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java
new file mode 100644
index 0000000000..6f7d7365c1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.cache.Cache;
+import org.apache.phoenix.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.phoenix.thirdparty.com.google.common.cache.RemovalListener;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
+/**
+ * Manages the cache for all objects (data tables, views, indexes) on each region server.
+ * Currently, it only stores LAST_DDL_TIMESTAMP in the cache.
+ */
+public class ServerMetadataCache {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ServerMetadataCache.class);
+ private static final String PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS =
+ "phoenix.coprocessor.regionserver.cache.ttl.ms";
+ // Keeping default cache expiry for 30 mins since we won't have stale entry
+ // for more than 30 mins.
+ private static final long DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS
+ = 30 * 60 * 1000L; // 30 mins
+ private static final String PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE
+ = "phoenix.coprocessor.regionserver.cache.size";
+ private static final long DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE = 10000L;
+ private static volatile ServerMetadataCache INSTANCE;
+ private Configuration conf;
+ // key is the combination of <tenantID, schema name, table name>, value is the lastDDLTimestamp
+ private final Cache<ImmutableBytesPtr, Long> lastDDLTimestampMap;
+ private Connection connectionForTesting;
+
+ /**
+ * Returns the shared ServerMetadataCache, creating it if none exists yet.
+ * @param conf configuration; only read when a new instance has to be created
+ * @return the cache instance held in the static INSTANCE field
+ */
+ public static ServerMetadataCache getInstance(Configuration conf) {
+ ServerMetadataCache result = INSTANCE;
+ if (result == null) {
+ synchronized (ServerMetadataCache.class) {
+ result = INSTANCE; // re-check under the class lock before creating
+ if (result == null) {
+ INSTANCE = result = new ServerMetadataCache(conf);
+ }
+ }
+ }
+ return result;
+ }
+
+ private ServerMetadataCache(Configuration conf) { // builds the cache from TTL/size settings in conf
+ this.conf = conf;
+ long maxTTL = conf.getLong(PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS,
+ DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS);
+ long maxSize = conf.getLong(PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE,
+ DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE);
+ lastDDLTimestampMap = CacheBuilder.newBuilder()
+ .removalListener((RemovalListener<ImmutableBytesPtr, Long>) notification -> {
+ String key = notification.getKey().toString();
+ LOGGER.debug("Expiring " + key + " because of "
+ + notification.getCause().name());
+ })
+ // Maximum number of entries the cache holds.
+ .maximumSize(maxSize)
+ .expireAfterAccess(maxTTL, TimeUnit.MILLISECONDS) // NOTE(review): TTL restarts on each access, so a hot entry may never expire -- confirm intended
+ .build();
+ }
+
+ /**
+ * Returns the last DDL timestamp for the given table.
+ * On a cache miss, loads the table via PhoenixRuntime#getTableNoCache and caches the value.
+ * @param tenantID tenant id; null or empty means no tenant id is set on the connection
+ * @param schemaName schema name
+ * @param tableName table name
+ * @return last DDL timestamp
+ * @throws IOException if loading the table fails with a SQLException
+ */
+ public long getLastDDLTimestampForTable(byte[] tenantID, byte[] schemaName, byte[] tableName)
+ throws IOException {
+ String fullTableNameStr = SchemaUtil.getTableName(schemaName, tableName);
+ byte[] tableKey = SchemaUtil.getTableKey(tenantID, schemaName, tableName);
+ ImmutableBytesPtr tableKeyPtr = new ImmutableBytesPtr(tableKey);
+ // Lookup in cache if present.
+ Long lastDDLTimestamp = lastDDLTimestampMap.getIfPresent(tableKeyPtr);
+ if (lastDDLTimestamp != null) {
+ LOGGER.trace("Retrieving last ddl timestamp value from cache for tableName: {}",
+ fullTableNameStr);
+ return lastDDLTimestamp;
+ }
+
+ PTable table;
+ String tenantIDStr = Bytes.toString(tenantID);
+ if (tenantIDStr == null || tenantIDStr.isEmpty()) {
+ tenantIDStr = null;
+ }
+ Properties properties = new Properties();
+ if (tenantIDStr != null) {
+ properties.setProperty(TENANT_ID_ATTRIB, tenantIDStr);
+ }
+ try (Connection connection = getConnection(properties)) { // NOTE(review): this also closes connectionForTesting when a test injected one
+ // Using PhoenixRuntime#getTableNoCache since we don't want to read a cached value.
+ table = PhoenixRuntime.getTableNoCache(connection, fullTableNameStr);
+ // TODO PhoenixRuntime#getTableNoCache can throw TableNotFoundException.
+ // In that case, do we want to throw non retryable exception back to the client?
+ // Update cache with the latest DDL timestamp from SYSCAT server.
+ lastDDLTimestampMap.put(tableKeyPtr, table.getLastDDLTimestamp());
+ } catch (SQLException sqle) {
+ // Throw IOException back to the client and let the client retry depending on
+ // the configured retry policies.
+ LOGGER.warn("Exception while calling getTableNoCache for tenant id: {},"
+ + " tableName: {}", tenantIDStr, fullTableNameStr, sqle);
+ throw new IOException(sqle);
+ }
+ return table.getLastDDLTimestamp(); // NOTE(review): unboxes a Long -- presumably never null here; confirm
+ }
+
+ // Returns the connection injected by tests when set, else one from QueryUtil#getConnectionOnServer.
+ private Connection getConnection(Properties properties) throws SQLException {
+ return connectionForTesting != null ? connectionForTesting
+ : QueryUtil.getConnectionOnServer(properties, this.conf);
+ }
+
+ @VisibleForTesting
+ public void setConnectionForTesting(Connection connection) {
+ this.connectionForTesting = connection; // while non-null, getConnection() returns this connection
+ }
+
+ @VisibleForTesting
+ public static void resetCache() {
+ LOGGER.info("Resetting ServerMetadataCache");
+ INSTANCE = null; // the next getInstance() call creates a new cache instance
+ }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java
new file mode 100644
index 0000000000..26b226b8d0
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class ServerMetadataCacheTest extends BaseTest {
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(1); // left empty: no property overrides
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @After
+ public void resetMetadataCache() {
+ ServerMetadataCache.resetCache(); // drop the singleton after each test
+ }
+
+ /**
+ * Verifies that a base table's last DDL timestamp is loaded once and then served from cache.
+ * @throws Exception on any unexpected failure
+ */
+ @Test
+ public void testCacheForBaseTable() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableNameStr = generateUniqueName();
+ PTable pTable;
+ // use a spied ConnectionQueryServices so we can verify calls to getTable
+ ConnectionQueryServices spyCQS = Mockito.spy(driver.getConnectionQueryServices(getUrl(),
+ PropertiesUtil.deepCopy(TEST_PROPERTIES)));
+ try(Connection conn = spyCQS.connect(getUrl(), props)) {
+ conn.setAutoCommit(false);
+ String ddl = getCreateTableStmt(tableNameStr);
+ // Create a test table.
+ conn.createStatement().execute(ddl);
+ pTable = PhoenixRuntime.getTableNoCache(conn,
+ tableNameStr);// --> First call to CQSI#getTable
+ ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+ // Override the connection to use in ServerMetadataCache
+ cache.setConnectionForTesting(conn);
+ byte[] tableName = Bytes.toBytes(tableNameStr);
+ long lastDDLTimestampFromCache = cache.getLastDDLTimestampForTable(
+ null, null, tableName); // --> Second call to CQSI#getTable
+ // Make sure the lastDDLTimestamp values are the same.
+ assertEquals(pTable.getLastDDLTimestamp().longValue(), lastDDLTimestampFromCache);
+ // Verify that we made 2 calls to CQSI#getTable.
+ verify(spyCQS, times(2)).getTable(
+ any(), any(), eq(tableName), anyLong(), anyLong());
+ // Make the same call 2 times to make sure it returns from the cache.
+ cache.getLastDDLTimestampForTable(null, null, tableName);
+ cache.getLastDDLTimestampForTable(null, null, tableName);
+ // Both of the above 2 calls were served from the cache, so the count is still 2.
+ verify(spyCQS, times(2)).getTable(
+ any(), any(), eq(tableName), anyLong(), anyLong());
+ }
+ }
+
+ /**
+ * Verifies that a global view's last DDL timestamp is loaded once and then served from cache.
+ * @throws Exception on any unexpected failure
+ */
+ @Test
+ public void testCacheForGlobalView() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableNameStr = generateUniqueName();
+ PTable viewTable;
+ // use a spied ConnectionQueryServices so we can verify calls to getTable
+ ConnectionQueryServices spyCQS = Mockito.spy(driver.getConnectionQueryServices(getUrl(),
+ PropertiesUtil.deepCopy(TEST_PROPERTIES)));
+ try (Connection conn = spyCQS.connect(getUrl(), props)) {
+ conn.setAutoCommit(false);
+ String ddl = getCreateTableStmt(tableNameStr);
+ // Create a test table.
+ conn.createStatement().execute(ddl);
+ // Create view on table.
+ String whereClause = " WHERE COL1 = 1000";
+ String viewNameStr = generateUniqueName();
+ conn.createStatement().execute(getCreateViewStmt(viewNameStr, tableNameStr, whereClause));
+ viewTable = PhoenixRuntime.getTableNoCache(conn, viewNameStr); // --> First call to CQSI#getTable
+ ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+ // Override the connection to use in ServerMetadataCache
+ cache.setConnectionForTesting(conn);
+
+ long lastDDLTimestampFromCache = cache.getLastDDLTimestampForTable(
+ null, null, Bytes.toBytes(viewNameStr)); // --> Second call to CQSI#getTable
+
+ byte[] viewNameBytes = Bytes.toBytes(viewNameStr);
+ // Make sure the lastDDLTimestamp values are the same.
+ assertEquals(viewTable.getLastDDLTimestamp().longValue(), lastDDLTimestampFromCache);
+ // Verify that we made 2 calls to CQSI#getTable.
+ verify(spyCQS, times(2)).getTable(
+ any(), any(), eq(viewNameBytes), anyLong(), anyLong());
+ // Make the same call 2 times to make sure it returns from the cache.
+ cache.getLastDDLTimestampForTable(null, null, viewNameBytes);
+ cache.getLastDDLTimestampForTable(null, null, viewNameBytes);
+ verify(spyCQS, times(2)).getTable( // still 2: both calls above were served from the cache
+ any(), any(), eq(viewNameBytes), anyLong(), anyLong());
+ }
+ }
+
+ /**
+ * Verifies that a tenant view's last DDL timestamp is loaded once and then served from cache.
+ * @throws Exception on any unexpected failure
+ */
+ @Test
+ public void testCacheForTenantView() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableNameStr = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(false);
+ String ddl = getCreateTableStmt(tableNameStr);
+ // Create a test table.
+ conn.createStatement().execute(ddl);
+ }
+ String tenantId = "T_" + generateUniqueName();
+ Properties tenantProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+ PTable tenantViewTable;
+ // Create view on table.
+ String whereClause = " WHERE COL1 = 1000";
+ String tenantViewNameStr = generateUniqueName();
+ ConnectionQueryServices spyCQS = Mockito.spy(driver.getConnectionQueryServices(getUrl(),
+ PropertiesUtil.deepCopy(TEST_PROPERTIES)));
+ try (Connection conn = spyCQS.connect(getUrl(), tenantProps)) {
+ conn.createStatement().execute(getCreateViewStmt(tenantViewNameStr,
+ tableNameStr, whereClause));
+ tenantViewTable = PhoenixRuntime.getTableNoCache(conn,
+ tenantViewNameStr); // --> First call to CQSI#getTable
+ ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+ // Override the connection to use in ServerMetadataCache
+ cache.setConnectionForTesting(conn);
+ byte[] tenantIDBytes = Bytes.toBytes(tenantId);
+ long lastDDLTimestampFromCache = cache.getLastDDLTimestampForTable(tenantIDBytes,
+ null, Bytes.toBytes(tenantViewNameStr)); // --> Second call to CQSI#getTable
+ assertEquals(tenantViewTable.getLastDDLTimestamp().longValue(),
+ lastDDLTimestampFromCache);
+ byte[] tenantViewNameBytes = Bytes.toBytes(tenantViewNameStr);
+ // Verify that we made 2 calls to CQSI#getTable.
+ verify(spyCQS, times(2)).getTable(
+ any(), any(), eq(tenantViewNameBytes), anyLong(), anyLong());
+ // Make the same call 2 times to make sure it returns from the cache.
+ cache.getLastDDLTimestampForTable(tenantIDBytes,
+ null, Bytes.toBytes(tenantViewNameStr));
+ cache.getLastDDLTimestampForTable(tenantIDBytes,
+ null, Bytes.toBytes(tenantViewNameStr));
+ verify(spyCQS, times(2)).getTable( // still 2: both calls above were served from the cache
+ any(), any(), eq(tenantViewNameBytes), anyLong(), anyLong());
+ }
+ }
+
+ private String getCreateTableStmt(String tableName) { // DDL for a two-column table keyed on a_string
+ return "CREATE TABLE " + tableName +
+ " (a_string varchar not null, col1 integer" +
+ " CONSTRAINT pk PRIMARY KEY (a_string)) ";
+ }
+
+ private String getCreateViewStmt(String viewName, String fullTableName, String whereClause) { // DDL for a view over fullTableName
+ String viewStmt = "CREATE VIEW " + viewName +
+ " AS SELECT * FROM "+ fullTableName + whereClause; // whereClause is appended verbatim
+ return viewStmt;
+ }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
index f9fa9f8acd..6f587e375d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
@@ -86,7 +86,9 @@ public class PhoenixTestDriver extends PhoenixEmbeddedDriver {
@Override // public for testing
public synchronized ConnectionQueryServices getConnectionQueryServices(String url, Properties info) throws SQLException {
checkClosed();
- if (connectionQueryServices != null) { return connectionQueryServices; }
+ if (connectionQueryServices != null) {
+ return connectionQueryServices;
+ }
ConnectionInfo connInfo = ConnectionInfo.create(url);
if (connInfo.isConnectionless()) {
connectionQueryServices = new ConnectionlessQueryServicesImpl(queryServices, connInfo, info);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 30f73f77c8..0b4794520f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -138,6 +138,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.SystemExitRule;
+import org.apache.phoenix.cache.ServerMetadataCache;
import org.apache.phoenix.compat.hbase.CompatUtil;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
@@ -492,6 +493,8 @@ public abstract class BaseTest {
LOGGER.error("Exception caught when shutting down mini map reduce cluster", t);
} finally {
try {
+ // Clear ServerMetadataCache.
+ ServerMetadataCache.resetCache();
utility.shutdownMiniCluster();
} catch (Throwable t) {
LOGGER.error("Exception caught when shutting down mini cluster", t);