You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sh...@apache.org on 2024/03/08 05:57:50 UTC
(phoenix) branch PHOENIX-6883-feature updated: PHOENIX-7251 : Refactor server-side code to support multiple ServerMetadataCache for ITs which create multiple RSs or mini clusters (#1845)
This is an automated email from the ASF dual-hosted git repository.
shahrs87 pushed a commit to branch PHOENIX-6883-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-6883-feature by this push:
new 7930acadba PHOENIX-7251 : Refactor server-side code to support multiple ServerMetadataCache for ITs which create multiple RSs or mini clusters (#1845)
7930acadba is described below
commit 7930acadbacbc31b3b51b542c92c1121434ae4d0
Author: palash <pa...@gmail.com>
AuthorDate: Thu Mar 7 21:57:45 2024 -0800
PHOENIX-7251 : Refactor server-side code to support multiple ServerMetadataCache for ITs which create multiple RSs or mini clusters (#1845)
---
.../apache/phoenix/cache/ServerMetadataCache.java | 187 ++-------------------
...dataCache.java => ServerMetadataCacheImpl.java} | 64 ++-----
.../coprocessor/PhoenixRegionServerEndpoint.java | 10 +-
.../coprocessor/VerifyLastDDLTimestamp.java | 10 +-
.../phoenix/end2end/BackwardCompatibilityIT.java | 2 +-
.../apache/phoenix/end2end/BasePermissionsIT.java | 1 +
.../MigrateSystemTablesToSystemNamespaceIT.java | 1 +
.../PartialResultServerConfigurationIT.java | 1 +
.../PhoenixRegionServerEndpointTestImpl.java | 45 +++++
.../end2end/ServerMetadataCacheTestImpl.java | 89 ++++++++++
.../end2end/index/DropIndexDuringUpsertIT.java | 2 +
.../end2end/index/IndexAsyncThresholdIT.java | 2 +
.../index/ReplicationWithWALAnnotationIT.java | 9 +-
.../execute/UpsertSelectOverlappingBatchesIT.java | 2 +
.../index/FailForUnsupportedHBaseVersionsIT.java | 2 +
.../jdbc/HighAvailabilityTestingUtility.java | 2 +
.../monitoring/PhoenixTableLevelMetricsIT.java | 3 +
.../ConnectionQueryServicesMetricsIT.java | 3 +
.../phoenix/query/MaxConcurrentConnectionsIT.java | 2 +
...taCacheTest.java => ServerMetadataCacheIT.java} | 76 ++++++---
.../hbase/index/write/TestWALRecoveryCaching.java | 2 +
.../java/org/apache/phoenix/query/BaseTest.java | 13 +-
22 files changed, 264 insertions(+), 264 deletions(-)
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java b/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java
index 4e47a1382a..f251dc7123 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java
@@ -1,11 +1,10 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -17,177 +16,13 @@
*/
package org.apache.phoenix.cache;
-import java.sql.Connection;
import java.sql.SQLException;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.coprocessorclient.metrics.MetricsMetadataCachingSource;
-import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.phoenix.thirdparty.com.google.common.cache.Cache;
-import org.apache.phoenix.thirdparty.com.google.common.cache.CacheBuilder;
-import org.apache.phoenix.thirdparty.com.google.common.cache.RemovalListener;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.QueryUtil;
-import org.apache.phoenix.util.SchemaUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
/**
- * This manages the cache for all the objects(data table, views, indexes) on each region server.
- * Currently, it only stores LAST_DDL_TIMESTAMP in the cache.
+ * Interface for server side metadata cache hosted on each region server.
*/
-public class ServerMetadataCache {
- private static final Logger LOGGER = LoggerFactory.getLogger(ServerMetadataCache.class);
- private static final String PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS =
- "phoenix.coprocessor.regionserver.cache.ttl.ms";
- // Keeping default cache expiry for 30 mins since we won't have stale entry
- // for more than 30 mins.
- private static final long DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS
- = 30 * 60 * 1000L; // 30 mins
- private static final String PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE
- = "phoenix.coprocessor.regionserver.cache.size";
- private static final long DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE = 10000L;
- private static volatile ServerMetadataCache INSTANCE;
- private Configuration conf;
- // key is the combination of <tenantID, schema name, table name>, value is the lastDDLTimestamp
- private final Cache<ImmutableBytesPtr, Long> lastDDLTimestampMap;
- private Connection connectionForTesting;
- private MetricsMetadataCachingSource metricsSource;
-
- /**
- * Creates/gets an instance of ServerMetadataCache.
- * @param conf configuration
- * @return cache
- */
- public static ServerMetadataCache getInstance(Configuration conf) {
- ServerMetadataCache result = INSTANCE;
- if (result == null) {
- synchronized (ServerMetadataCache.class) {
- result = INSTANCE;
- if (result == null) {
- INSTANCE = result = new ServerMetadataCache(conf);
- }
- }
- }
- return result;
- }
-
- private ServerMetadataCache(Configuration conf) {
- this.conf = conf;
- this.metricsSource = MetricsPhoenixCoprocessorSourceFactory
- .getInstance().getMetadataCachingSource();
- long maxTTL = conf.getLong(PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS,
- DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS);
- long maxSize = conf.getLong(PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE,
- DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE);
- lastDDLTimestampMap = CacheBuilder.newBuilder()
- .removalListener((RemovalListener<ImmutableBytesPtr, Long>) notification -> {
- String key = notification.getKey().toString();
- LOGGER.debug("Expiring " + key + " because of "
- + notification.getCause().name());
- })
- // maximum number of entries this cache can handle.
- .maximumSize(maxSize)
- .expireAfterAccess(maxTTL, TimeUnit.MILLISECONDS)
- .build();
- }
-
- /**
- * Returns the last DDL timestamp from the table.
- * If not found in cache, then query SYSCAT regionserver.
- * @param tenantID tenant id
- * @param schemaName schema name
- * @param tableName table name
- * @return last DDL timestamp
- * @throws Exception
- */
- public long getLastDDLTimestampForTable(byte[] tenantID, byte[] schemaName, byte[] tableName)
- throws SQLException {
- String fullTableNameStr = SchemaUtil.getTableName(schemaName, tableName);
- byte[] tableKey = SchemaUtil.getTableKey(tenantID, schemaName, tableName);
- ImmutableBytesPtr tableKeyPtr = new ImmutableBytesPtr(tableKey);
- // Lookup in cache if present.
- Long lastDDLTimestamp = lastDDLTimestampMap.getIfPresent(tableKeyPtr);
- if (lastDDLTimestamp != null) {
- metricsSource.incrementRegionServerMetadataCacheHitCount();
- LOGGER.trace("Retrieving last ddl timestamp value from cache for tableName: {}",
- fullTableNameStr);
- return lastDDLTimestamp;
- }
- metricsSource.incrementRegionServerMetadataCacheMissCount();
- PTable table;
- String tenantIDStr = Bytes.toString(tenantID);
- if (tenantIDStr == null || tenantIDStr.isEmpty()) {
- tenantIDStr = null;
- }
- Properties properties = new Properties();
- if (tenantIDStr != null) {
- properties.setProperty(TENANT_ID_ATTRIB, tenantIDStr);
- }
- try (Connection connection = getConnection(properties)) {
- // Using PhoenixRuntime#getTableNoCache since se don't want to read cached value.
- table = PhoenixRuntime.getTableNoCache(connection, fullTableNameStr);
- // TODO PhoenixRuntime#getTableNoCache can throw TableNotFoundException.
- // In that case, do we want to throw non retryable exception back to the client?
- // Update cache with the latest DDL timestamp from SYSCAT server.
- lastDDLTimestampMap.put(tableKeyPtr, table.getLastDDLTimestamp());
- }
- return table.getLastDDLTimestamp();
- }
-
- /**
- * Invalidate cache for the given tenantID, schema name and table name.
- * Guava cache is thread safe so we don't have to synchronize it explicitly.
- * @param tenantID tenantID
- * @param schemaName schemaName
- * @param tableName tableName
- */
- public void invalidate(byte[] tenantID, byte[] schemaName, byte[] tableName) {
- String fullTableNameStr = SchemaUtil.getTableName(schemaName, tableName);
- LOGGER.debug("Invalidating server metadata cache for tenantID: {}, full table: {}",
- Bytes.toString(tenantID), fullTableNameStr);
- byte[] tableKey = SchemaUtil.getTableKey(tenantID, schemaName, tableName);
- ImmutableBytesPtr tableKeyPtr = new ImmutableBytesPtr(tableKey);
- lastDDLTimestampMap.invalidate(tableKeyPtr);
- }
-
- /**
- * This should be used only in the tests. DO NOT use this in production code.
- */
- @VisibleForTesting
- public Long getLastDDLTimestampForTableFromCacheOnly(byte[] tenantID, byte[] schemaName,
- byte[] tableName) {
- byte[] tableKey = SchemaUtil.getTableKey(tenantID, schemaName, tableName);
- ImmutableBytesPtr tableKeyPtr = new ImmutableBytesPtr(tableKey);
- return lastDDLTimestampMap.getIfPresent(tableKeyPtr);
- }
-
- // This is used by tests to override specific connection to use.
- private Connection getConnection(Properties properties) throws SQLException {
- return connectionForTesting != null ? connectionForTesting
- : QueryUtil.getConnectionOnServer(properties, this.conf);
- }
-
- @VisibleForTesting
- public void setConnectionForTesting(Connection connection) {
- this.connectionForTesting = connection;
- }
-
- @VisibleForTesting
- public static void resetCache() {
- LOGGER.info("Resetting ServerMetadataCache");
- INSTANCE = null;
- }
-
- @VisibleForTesting
- public static void setInstance(ServerMetadataCache cache) {
- INSTANCE = cache;
- }
+public interface ServerMetadataCache {
+ long getLastDDLTimestampForTable(byte[] tenantID, byte[] schemaName, byte[] tableName)
+ throws SQLException;
+ void invalidate(byte[] tenantID, byte[] schemaName, byte[] tableName);
}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java b/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCacheImpl.java
similarity index 78%
copy from phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java
copy to phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCacheImpl.java
index 4e47a1382a..bd2946992e 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCacheImpl.java
@@ -23,12 +23,12 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessorclient.metrics.MetricsMetadataCachingSource;
import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.cache.Cache;
import org.apache.phoenix.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.phoenix.thirdparty.com.google.common.cache.RemovalListener;
@@ -43,8 +43,12 @@ import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
* This manages the cache for all the objects(data table, views, indexes) on each region server.
* Currently, it only stores LAST_DDL_TIMESTAMP in the cache.
*/
-public class ServerMetadataCache {
- private static final Logger LOGGER = LoggerFactory.getLogger(ServerMetadataCache.class);
+public class ServerMetadataCacheImpl implements ServerMetadataCache {
+
+ protected Configuration conf;
+ // key is the combination of <tenantID, schema name, table name>, value is the lastDDLTimestamp
+ protected final Cache<ImmutableBytesPtr, Long> lastDDLTimestampMap;
+ private static final Logger LOGGER = LoggerFactory.getLogger(ServerMetadataCacheImpl.class);
private static final String PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS =
"phoenix.coprocessor.regionserver.cache.ttl.ms";
// Keeping default cache expiry for 30 mins since we won't have stale entry
@@ -54,33 +58,30 @@ public class ServerMetadataCache {
private static final String PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE
= "phoenix.coprocessor.regionserver.cache.size";
private static final long DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE = 10000L;
- private static volatile ServerMetadataCache INSTANCE;
- private Configuration conf;
- // key is the combination of <tenantID, schema name, table name>, value is the lastDDLTimestamp
- private final Cache<ImmutableBytesPtr, Long> lastDDLTimestampMap;
- private Connection connectionForTesting;
+ private static volatile ServerMetadataCacheImpl cacheInstance;
private MetricsMetadataCachingSource metricsSource;
/**
* Creates/gets an instance of ServerMetadataCache.
+ *
* @param conf configuration
* @return cache
*/
- public static ServerMetadataCache getInstance(Configuration conf) {
- ServerMetadataCache result = INSTANCE;
+ public static ServerMetadataCacheImpl getInstance(Configuration conf) {
+ ServerMetadataCacheImpl result = cacheInstance;
if (result == null) {
- synchronized (ServerMetadataCache.class) {
- result = INSTANCE;
+ synchronized (ServerMetadataCacheImpl.class) {
+ result = cacheInstance;
if (result == null) {
- INSTANCE = result = new ServerMetadataCache(conf);
+ cacheInstance = result = new ServerMetadataCacheImpl(conf);
}
}
}
return result;
}
- private ServerMetadataCache(Configuration conf) {
- this.conf = conf;
+ public ServerMetadataCacheImpl(Configuration conf) {
+ this.conf = HBaseConfiguration.create(conf);
this.metricsSource = MetricsPhoenixCoprocessorSourceFactory
.getInstance().getMetadataCachingSource();
long maxTTL = conf.getLong(PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS,
@@ -158,36 +159,7 @@ public class ServerMetadataCache {
lastDDLTimestampMap.invalidate(tableKeyPtr);
}
- /**
- * This should be used only in the tests. DO NOT use this in production code.
- */
- @VisibleForTesting
- public Long getLastDDLTimestampForTableFromCacheOnly(byte[] tenantID, byte[] schemaName,
- byte[] tableName) {
- byte[] tableKey = SchemaUtil.getTableKey(tenantID, schemaName, tableName);
- ImmutableBytesPtr tableKeyPtr = new ImmutableBytesPtr(tableKey);
- return lastDDLTimestampMap.getIfPresent(tableKeyPtr);
- }
-
- // This is used by tests to override specific connection to use.
- private Connection getConnection(Properties properties) throws SQLException {
- return connectionForTesting != null ? connectionForTesting
- : QueryUtil.getConnectionOnServer(properties, this.conf);
- }
-
- @VisibleForTesting
- public void setConnectionForTesting(Connection connection) {
- this.connectionForTesting = connection;
- }
-
- @VisibleForTesting
- public static void resetCache() {
- LOGGER.info("Resetting ServerMetadataCache");
- INSTANCE = null;
- }
-
- @VisibleForTesting
- public static void setInstance(ServerMetadataCache cache) {
- INSTANCE = cache;
+ protected Connection getConnection(Properties properties) throws SQLException {
+ return QueryUtil.getConnectionOnServer(properties, this.conf);
}
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
index d95ca1f9f0..3531e2f8a4 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.cache.ServerMetadataCache;
+import org.apache.phoenix.cache.ServerMetadataCacheImpl;
import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;
import org.apache.phoenix.coprocessorclient.metrics.MetricsMetadataCachingSource;
import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory;
@@ -60,6 +61,7 @@ public class PhoenixRegionServerEndpoint
RegionServerEndpointProtos.ValidateLastDDLTimestampRequest request,
RpcCallback<RegionServerEndpointProtos.ValidateLastDDLTimestampResponse> done) {
metricsSource.incrementValidateTimestampRequestCount();
+ ServerMetadataCache cache = getServerMetadataCache();
for (RegionServerEndpointProtos.LastDDLTimestampRequest lastDDLTimestampRequest
: request.getLastDDLTimestampRequestsList()) {
byte[] tenantID = lastDDLTimestampRequest.getTenantId().toByteArray();
@@ -71,7 +73,7 @@ public class PhoenixRegionServerEndpoint
try {
LOGGER.debug("Verifying last ddl timestamp for tenantID: {}, tableName: {}",
tenantIDStr, fullTableName);
- VerifyLastDDLTimestamp.verifyLastDDLTimestamp(this.conf, tenantID, schemaName,
+ VerifyLastDDLTimestamp.verifyLastDDLTimestamp(cache, tenantID, schemaName,
tableName, clientLastDDLTimestamp);
} catch (Throwable t) {
String errorMsg = String.format("Verifying last ddl timestamp FAILED for "
@@ -101,7 +103,7 @@ public class PhoenixRegionServerEndpoint
String tenantIDStr = Bytes.toString(tenantID);
LOGGER.info("PhoenixRegionServerEndpoint invalidating the cache for tenantID: {},"
+ " tableName: {}", tenantIDStr, fullTableName);
- ServerMetadataCache cache = ServerMetadataCache.getInstance(conf);
+ ServerMetadataCache cache = getServerMetadataCache();
cache.invalidate(tenantID, schemaName, tableName);
}
}
@@ -110,4 +112,8 @@ public class PhoenixRegionServerEndpoint
public Iterable<Service> getServices() {
return Collections.singletonList(this);
}
+
+ public ServerMetadataCache getServerMetadataCache() {
+ return ServerMetadataCacheImpl.getInstance(conf);
+ }
}
\ No newline at end of file
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/VerifyLastDDLTimestamp.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/VerifyLastDDLTimestamp.java
index 2f1554279e..4c5bf09704 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/VerifyLastDDLTimestamp.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/VerifyLastDDLTimestamp.java
@@ -17,10 +17,8 @@
*/
package org.apache.phoenix.coprocessor;
-import java.io.IOException;
import java.sql.SQLException;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.cache.ServerMetadataCache;
import org.apache.phoenix.exception.StaleMetadataCacheException;
@@ -49,13 +47,13 @@ public class VerifyLastDDLTimestamp {
* @param schemaName schema name
* @param tableName table name
* @param clientLastDDLTimestamp last ddl timestamp provided by client
- * @param conf configuration
+ * @param cache ServerMetadataCache
* @throws SQLException StaleMetadataCacheException if client provided timestamp
* is stale.
*/
- public static void verifyLastDDLTimestamp(Configuration conf, byte[] tenantID,
- byte[] schemaName, byte[] tableName, long clientLastDDLTimestamp) throws SQLException {
- ServerMetadataCache cache = ServerMetadataCache.getInstance(conf);
+ public static void verifyLastDDLTimestamp(ServerMetadataCache cache, byte[] tenantID,
+ byte[] schemaName, byte[] tableName, long clientLastDDLTimestamp)
+ throws SQLException {
long lastDDLTimestamp = cache.getLastDDLTimestampForTable(tenantID, schemaName, tableName);
// Is it possible to have client last ddl timestamp greater than server side?
if (clientLastDDLTimestamp < lastDDLTimestamp) {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java
index 9e94f4eb1c..9b258cead2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java
@@ -137,7 +137,7 @@ public class BackwardCompatibilityIT {
DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
} finally {
hbaseTestUtil.shutdownMiniCluster();
- ServerMetadataCache.resetCache();
+ ServerMetadataCacheTestImpl.resetCache();
}
System.setProperty("java.io.tmpdir", tmpDir);
assertFalse("refCount leaked", refCountLeaked);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
index 397c0b4992..363f672537 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
@@ -167,6 +167,7 @@ public abstract class BasePermissionsIT extends BaseTest {
static void initCluster(boolean isNamespaceMapped, boolean useCustomAccessController) throws Exception {
if (null != testUtil) {
+ ServerMetadataCacheTestImpl.resetCache();
testUtil.shutdownMiniCluster();
testUtil = null;
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
index b75d0a3258..29fc6c526a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
@@ -98,6 +98,7 @@ public class MigrateSystemTablesToSystemNamespaceIT extends BaseTest {
try {
if (testUtil != null) {
boolean refCountLeaked = isAnyStoreRefCountLeaked();
+ ServerMetadataCacheTestImpl.resetCache();
testUtil.shutdownMiniCluster();
testUtil = null;
assertFalse("refCount leaked", refCountLeaked);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java
index d529f6ebfd..b636f56fa3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java
@@ -84,6 +84,7 @@ public class PartialResultServerConfigurationIT {
try {
DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
} finally {
+ ServerMetadataCacheTestImpl.resetCache();
hbaseTestUtil.shutdownMiniCluster();
}
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointTestImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointTestImpl.java
new file mode 100644
index 0000000000..c7b6a9414a
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointTestImpl.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
+import org.apache.phoenix.cache.ServerMetadataCache;
+import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
+
+import java.io.IOException;
+
+/**
+ * PhoenixRegionServerEndpoint for integration tests.
+ * Uses {@link ServerMetadataCacheTestImpl} to support keeping multiple cache instances.
+ */
+public class PhoenixRegionServerEndpointTestImpl extends PhoenixRegionServerEndpoint {
+ protected ServerName serverName;
+
+ @Override
+ public void start(CoprocessorEnvironment env) throws IOException {
+ super.start(env);
+ this.serverName = ((RegionServerCoprocessorEnvironment)env).getServerName();
+ }
+
+ @Override
+ public ServerMetadataCache getServerMetadataCache() {
+ return ServerMetadataCacheTestImpl.getInstance(conf, serverName);
+ }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerMetadataCacheTestImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerMetadataCacheTestImpl.java
new file mode 100644
index 0000000000..c919f76db6
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerMetadataCacheTestImpl.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.phoenix.cache.ServerMetadataCacheImpl;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Implementation of {@link ServerMetadataCache} for Integration Tests.
+ * Supports keeping more than one instanceof the cache keyed on the regionserver ServerName.
+ *
+ * PhoenixRegionServerEndpoint is a region server coproc. There is a 1-1 correspondence between
+ * PhoenixRegionServerEndpoint and ServerMetadataCache. In ITs we can have multiple regionservers
+ * per cluster so we need multiple instances of ServerMetadataCache in the same jvm. Tests using
+ * HighAvailabilityTestingUtility create 2 clusters so we need to have one instance of
+ * ServerMetadataCache for each regionserver in each cluster.
+ */
+public class ServerMetadataCacheTestImpl extends ServerMetadataCacheImpl {
+ private static volatile Map<ServerName, ServerMetadataCacheTestImpl> INSTANCES = new HashMap<>();
+ private Connection connectionForTesting;
+
+ ServerMetadataCacheTestImpl(Configuration conf) {
+ super(conf);
+ }
+
+ public static ServerMetadataCacheTestImpl getInstance(Configuration conf, ServerName serverName) {
+ ServerMetadataCacheTestImpl result = INSTANCES.get(serverName);
+ if (result == null) {
+ synchronized (ServerMetadataCacheTestImpl.class) {
+ result = INSTANCES.get(serverName);
+ if (result == null) {
+ result = new ServerMetadataCacheTestImpl(conf);
+ INSTANCES.put(serverName, result);
+ }
+ }
+ }
+ return result;
+ }
+
+ public static void setInstance(ServerName serverName, ServerMetadataCacheTestImpl cache) {
+ INSTANCES.put(serverName, cache);
+ }
+
+ public Long getLastDDLTimestampForTableFromCacheOnly(byte[] tenantID, byte[] schemaName,
+ byte[] tableName) {
+ byte[] tableKey = SchemaUtil.getTableKey(tenantID, schemaName, tableName);
+ ImmutableBytesPtr tableKeyPtr = new ImmutableBytesPtr(tableKey);
+ return lastDDLTimestampMap.getIfPresent(tableKeyPtr);
+ }
+
+ public void setConnectionForTesting(Connection connection) {
+ this.connectionForTesting = connection;
+ }
+
+ public static void resetCache() {
+ INSTANCES.clear();
+ }
+
+ @Override
+ protected Connection getConnection(Properties properties) throws SQLException {
+ return connectionForTesting != null ? connectionForTesting
+ : super.getConnection(properties);
+ }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropIndexDuringUpsertIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropIndexDuringUpsertIT.java
index c1527dd3f3..5e88f9bbac 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropIndexDuringUpsertIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropIndexDuringUpsertIT.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
import org.apache.phoenix.jdbc.PhoenixTestDriver;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryServices;
@@ -101,6 +102,7 @@ public abstract class DropIndexDuringUpsertIT extends BaseTest {
service.shutdownNow();
destroyDriver(driver);
} finally {
+ ServerMetadataCacheTestImpl.resetCache();
util.shutdownMiniCluster();
}
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexAsyncThresholdIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexAsyncThresholdIT.java
index 51f77fae71..aaf58d2e65 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexAsyncThresholdIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexAsyncThresholdIT.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.end2end.index;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -123,6 +124,7 @@ public class IndexAsyncThresholdIT extends BaseTest {
} catch (Throwable t) {
logger.error("Exception caught when shutting down mini cluster", t);
} finally {
+ ServerMetadataCacheTestImpl.resetCache();
ConnectionFactory.shutdown();
}
assertFalse("refCount leaked", refCountLeaked);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReplicationWithWALAnnotationIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReplicationWithWALAnnotationIT.java
index ee4e387d4b..de24e4c385 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReplicationWithWALAnnotationIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReplicationWithWALAnnotationIT.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.phoenix.coprocessor.ReplicationSinkEndpoint;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.query.BaseTest;
@@ -126,8 +127,12 @@ public class ReplicationWithWALAnnotationIT extends BaseTest {
@AfterClass
public static void afterClass() throws Exception {
- utility1.shutdownMiniCluster();
- utility2.shutdownMiniCluster();
+ try {
+ utility1.shutdownMiniCluster();
+ utility2.shutdownMiniCluster();
+ } finally {
+ ServerMetadataCacheTestImpl.resetCache();
+ }
}
private static void setupConfigsAndStartCluster() throws Exception {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
index 4994a4b7d6..66c48566bb 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryServices;
@@ -88,6 +89,7 @@ public class UpsertSelectOverlappingBatchesIT extends BaseTest {
@AfterClass
public static synchronized void tearDownClass() throws Exception {
SlowBatchRegionObserver.SLOW_MUTATE = false;
+ ServerMetadataCacheTestImpl.resetCache();
getUtility().shutdownMiniCluster();
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/FailForUnsupportedHBaseVersionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/FailForUnsupportedHBaseVersionsIT.java
index ab66e03955..9ea6771fed 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/FailForUnsupportedHBaseVersionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/FailForUnsupportedHBaseVersionsIT.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
import org.apache.phoenix.hbase.index.covered.ColumnGroup;
import org.apache.phoenix.hbase.index.covered.CoveredColumn;
import org.apache.phoenix.hbase.index.covered.CoveredColumnIndexSpecifierBuilder;
@@ -160,6 +161,7 @@ public class FailForUnsupportedHBaseVersionsIT {
} finally {
// cleanup
+ ServerMetadataCacheTestImpl.resetCache();
util.shutdownMiniCluster();
}
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
index 1cb6c0ba79..bc67e4272a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.jdbc;
import org.apache.hadoop.hbase.StartMiniClusterOption;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.commons.lang3.RandomUtils;
@@ -472,6 +473,7 @@ public class HighAvailabilityTestingUtility {
admin1.close();
admin2.close();
try {
+ ServerMetadataCacheTestImpl.resetCache();
hbaseCluster1.shutdownMiniCluster();
hbaseCluster2.shutdownMiniCluster();
} catch (Exception e) {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
index e070075a46..e77f292dca 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
import org.apache.phoenix.exception.PhoenixIOException;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.CommitException;
@@ -189,6 +190,8 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
}
} catch (Exception e) {
// ignore
+ } finally {
+ ServerMetadataCacheTestImpl.resetCache();
}
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsIT.java
index 454662b28a..0b3b6236a0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsIT.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
import org.apache.phoenix.jdbc.ConnectionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDriver;
@@ -127,6 +128,8 @@ public class ConnectionQueryServicesMetricsIT extends BaseTest {
}
} catch (Exception e) {
// ignore
+ } finally {
+ ServerMetadataCacheTestImpl.resetCache();
}
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java
index 5adeeb18e3..92bf8aefaa 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.query;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.util.DelayedRegionServer;
@@ -73,6 +74,7 @@ public class MaxConcurrentConnectionsIT extends BaseTest {
//Have to shutdown our special delayed region server
@AfterClass
public static void tearDown() throws Exception {
+ ServerMetadataCacheTestImpl.resetCache();
hbaseTestUtil.shutdownMiniCluster();
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java
similarity index 96%
rename from phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java
rename to phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java
index 963b97cc00..590ca7774a 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java
@@ -17,11 +17,15 @@
*/
package org.apache.phoenix.cache;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessorclient.metrics.MetricsMetadataCachingSource;
import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory;
import org.apache.phoenix.end2end.IndexToolIT;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.PhoenixRegionServerEndpointTestImpl;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixResultSet;
@@ -61,6 +65,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Random;
+import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY;
import static org.apache.phoenix.query.ConnectionQueryServicesImpl.INVALIDATE_SERVER_METADATA_CACHE_EX_MESSAGE;
import static org.apache.phoenix.query.QueryServices.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
@@ -75,12 +80,15 @@ import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+// End to end tests for metadata caching re-design.
@Category(ParallelStatsDisabledIT.class)
-public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
+public class ServerMetadataCacheIT extends ParallelStatsDisabledIT {
private final Random RANDOM = new Random(42);
private final long NEVER = (long) ConnectionProperty.UPDATE_CACHE_FREQUENCY.getValue("NEVER");
+ private static ServerName serverName;
+
@BeforeClass
public static synchronized void doSetup() throws Exception {
Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
@@ -91,6 +99,8 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
Long.toString(Long.MAX_VALUE));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ assertEquals(1, getUtility().getHBaseCluster().getNumLiveRegionServers());
+ serverName = getUtility().getHBaseCluster().getRegionServer(0).getServerName();
}
@Before
@@ -100,7 +110,23 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
@After
public void resetMetadataCache() {
- ServerMetadataCache.resetCache();
+ ServerMetadataCacheTestImpl.resetCache();
+ }
+
+ /**
+ * Get the server metadata cache instance from the endpoint loaded on the region server.
+ */
+ private ServerMetadataCacheTestImpl getServerMetadataCache() {
+ String phoenixRegionServerEndpoint = config.get(REGIONSERVER_COPROCESSOR_CONF_KEY);
+ assertNotNull(phoenixRegionServerEndpoint);
+ RegionServerCoprocessor coproc = getUtility().getHBaseCluster()
+ .getRegionServer(0)
+ .getRegionServerCoprocessorHost()
+ .findCoprocessor(phoenixRegionServerEndpoint);
+ assertNotNull(coproc);
+ ServerMetadataCache cache = ((PhoenixRegionServerEndpointTestImpl)coproc).getServerMetadataCache();
+ assertNotNull(cache);
+ return (ServerMetadataCacheTestImpl)cache;
}
/**
@@ -121,7 +147,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
createTable(conn, tableNameStr, NEVER);
pTable = PhoenixRuntime.getTableNoCache(conn,
tableNameStr);// --> First call to CQSI#getTable
- ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+ ServerMetadataCacheTestImpl cache = getServerMetadataCache();
// Override the connection to use in ServerMetadataCache
cache.setConnectionForTesting(conn);
byte[] tableName = Bytes.toBytes(tableNameStr);
@@ -162,7 +188,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
String viewNameStr = generateUniqueName();
createViewWhereClause(conn, tableNameStr, viewNameStr, whereClause);
viewTable = PhoenixRuntime.getTableNoCache(conn, viewNameStr); // --> First call to CQSI#getTable
- ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+ ServerMetadataCacheTestImpl cache = getServerMetadataCache();;
// Override the connection to use in ServerMetadataCache
cache.setConnectionForTesting(conn);
@@ -209,7 +235,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
createViewWhereClause(conn, tableNameStr, tenantViewNameStr, whereClause);
tenantViewTable = PhoenixRuntime.getTableNoCache(conn,
tenantViewNameStr); // --> First call to CQSI#getTable
- ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+ ServerMetadataCacheTestImpl cache = getServerMetadataCache();;
// Override the connection to use in ServerMetadataCache
cache.setConnectionForTesting(conn);
byte[] tenantIDBytes = Bytes.toBytes(tenantId);
@@ -246,7 +272,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
// Create a test table.
createTable(conn, tableNameStr, NEVER);
pTable = PhoenixRuntime.getTableNoCache(conn, tableNameStr);
- ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+ ServerMetadataCacheTestImpl cache = getServerMetadataCache();;
// Override the connection to use in ServerMetadataCache
cache.setConnectionForTesting(conn);
byte[] tableName = Bytes.toBytes(tableNameStr);
@@ -276,7 +302,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
// Create a test table.
createTable(conn, fullTableName, NEVER);
pTable = PhoenixRuntime.getTableNoCache(conn, fullTableName);
- ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+ ServerMetadataCacheTestImpl cache = getServerMetadataCache();;
// Override the connection to use in ServerMetadataCache
cache.setConnectionForTesting(conn);
byte[] tableNameBytes = Bytes.toBytes(fullTableName);
@@ -313,7 +339,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
try (Connection conn = DriverManager.getConnection(getUrl(), tenantProps)) {
createViewWhereClause(conn, tableNameStr, tenantViewNameStr, whereClause);
tenantViewTable = PhoenixRuntime.getTableNoCache(conn, tenantViewNameStr);
- ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+ ServerMetadataCacheTestImpl cache = getServerMetadataCache();;
// Override the connection to use in ServerMetadataCache
cache.setConnectionForTesting(conn);
byte[] tenantIDBytes = Bytes.toBytes(tenantId);
@@ -340,7 +366,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
String tableNameStr = generateUniqueName();
byte[] tableNameBytes = Bytes.toBytes(tableNameStr);
PTable pTable;
- ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+ ServerMetadataCacheTestImpl cache = getServerMetadataCache();;
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.setAutoCommit(false);
// Create a test table.
@@ -377,7 +403,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
String tableNameStr = generateUniqueName();
byte[] tableNameBytes = Bytes.toBytes(tableNameStr);
PTable pTable;
- ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+ ServerMetadataCacheTestImpl cache = getServerMetadataCache();;
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.setAutoCommit(false);
// Create a test table.
@@ -411,7 +437,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
String indexNameStr = "IND_" + generateUniqueName();
byte[] indexNameBytes = Bytes.toBytes(indexNameStr);
PTable indexTable;
- ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+ ServerMetadataCacheTestImpl cache = getServerMetadataCache();;
try (Connection conn = DriverManager.getConnection(url, props)) {
conn.setAutoCommit(false);
// Create a test table.
@@ -457,7 +483,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
byte[] tableNameBytes = Bytes.toBytes(tableName);
String indexName = generateUniqueName();
byte[] indexNameBytes = Bytes.toBytes(indexName);
- ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+ ServerMetadataCacheTestImpl cache = getServerMetadataCache();;
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.setAutoCommit(true);
createTable(conn, tableName, NEVER);
@@ -507,7 +533,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
String globalViewIndexName = "GV_IDX_" + generateUniqueName();
byte[] globalViewIndexNameBytes = Bytes.toBytes(globalViewIndexName);
- ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+ ServerMetadataCacheTestImpl cache = getServerMetadataCache();;
try(Connection conn = DriverManager.getConnection(getUrl());
Statement stmt = conn.createStatement()) {
String whereClause = " WHERE v1 < 1000";
@@ -614,7 +640,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
String tableName = generateUniqueName();
ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
- ServerMetadataCache cache = null;
+ ServerMetadataCacheTestImpl cache = null;
try (Connection conn1 = spyCqs1.connect(url1, props);
Connection conn2 = spyCqs2.connect(url2, props)) {
@@ -624,11 +650,11 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
upsert(conn1, tableName, true);
// Instrument ServerMetadataCache to throw a SQLException once
- cache = ServerMetadataCache.getInstance(config);
- ServerMetadataCache spyCache = Mockito.spy(cache);
+ cache = getServerMetadataCache();;
+ ServerMetadataCacheTestImpl spyCache = Mockito.spy(cache);
Mockito.doThrow(new SQLException("FAIL")).doCallRealMethod().when(spyCache)
.getLastDDLTimestampForTable(any(), any(), eq(Bytes.toBytes(tableName)));
- ServerMetadataCache.setInstance(spyCache);
+ ServerMetadataCacheTestImpl.setInstance(serverName, spyCache);
// query using client-2 should succeed
query(conn2, tableName);
@@ -652,7 +678,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
int expectedNumCacheUpdates;
- ServerMetadataCache cache = null;
+ ServerMetadataCacheTestImpl cache = null;
try (Connection conn1 = spyCqs1.connect(url1, props);
Connection conn2 = spyCqs2.connect(url2, props)) {
@@ -674,11 +700,11 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
Mockito.reset(spyCqs2);
// Instrument ServerMetadataCache to throw a SQLException once
- cache = ServerMetadataCache.getInstance(config);
- ServerMetadataCache spyCache = Mockito.spy(cache);
+ cache = getServerMetadataCache();;
+ ServerMetadataCacheTestImpl spyCache = Mockito.spy(cache);
Mockito.doThrow(new SQLException("FAIL")).doCallRealMethod().when(spyCache)
.getLastDDLTimestampForTable(any(), any(), eq(Bytes.toBytes(tableName)));
- ServerMetadataCache.setInstance(spyCache);
+ ServerMetadataCacheTestImpl.setInstance(serverName, spyCache);
// query using client-2 should succeed, one cache update
query(conn2, tableName);
@@ -702,7 +728,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
String tableName = generateUniqueName();
ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
- ServerMetadataCache cache = null;
+ ServerMetadataCacheTestImpl cache = null;
try (Connection conn1 = spyCqs1.connect(url1, props);
Connection conn2 = spyCqs2.connect(url2, props)) {
@@ -712,12 +738,12 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
upsert(conn1, tableName, true);
// Instrument ServerMetadataCache to throw a SQLException twice
- cache = ServerMetadataCache.getInstance(config);
- ServerMetadataCache spyCache = Mockito.spy(cache);
+ cache = getServerMetadataCache();;
+ ServerMetadataCacheTestImpl spyCache = Mockito.spy(cache);
SQLException e = new SQLException("FAIL");
Mockito.doThrow(e).when(spyCache)
.getLastDDLTimestampForTable(any(), any(), eq(Bytes.toBytes(tableName)));
- ServerMetadataCache.setInstance(spyCache);
+ ServerMetadataCacheTestImpl.setInstance(serverName, spyCache);
// query using client-2 should fail
query(conn2, tableName);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
index 3fb7bffb5a..ebea1f1eff 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
import org.apache.phoenix.hbase.index.IndexTableName;
import org.apache.phoenix.hbase.index.IndexTestingUtils;
import org.apache.phoenix.hbase.index.Indexer;
@@ -269,6 +270,7 @@ public class TestWALRecoveryCaching {
scanner.close();
index.close();
primary.close();
+ ServerMetadataCacheTestImpl.resetCache();
util.shutdownMiniCluster();
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 527bcc6e73..341b808479 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -136,14 +136,14 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.SystemExitRule;
-import org.apache.phoenix.cache.ServerMetadataCache;
import org.apache.phoenix.compat.hbase.CompatUtil;
-import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.end2end.ParallelStatsDisabledTest;
import org.apache.phoenix.end2end.ParallelStatsEnabledIT;
import org.apache.phoenix.end2end.ParallelStatsEnabledTest;
+import org.apache.phoenix.end2end.PhoenixRegionServerEndpointTestImpl;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -493,7 +493,7 @@ public abstract class BaseTest {
} finally {
try {
// Clear ServerMetadataCache.
- ServerMetadataCache.resetCache();
+ ServerMetadataCacheTestImpl.resetCache();
utility.shutdownMiniCluster();
} catch (Throwable t) {
LOGGER.error("Exception caught when shutting down mini cluster", t);
@@ -646,13 +646,14 @@ public abstract class BaseTest {
}
/*
- Set property hbase.coprocessor.regionserver.classes to include PhoenixRegionServerEndpoint
- by default, if some other regionserver coprocs are not already present.
+ Set property hbase.coprocessor.regionserver.classes to include test implementation of
+ PhoenixRegionServerEndpoint by default, if some other regionserver coprocs
+ are not already present.
*/
private static void setPhoenixRegionServerEndpoint(Configuration conf) {
String value = conf.get(REGIONSERVER_COPROCESSOR_CONF_KEY);
if (value == null) {
- value = PhoenixRegionServerEndpoint.class.getName();
+ value = PhoenixRegionServerEndpointTestImpl.class.getName();
}
conf.set(REGIONSERVER_COPROCESSOR_CONF_KEY, value);
}