Posted to commits@phoenix.apache.org by sh...@apache.org on 2023/12/22 16:44:46 UTC
(phoenix) branch PHOENIX-6883-feature updated: PHOENIX-7115 Create separate handler thread pool for invalidating server metadata cache (#1748)
This is an automated email from the ASF dual-hosted git repository.
shahrs87 pushed a commit to branch PHOENIX-6883-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-6883-feature by this push:
new 83134d9731 PHOENIX-7115 Create separate handler thread pool for invalidating server metadata cache (#1748)
83134d9731 is described below
commit 83134d9731ab12072d23573d2c38252489d306bb
Author: Rushabh Shah <sh...@apache.org>
AuthorDate: Fri Dec 22 08:44:40 2023 -0800
PHOENIX-7115 Create separate handler thread pool for invalidating server metadata cache (#1748)
---
.../FailingPhoenixRegionServerEndpoint.java | 4 +-
.../phoenix/end2end/InvalidateMetadataCacheIT.java | 4 +-
.../hadoop/hbase/ipc/PhoenixRpcScheduler.java | 37 +++-
.../hbase/ipc/PhoenixRpcSchedulerFactory.java | 14 +-
...java => InvalidateMetadataCacheController.java} | 36 ++--
... InvalidateMetadataCacheControllerFactory.java} | 26 +--
.../controller/ServerSideRPCControllerFactory.java | 3 -
.../phoenix/coprocessor/MetaDataEndpointImpl.java | 195 +----------------
.../phoenix/query/ConnectionQueryServices.java | 4 +
.../phoenix/query/ConnectionQueryServicesImpl.java | 236 +++++++++++++++++++--
.../query/ConnectionlessQueryServicesImpl.java | 7 +
.../query/DelegateConnectionQueryServices.java | 7 +
.../org/apache/phoenix/query/QueryServices.java | 10 +-
.../apache/phoenix/query/QueryServicesOptions.java | 2 +
...dulerTest.java => PhoenixRpcSchedulerTest.java} | 61 ++++--
.../phoenix/cache/ServerMetadataCacheTest.java | 26 ++-
16 files changed, 405 insertions(+), 267 deletions(-)
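For reference, the new behavior is controlled through the configuration properties this commit adds to QueryServices and QueryServicesOptions (see the diffs below). A minimal sketch of setting them on a region server Configuration; the property names and defaults come from this commit, while the class and the explicit values here are only illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class InvalidateCacheConfigSketch {
        public static Configuration configure() {
            Configuration conf = HBaseConfiguration.create();
            // Turn on server metadata cache invalidation (disabled by default).
            conf.setBoolean("phoenix.metadata.invalidate.cache.enabled", true);
            // Handlers in the new dedicated InvalidateMetadataCache pool (default 10).
            conf.setInt("phoenix.rpc.invalidate.cache.handler.count", 10);
            // RPC priority that routes calls to the new executor (default 3000).
            conf.setInt("phoenix.invalidate.metadata.cache.rpc.priority", 3000);
            // Overall timeout for invalidating the cache on all region servers (default 10000 ms).
            conf.setLong("phoenix.metadata.cache.invalidation.timeoutMs", 10000L);
            return conf;
        }
    }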
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/FailingPhoenixRegionServerEndpoint.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FailingPhoenixRegionServerEndpoint.java
index 7e40cfe76f..5f33610c29 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/FailingPhoenixRegionServerEndpoint.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FailingPhoenixRegionServerEndpoint.java
@@ -27,8 +27,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
-import static org.apache.phoenix.coprocessor.MetaDataEndpointImpl.PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS;
-import static org.apache.phoenix.coprocessor.MetaDataEndpointImpl.PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS_DEFAULT;
+import static org.apache.phoenix.query.QueryServices.PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS;
+import static org.apache.phoenix.query.QueryServices.PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS_DEFAULT;
public class FailingPhoenixRegionServerEndpoint extends PhoenixRegionServerEndpoint {
private static final Logger LOGGER = LoggerFactory.getLogger(FailingPhoenixRegionServerEndpoint.class);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InvalidateMetadataCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InvalidateMetadataCacheIT.java
index c0727b9d68..b8d1d55732 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InvalidateMetadataCacheIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InvalidateMetadataCacheIT.java
@@ -34,8 +34,8 @@ import java.util.Map;
import java.util.Properties;
import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY;
-import static org.apache.phoenix.coprocessor.MetaDataEndpointImpl.PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS;
-import static org.apache.phoenix.coprocessor.MetaDataEndpointImpl.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED;
+import static org.apache.phoenix.query.QueryServices.PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS;
+import static org.apache.phoenix.query.QueryServices.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.fail;
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
index ea6a5c9719..0a04ad4787 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
@@ -27,12 +27,13 @@ import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_INVALIDATE_CACHE_HANDLER_COUNT;
+
/**
* {@link RpcScheduler} that first checks to see if this is an index or metadata update before passing off the
* call to the delegate {@link RpcScheduler}.
*/
public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
-
// copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4
private static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "ipc.server.callqueue.handler.factor";
private static final String CALLQUEUE_LENGTH_CONF_KEY = "ipc.server.max.callqueue.length";
@@ -41,28 +42,44 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
private int indexPriority;
private int metadataPriority;
private int serverSidePriority;
+ private int invalidateMetadataCachePriority;
private RpcExecutor indexCallExecutor;
private RpcExecutor metadataCallExecutor;
private RpcExecutor serverSideCallExecutor;
+ // Executor for invalidating server side metadata cache RPCs.
+ private RpcExecutor invalidateMetadataCacheCallExecutor;
private int port;
- public PhoenixRpcScheduler(Configuration conf, RpcScheduler delegate, int indexPriority, int metadataPriority, int serversidePriority, PriorityFunction priorityFunction, Abortable abortable) {
+ public PhoenixRpcScheduler(Configuration conf, RpcScheduler delegate, int indexPriority,
+ int metadataPriority, int serversidePriority,
+ int invalidateMetadataCachePriority,
+ PriorityFunction priorityFunction, Abortable abortable) {
// copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4
int indexHandlerCount = conf.getInt(QueryServices.INDEX_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT);
int metadataHandlerCount = conf.getInt(QueryServices.METADATA_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_METADATA_HANDLER_COUNT);
int serverSideHandlerCount = conf.getInt(QueryServices.SERVER_SIDE_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_SERVERSIDE_HANDLER_COUNT);
+ int invalidateMetadataCacheHandlerCount = conf.getInt(
+ QueryServices.INVALIDATE_CACHE_HANDLER_COUNT_ATTRIB,
+ DEFAULT_INVALIDATE_CACHE_HANDLER_COUNT);
int maxIndexQueueLength = conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, indexHandlerCount*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
int maxMetadataQueueLength = conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, metadataHandlerCount*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
int maxServerSideQueueLength = conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, serverSideHandlerCount*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+ int maxInvalidateMetadataCacheQueueLength = conf.getInt(CALLQUEUE_LENGTH_CONF_KEY,
+ invalidateMetadataCacheHandlerCount * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+
this.indexPriority = indexPriority;
this.metadataPriority = metadataPriority;
this.serverSidePriority = serversidePriority;
+ this.invalidateMetadataCachePriority = invalidateMetadataCachePriority;
this.delegate = delegate;
this.indexCallExecutor = new BalancedQueueRpcExecutor("Index", indexHandlerCount, maxIndexQueueLength, priorityFunction,conf,abortable);
this.metadataCallExecutor = new BalancedQueueRpcExecutor("Metadata", metadataHandlerCount, maxMetadataQueueLength, priorityFunction,conf,abortable);
this.serverSideCallExecutor = new BalancedQueueRpcExecutor("ServerSide", serverSideHandlerCount, maxServerSideQueueLength, priorityFunction,conf,abortable);
+ this.invalidateMetadataCacheCallExecutor = new BalancedQueueRpcExecutor(
+ "InvalidateMetadataCache", invalidateMetadataCacheHandlerCount,
+ maxInvalidateMetadataCacheQueueLength, priorityFunction, conf, abortable);
}
@Override
@@ -77,6 +94,7 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
indexCallExecutor.start(port);
metadataCallExecutor.start(port);
serverSideCallExecutor.start(port);
+ invalidateMetadataCacheCallExecutor.start(port);
}
@Override
@@ -85,6 +103,7 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
indexCallExecutor.stop();
metadataCallExecutor.stop();
serverSideCallExecutor.stop();
+ invalidateMetadataCacheCallExecutor.stop();
}
@Override
@@ -97,6 +116,8 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
return metadataCallExecutor.dispatch(callTask);
} else if (serverSidePriority == priority) {
return serverSideCallExecutor.dispatch(callTask);
+ } else if (invalidateMetadataCachePriority == priority) {
+ return invalidateMetadataCacheCallExecutor.dispatch(callTask);
} else {
return delegate.dispatch(callTask);
}
@@ -114,7 +135,8 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
return this.delegate.getGeneralQueueLength()
+ this.indexCallExecutor.getQueueLength()
+ this.metadataCallExecutor.getQueueLength()
- + this.serverSideCallExecutor.getQueueLength();
+ + this.serverSideCallExecutor.getQueueLength()
+ + this.invalidateMetadataCacheCallExecutor.getQueueLength();
}
@Override
@@ -132,7 +154,8 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
return this.delegate.getActiveRpcHandlerCount()
+ this.indexCallExecutor.getActiveHandlerCount()
+ this.metadataCallExecutor.getActiveHandlerCount()
- + this.serverSideCallExecutor.getActiveHandlerCount();
+ + this.serverSideCallExecutor.getActiveHandlerCount()
+ + this.invalidateMetadataCacheCallExecutor.getActiveHandlerCount();
}
@Override
@@ -155,6 +178,11 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
this.metadataCallExecutor = executor;
}
+ @VisibleForTesting
+ public void setInvalidateMetadataCacheExecutorForTesting(RpcExecutor executor) {
+ this.invalidateMetadataCacheCallExecutor = executor;
+ }
+
@VisibleForTesting
public void setServerSideExecutorForTesting(RpcExecutor executor) {
this.serverSideCallExecutor = executor;
@@ -229,5 +257,4 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
public int getActiveReplicationRpcHandlerCount() {
return this.delegate.getActiveReplicationRpcHandlerCount();
}
-
}
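The scheduler above only takes effect when the region server is configured to build its RPC scheduler through Phoenix's factory. As a reminder of the usual wiring (this key is the standard Phoenix setup property, not something added by this commit), a hedged sketch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class SchedulerFactoryWiringSketch {
        public static Configuration configure() {
            Configuration conf = HBaseConfiguration.create();
            // With this in place, the new InvalidateMetadataCache executor is created
            // alongside the existing Index, Metadata and ServerSide pools.
            conf.set("hbase.region.server.rpc.scheduler.factory.class",
                     "org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory");
            return conf;
        }
    }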
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java
index 74adf01573..d57828c260 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java
@@ -63,12 +63,16 @@ public class PhoenixRpcSchedulerFactory implements RpcSchedulerFactory {
validatePriority(serverSidePriority);
// validate index and metadata priorities are not the same
- Preconditions.checkArgument(indexPriority != metadataPriority, "Index and Metadata priority must not be same "+ indexPriority);
+ Preconditions.checkArgument(indexPriority != metadataPriority,
+ "Index and Metadata priority must not be same " + indexPriority);
LOGGER.info("Using custom Phoenix Index RPC Handling with index rpc priority "
+ indexPriority + " and metadata rpc priority " + metadataPriority);
- PhoenixRpcScheduler scheduler =
- new PhoenixRpcScheduler(conf, delegate, indexPriority, metadataPriority, serverSidePriority, priorityFunction,abortable);
+ int invalidateCachePriority = getInvalidateMetadataCachePriority(conf);
+ validatePriority(invalidateCachePriority);
+ PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, delegate, indexPriority,
+ metadataPriority, serverSidePriority, invalidateCachePriority, priorityFunction,
+ abortable);
return scheduler;
}
@@ -97,4 +101,8 @@ public class PhoenixRpcSchedulerFactory implements RpcSchedulerFactory {
return conf.getInt(QueryServices.SERVER_SIDE_PRIOIRTY_ATTRIB, QueryServicesOptions.DEFAULT_SERVER_SIDE_PRIORITY);
}
+ public static int getInvalidateMetadataCachePriority(Configuration conf) {
+ return conf.getInt(QueryServices.INVALIDATE_METADATA_CACHE_PRIORITY_ATTRIB,
+ QueryServicesOptions.DEFAULT_INVALIDATE_METADATA_CACHE_PRIORITY);
+ }
}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InvalidateMetadataCacheController.java
similarity index 51%
copy from phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java
copy to phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InvalidateMetadataCacheController.java
index ba7fb6339d..3c9eda613d 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InvalidateMetadataCacheController.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,24 +18,34 @@
package org.apache.hadoop.hbase.ipc.controller;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
/**
- * {@link RpcControllerFactory} that should only be used when
- * making server-server remote RPCs to the region servers hosting Phoenix SYSTEM tables.
+ * Controller used to invalidate server side metadata cache RPCs.
*/
-public class ServerSideRPCControllerFactory {
+public class InvalidateMetadataCacheController extends DelegatingHBaseRpcController {
+ private int priority;
+
+ public InvalidateMetadataCacheController(HBaseRpcController delegate, Configuration conf) {
+ super(delegate);
+ this.priority = PhoenixRpcSchedulerFactory.getInvalidateMetadataCachePriority(conf);
+ }
- private static final Logger LOG = LoggerFactory.getLogger(ServerSideRPCControllerFactory.class);
- protected final Configuration conf;
+ @Override
+ public void setPriority(int priority) {
+ this.priority = priority;
+ }
- public ServerSideRPCControllerFactory(Configuration conf) {
- this.conf = conf;
+ @Override
+ public void setPriority(TableName tn) {
+ // Nothing
}
- public ServerToServerRpcController newController() {
- return new ServerToServerRpcControllerImpl(this.conf);
+ @Override
+ public int getPriority() {
+ return this.priority;
}
}
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InvalidateMetadataCacheControllerFactory.java
similarity index 61%
copy from phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java
copy to phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InvalidateMetadataCacheControllerFactory.java
index ba7fb6339d..ee6b3b24ff 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InvalidateMetadataCacheControllerFactory.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,24 +18,24 @@
package org.apache.hadoop.hbase.ipc.controller;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
- * {@link RpcControllerFactory} that should only be used when
- * making server-server remote RPCs to the region servers hosting Phoenix SYSTEM tables.
+ * Factory to instantiate InvalidateMetadataCacheControllers
*/
-public class ServerSideRPCControllerFactory {
-
- private static final Logger LOG = LoggerFactory.getLogger(ServerSideRPCControllerFactory.class);
- protected final Configuration conf;
+public class InvalidateMetadataCacheControllerFactory extends RpcControllerFactory {
+ public InvalidateMetadataCacheControllerFactory(Configuration conf) {
+ super(conf);
+ }
- public ServerSideRPCControllerFactory(Configuration conf) {
- this.conf = conf;
+ @Override
+ public HBaseRpcController newController() {
+ HBaseRpcController delegate = super.newController();
+ return getController(delegate);
}
- public ServerToServerRpcController newController() {
- return new ServerToServerRpcControllerImpl(this.conf);
+ private HBaseRpcController getController(HBaseRpcController delegate) {
+ return new InvalidateMetadataCacheController(delegate, conf);
}
}
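As the ConnectionQueryServicesImpl change further down shows, this factory is activated by overriding the RPC controller factory on a dedicated HBase connection, so that every RPC made through that connection carries the invalidate-metadata-cache priority. A minimal sketch of that wiring; the real code goes through Phoenix's HConnectionFactory, and plain ConnectionFactory is used here only for brevity:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
    import org.apache.hadoop.hbase.ipc.controller.InvalidateMetadataCacheControllerFactory;

    public class InvalidateCacheConnectionSketch {
        public static Connection open(Configuration base) throws IOException {
            Configuration conf = HBaseConfiguration.create(base);
            // CUSTOM_CONTROLLER_CONF_KEY ("hbase.rpc.controllerfactory.class") makes HBase
            // hand out InvalidateMetadataCacheController instances for this connection.
            conf.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
                    InvalidateMetadataCacheControllerFactory.class, RpcControllerFactory.class);
            return ConnectionFactory.createConnection(conf);
        }
    }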
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java
index ba7fb6339d..a1a97cf6ce 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java
@@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.ipc.controller;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* {@link RpcControllerFactory} that should only be used when
@@ -28,7 +26,6 @@ import org.slf4j.LoggerFactory;
*/
public class ServerSideRPCControllerFactory {
- private static final Logger LOG = LoggerFactory.getLogger(ServerSideRPCControllerFactory.class);
protected final Configuration conf;
public ServerSideRPCControllerFactory(Configuration conf) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index c9aa4acb73..28d561e465 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -101,23 +101,15 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
@@ -133,11 +125,9 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagUtil;
-import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
@@ -154,12 +144,10 @@ import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcUtil;
-import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Region.RowLock;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.VersionInfo;
@@ -187,9 +175,6 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRequest
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
-import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;
-import org.apache.phoenix.coprocessor.metrics.MetricsMetadataCachingSource;
-import org.apache.phoenix.coprocessor.metrics.MetricsPhoenixCoprocessorSourceFactory;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.expression.Expression;
@@ -268,7 +253,6 @@ import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.PhoenixStopWatch;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
@@ -285,7 +269,6 @@ import com.google.protobuf.ByteString;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
/**
* Endpoint co-processor through which all Phoenix metadata mutations flow.
@@ -328,12 +311,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements RegionCopr
// Column to track tables that have been upgraded based on PHOENIX-2067
public static final String ROW_KEY_ORDER_OPTIMIZABLE = "ROW_KEY_ORDER_OPTIMIZABLE";
public static final byte[] ROW_KEY_ORDER_OPTIMIZABLE_BYTES = Bytes.toBytes(ROW_KEY_ORDER_OPTIMIZABLE);
- public static final String PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS =
- "phoenix.metadata.cache.invalidation.timeoutMs";
- // Default to 10 seconds.
- public static final long PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS_DEFAULT = 10 * 1000;
- public static final String PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED =
- "phoenix.metadata.invalidate.cache.enabled";
+
// KeyValues for Table
private static final Cell TABLE_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES);
@@ -612,8 +590,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
private boolean allowSplittableSystemCatalogRollback;
private MetricsMetadataSource metricsSource;
- private MetricsMetadataCachingSource metricsMetadataCachingSource;
- private long metadataCacheInvalidationTimeoutMs;
+
public static void setFailConcurrentMutateAddColumnOneTimeForTesting(boolean fail) {
failConcurrentMutateAddColumnOneTimeForTesting = fail;
}
@@ -649,16 +626,11 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
new ReadOnlyProps(config.iterator()));
this.allowSplittableSystemCatalogRollback = config.getBoolean(QueryServices.ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK,
QueryServicesOptions.DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK);
- this.metadataCacheInvalidationTimeoutMs = config.getLong(
- PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS,
- PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS_DEFAULT);
LOGGER.info("Starting Tracing-Metrics Systems");
// Start the phoenix trace collection
Tracing.addTraceMetricsSource();
Metrics.ensureConfigured();
metricsSource = MetricsMetadataSourceFactory.getMetadataMetricsSource();
- metricsMetadataCachingSource
- = MetricsPhoenixCoprocessorSourceFactory.getInstance().getMetadataCachingSource();
}
@Override
@@ -3483,170 +3455,17 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
}
}
- /**
- * Invalidate metadata cache from all region servers for the given list of
- * InvalidateServerMetadataCacheRequest.
- * @throws Throwable
- */
private void invalidateServerMetadataCache(List<InvalidateServerMetadataCacheRequest> requests)
throws Throwable {
- Configuration conf = env.getConfiguration();
- boolean invalidateCacheEnabled = conf.getBoolean(PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED,
- false);
- if (!invalidateCacheEnabled) {
- LOGGER.info("Skip invalidating server metadata cache since conf property"
- + " phoenix.metadata.invalidate.cache.enabled is set to false");
- return;
- }
- metricsMetadataCachingSource.incrementMetadataCacheInvalidationOperationsCount();
Properties properties = new Properties();
// Skip checking of system table existence since the system tables should have created
// by now.
properties.setProperty(SKIP_SYSTEM_TABLES_EXISTENCE_CHECK, "true");
try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(properties,
- env.getConfiguration()).unwrap(PhoenixConnection.class);
- Admin admin = connection.getQueryServices().getAdmin()) {
- // This will incur an extra RPC to the master. This RPC is required since we want to
- // get current list of regionservers.
- Collection<ServerName> serverNames = admin.getRegionServers(true);
- PhoenixStopWatch stopWatch = new PhoenixStopWatch().start();
- try {
- invalidateServerMetadataCacheWithRetries(admin, serverNames, requests, false);
- metricsMetadataCachingSource.incrementMetadataCacheInvalidationSuccessCount();
- } catch (Throwable t) {
- metricsMetadataCachingSource.incrementMetadataCacheInvalidationFailureCount();
- throw t;
- } finally {
- metricsMetadataCachingSource
- .addMetadataCacheInvalidationTotalTime(stopWatch.stop().elapsedMillis());
- }
- }
- }
-
- /**
- * Invalidate metadata cache on all regionservers with retries for the given list of
- * InvalidateServerMetadataCacheRequest. Each InvalidateServerMetadataCacheRequest contains
- * tenantID, schema name and table name.
- * We retry once before failing the operation.
- *
- * @param admin
- * @param serverNames
- * @param invalidateCacheRequests
- * @param isRetry
- * @throws Throwable
- */
- private void invalidateServerMetadataCacheWithRetries(Admin admin,
- Collection<ServerName> serverNames,
- List<InvalidateServerMetadataCacheRequest> invalidateCacheRequests,
- boolean isRetry) throws Throwable {
- RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest protoRequest =
- getRequest(invalidateCacheRequests);
- // TODO Do I need my own executor or can I re-use QueryServices#Executor
- // since it is supposed to be used only for scans according to documentation?
- List<CompletableFuture<Void>> futures = new ArrayList<>();
- Map<Future, ServerName> map = new HashMap<>();
- for (ServerName serverName : serverNames) {
- CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
- try {
- PhoenixStopWatch innerWatch = new PhoenixStopWatch().start();
- // TODO Using the same as ServerCacheClient but need to think if we need some
- // special controller for invalidating cache since this is in the path of
- // DDL operations. We also need to think of we need separate RPC handler
- // threads for this?
- ServerRpcController controller = new ServerRpcController();
- for (InvalidateServerMetadataCacheRequest invalidateCacheRequest
- : invalidateCacheRequests) {
- LOGGER.info("Sending invalidate metadata cache for {} to region server:"
- + " {}", invalidateCacheRequest.toString(), serverName);
- }
- RegionServerEndpointProtos.RegionServerEndpointService.BlockingInterface
- service = RegionServerEndpointProtos.RegionServerEndpointService
- .newBlockingStub(admin.coprocessorService(serverName));
- // The timeout for this particular request is managed by config parameter:
- // hbase.rpc.timeout. Even if the future times out, this runnable can be in
- // RUNNING state and will not be interrupted.
- service.invalidateServerMetadataCache(controller, protoRequest);
- long cacheInvalidationTime = innerWatch.stop().elapsedMillis();
- LOGGER.info("Invalidating metadata cache"
- + " on region server: {} completed successfully and it took {} ms",
- serverName, cacheInvalidationTime);
- metricsMetadataCachingSource
- .addMetadataCacheInvalidationRpcTime(cacheInvalidationTime);
- } catch (ServiceException se) {
- LOGGER.error("Invalidating metadata cache failed for regionserver {}",
- serverName, se);
- IOException ioe = ServerUtil.parseServiceException(se);
- throw new CompletionException(ioe);
- }
- });
- futures.add(future);
- map.put(future, serverName);
- }
-
- // Here we create one master like future which tracks individual future
- // for each region server.
- CompletableFuture<Void> allFutures = CompletableFuture.allOf(
- futures.toArray(new CompletableFuture[0]));
- try {
- allFutures.get(metadataCacheInvalidationTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (Throwable t) {
- List<ServerName> failedServers = getFailedServers(futures, map);
- LOGGER.error("Invalidating metadata cache for failed for region servers: {}",
- failedServers, t);
- if (isRetry) {
- // If this is a retry attempt then just fail the operation.
- if (allFutures.isCompletedExceptionally()) {
- if (t instanceof ExecutionException) {
- t = t.getCause();
- }
- }
- throw t;
- } else {
- // This is the first attempt, we can retry once.
- // Indicate that this is a retry attempt.
- invalidateServerMetadataCacheWithRetries(admin, failedServers,
- invalidateCacheRequests, true);
- }
- }
- }
-
- /*
- Get the list of regionservers that failed the invalidateCache rpc.
- */
- private List<ServerName> getFailedServers(List<CompletableFuture<Void>> futures,
- Map<Future, ServerName> map) {
- List<ServerName> failedServers = new ArrayList<>();
- for (CompletableFuture completedFuture : futures) {
- if (completedFuture.isDone() == false) {
- // If this task is still running, cancel it and keep in retry list.
- ServerName sn = map.get(completedFuture);
- failedServers.add(sn);
- // Even though we cancel this future but it doesn't interrupt the executing thread.
- completedFuture.cancel(true);
- } else if (completedFuture.isCompletedExceptionally()
- || completedFuture.isCancelled()) {
- // This means task is done but completed with exception
- // or was canceled. Add it to retry list.
- ServerName sn = map.get(completedFuture);
- failedServers.add(sn);
- }
- }
- return failedServers;
- }
-
- private RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest getRequest(
- List<InvalidateServerMetadataCacheRequest> requests) {
- RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest.Builder builder =
- RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest.newBuilder();
- for (InvalidateServerMetadataCacheRequest request: requests) {
- RegionServerEndpointProtos.InvalidateServerMetadataCache.Builder innerBuilder
- = RegionServerEndpointProtos.InvalidateServerMetadataCache.newBuilder();
- innerBuilder.setTenantId(ByteStringer.wrap(request.getTenantId()));
- innerBuilder.setSchemaName(ByteStringer.wrap(request.getSchemaName()));
- innerBuilder.setTableName(ByteStringer.wrap(request.getTableName()));
- builder.addInvalidateServerMetadataCacheRequests(innerBuilder.build());
+ env.getConfiguration()).unwrap(PhoenixConnection.class)) {
+ ConnectionQueryServices queryServices = connection.getQueryServices();
+ queryServices.invalidateServerMetadataCache(requests);
}
- return builder.build();
}
/**
@@ -3830,7 +3649,9 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
buildTable(key, cacheKey, region, clientTimeStamp, clientVersion);
return table;
} finally {
- if (!wasLocked && rowLock != null) rowLock.release();
+ if (!wasLocked && rowLock != null) {
+ rowLock.release();
+ }
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index e527023bc2..45315f89a3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.coprocessor.InvalidateServerMetadataCacheRequest;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.MutationState;
@@ -230,4 +231,7 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
}
int getConnectionCount(boolean isInternal);
+
+ void invalidateServerMetadataCache(List<InvalidateServerMetadataCacheRequest> requests)
+ throws Throwable;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 3a83a878be..94f9d557fd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder.KEEP_
import static org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder.MAX_VERSIONS;
import static org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder.REPLICATION_SCOPE;
import static org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder.TTL;
+import static org.apache.hadoop.hbase.ipc.RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_15_0;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0;
@@ -109,11 +110,14 @@ import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
@@ -127,6 +131,7 @@ import java.util.regex.Pattern;
import javax.annotation.concurrent.GuardedBy;
import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
@@ -159,7 +164,9 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.ipc.controller.InvalidateMetadataCacheControllerFactory;
import org.apache.hadoop.hbase.ipc.controller.ServerToServerRpcController;
import org.apache.hadoop.hbase.ipc.controller.ServerSideRPCControllerFactory;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
@@ -176,6 +183,7 @@ import org.apache.hadoop.ipc.RemoteException;
import org.apache.phoenix.compile.MutationPlan;
import org.apache.phoenix.coprocessor.ChildLinkMetaDataEndpoint;
import org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver;
+import org.apache.phoenix.coprocessor.InvalidateServerMetadataCacheRequest;
import org.apache.phoenix.coprocessor.MetaDataEndpointImpl;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
@@ -211,6 +219,9 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRespons
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
+import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;
+import org.apache.phoenix.coprocessor.metrics.MetricsMetadataCachingSource;
+import org.apache.phoenix.coprocessor.metrics.MetricsPhoenixCoprocessorSourceFactory;
import org.apache.phoenix.exception.InvalidRegionSplitPolicyException;
import org.apache.phoenix.exception.PhoenixIOException;
import org.apache.phoenix.exception.RetriableUpgradeException;
@@ -409,6 +420,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// writes guarded by "liveRegionServersLock"
private volatile List<ServerName> liveRegionServers;
private final Object liveRegionServersLock = new Object();
+ // Writes guarded by invalidateMetadataCacheConnLock
+ private Connection invalidateMetadataCacheConnection = null;
+ private final Object invalidateMetadataCacheConnLock = new Object();
+ private MetricsMetadataCachingSource metricsMetadataCachingSource;
+ public static final String INVALIDATE_SERVER_METADATA_CACHE_EX_MESSAGE =
+ "Cannot invalidate server metadata cache on a non-server connection";
private static interface FeatureSupported {
boolean isSupported(ConnectionQueryServices services);
@@ -541,35 +558,61 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
e.printStackTrace();
}
}
-
+ nSequenceSaltBuckets = config.getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB,
+ QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
+ this.metricsMetadataCachingSource = MetricsPhoenixCoprocessorSourceFactory.getInstance()
+ .getMetadataCachingSource();
}
- private void openConnection() throws SQLException {
+ private Connection openConnection(Configuration conf) throws SQLException {
+ Connection localConnection;
try {
- this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
+ localConnection = HBaseFactoryProvider.getHConnectionFactory().createConnection(conf);
GLOBAL_HCONNECTIONS_COUNTER.increment();
LOGGER.info("HConnection established. Stacktrace for informational purposes: "
- + connection + " " + LogUtil.getCallerStackTrace());
+ + localConnection + " " + LogUtil.getCallerStackTrace());
} catch (IOException e) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
.setRootCause(e).build().buildException();
}
- if (this.connection.isClosed()) { // TODO: why the heck doesn't this throw above?
+ if (localConnection.isClosed()) { // TODO: why the heck doesn't this throw above?
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION).build().buildException();
}
+ return localConnection;
+ }
+
+ /**
+ * We create a long-lived hbase connection to run invalidate cache RPCs. We override
+ * CUSTOM_CONTROLLER_CONF_KEY to instantiate InvalidateMetadataCacheController which has
+ * a special priority for invalidate metadata cache operations.
+ * @return hbase connection
+ * @throws SQLException SQLException
+ */
+ public Connection getInvalidateMetadataCacheConnection() throws SQLException {
+ if (invalidateMetadataCacheConnection != null) {
+ return invalidateMetadataCacheConnection;
+ }
+
+ synchronized (invalidateMetadataCacheConnLock) {
+ Configuration clonedConfiguration = PropertiesUtil.cloneConfig(this.config);
+ clonedConfiguration.setClass(CUSTOM_CONTROLLER_CONF_KEY,
+ InvalidateMetadataCacheControllerFactory.class, RpcControllerFactory.class);
+ invalidateMetadataCacheConnection = openConnection(clonedConfiguration);
+ }
+ return invalidateMetadataCacheConnection;
}
/**
* Close the HBase connection and decrement the counter.
* @throws IOException throws IOException
*/
- private void closeConnection() throws IOException {
+ private void closeConnection(Connection connection) throws IOException {
if (connection != null) {
connection.close();
LOGGER.info("{} HConnection closed. Stacktrace for informational"
+ " purposes: {}", connection, LogUtil.getCallerStackTrace());
+ GLOBAL_HCONNECTIONS_COUNTER.decrement();
}
- GLOBAL_HCONNECTIONS_COUNTER.decrement();
}
@Override
@@ -678,8 +721,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
latestMetaDataLock.notifyAll();
}
try {
- // close the HBase connection
- closeConnection();
+ // close HBase connections.
+ closeConnection(this.connection);
+ closeConnection(this.invalidateMetadataCacheConnection);
} finally {
if (renewLeaseExecutor != null) {
renewLeaseExecutor.shutdownNow();
@@ -3529,7 +3573,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
try {
GLOBAL_QUERY_SERVICES_COUNTER.increment();
LOGGER.info("An instance of ConnectionQueryServices was created.");
- openConnection();
+ connection = openConnection(config);
hConnectionEstablished = true;
boolean lastDDLTimestampValidationEnabled
= getProps().getBoolean(
@@ -3546,9 +3590,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
success = true;
return null;
}
- nSequenceSaltBuckets = ConnectionQueryServicesImpl.this.props.getInt(
- QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB,
- QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
boolean isDoNotUpgradePropSet = UpgradeUtil.isNoUpgradeSet(props);
Properties scnProps = PropertiesUtil.deepCopy(props);
scnProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
@@ -3639,7 +3680,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
try {
if (!success && hConnectionEstablished) {
- closeConnection();
+ closeConnection(connection);
+ closeConnection(invalidateMetadataCacheConnection);
}
} catch (IOException e) {
SQLException ex = new SQLException(e);
@@ -4459,7 +4501,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
} else {
nSequenceSaltBuckets = getSaltBuckets(e);
}
-
updateSystemSequenceWithCacheOnWriteProps(metaConnection);
}
return metaConnection;
@@ -6232,4 +6273,169 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
public List<LinkedBlockingQueue<WeakReference<PhoenixConnection>>> getCachedConnections() {
return connectionQueues;
}
+
+ /**
+ * Invalidate metadata cache from all region servers for the given list of
+ * InvalidateServerMetadataCacheRequest.
+ * @throws Throwable
+ */
+ public void invalidateServerMetadataCache(List<InvalidateServerMetadataCacheRequest> requests)
+ throws Throwable {
+ boolean invalidateCacheEnabled =
+ config.getBoolean(PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED, false);
+ if (!invalidateCacheEnabled) {
+ LOGGER.info("Skip invalidating server metadata cache since conf property"
+ + " phoenix.metadata.invalidate.cache.enabled is set to false");
+ return;
+ }
+ if (!QueryUtil.isServerConnection(props)) {
+ LOGGER.warn(INVALIDATE_SERVER_METADATA_CACHE_EX_MESSAGE);
+ throw new Exception(INVALIDATE_SERVER_METADATA_CACHE_EX_MESSAGE);
+ }
+
+ metricsMetadataCachingSource.incrementMetadataCacheInvalidationOperationsCount();
+ Admin admin = getInvalidateMetadataCacheConnection().getAdmin();
+ // This will incur an extra RPC to the master. This RPC is required since we want to
+ // get current list of regionservers.
+ Collection<ServerName> serverNames = admin.getRegionServers(true);
+ PhoenixStopWatch stopWatch = new PhoenixStopWatch().start();
+ try {
+ invalidateServerMetadataCacheWithRetries(admin, serverNames, requests, false);
+ metricsMetadataCachingSource.incrementMetadataCacheInvalidationSuccessCount();
+ } catch (Throwable t) {
+ metricsMetadataCachingSource.incrementMetadataCacheInvalidationFailureCount();
+ throw t;
+ } finally {
+ metricsMetadataCachingSource
+ .addMetadataCacheInvalidationTotalTime(stopWatch.stop().elapsedMillis());
+ }
+ }
+
+ /**
+ * Invalidate metadata cache on all regionservers with retries for the given list of
+ * InvalidateServerMetadataCacheRequest. Each InvalidateServerMetadataCacheRequest contains
+ * tenantID, schema name and table name.
+ * We retry once before failing the operation.
+ *
+ * @param admin
+ * @param serverNames
+ * @param invalidateCacheRequests
+ * @param isRetry
+ * @throws Throwable
+ */
+ private void invalidateServerMetadataCacheWithRetries(Admin admin,
+ Collection<ServerName> serverNames,
+ List<InvalidateServerMetadataCacheRequest> invalidateCacheRequests,
+ boolean isRetry) throws Throwable {
+ RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest protoRequest =
+ getRequest(invalidateCacheRequests);
+ // TODO Do I need my own executor or can I re-use QueryServices#Executor
+ // since it is supposed to be used only for scans according to documentation?
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ Map<Future, ServerName> map = new HashMap<>();
+ for (ServerName serverName : serverNames) {
+ CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+ try {
+ PhoenixStopWatch innerWatch = new PhoenixStopWatch().start();
+ for (InvalidateServerMetadataCacheRequest invalidateCacheRequest
+ : invalidateCacheRequests) {
+ LOGGER.info("Sending invalidate metadata cache for {} to region server:"
+ + " {}", invalidateCacheRequest.toString(), serverName);
+ }
+
+ RegionServerEndpointProtos.RegionServerEndpointService.BlockingInterface
+ service = RegionServerEndpointProtos.RegionServerEndpointService
+ .newBlockingStub(admin.coprocessorService(serverName));
+ // The timeout for this particular request is managed by config parameter:
+ // hbase.rpc.timeout. Even if the future times out, this runnable can be in
+ // RUNNING state and will not be interrupted.
+ // We use the controller set in hbase connection.
+ service.invalidateServerMetadataCache(null, protoRequest);
+ long cacheInvalidationTime = innerWatch.stop().elapsedMillis();
+ LOGGER.info("Invalidating metadata cache"
+ + " on region server: {} completed successfully and it took {} ms",
+ serverName, cacheInvalidationTime);
+ metricsMetadataCachingSource
+ .addMetadataCacheInvalidationRpcTime(cacheInvalidationTime);
+ } catch (ServiceException se) {
+ LOGGER.error("Invalidating metadata cache failed for regionserver {}",
+ serverName, se);
+ IOException ioe = ServerUtil.parseServiceException(se);
+ throw new CompletionException(ioe);
+ }
+ });
+ futures.add(future);
+ map.put(future, serverName);
+ }
+ // Here we create one master like future which tracks individual future
+ // for each region server.
+ CompletableFuture<Void> allFutures = CompletableFuture.allOf(
+ futures.toArray(new CompletableFuture[0]));
+ long metadataCacheInvalidationTimeoutMs = config.getLong(
+ PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS,
+ PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS_DEFAULT);
+ try {
+ allFutures.get(metadataCacheInvalidationTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (Throwable t) {
+ List<ServerName> failedServers = getFailedServers(futures, map);
+ LOGGER.error("Invalidating metadata cache failed for region servers: {}",
+ failedServers, t);
+ if (isRetry) {
+ // If this is a retry attempt then just fail the operation.
+ if (allFutures.isCompletedExceptionally()) {
+ if (t instanceof ExecutionException) {
+ t = t.getCause();
+ }
+ }
+ throw t;
+ } else {
+ // This is the first attempt, we can retry once.
+ // Indicate that this is a retry attempt.
+ invalidateServerMetadataCacheWithRetries(admin, failedServers,
+ invalidateCacheRequests, true);
+ }
+ }
+ }
+
+ /**
+ * Get the list of regionservers that failed the invalidateCache rpc.
+ * @param futures futures
+ * @param map map of future to server names
+ * @return the list of servers that failed the invalidateCache RPC.
+ */
+ private List<ServerName> getFailedServers(List<CompletableFuture<Void>> futures,
+ Map<Future, ServerName> map) {
+ List<ServerName> failedServers = new ArrayList<>();
+ for (CompletableFuture completedFuture : futures) {
+ if (!completedFuture.isDone()) {
+ // If this task is still running, cancel it and keep in retry list.
+ ServerName sn = map.get(completedFuture);
+ failedServers.add(sn);
+ // Even though we cancel this future, it doesn't interrupt the executing thread.
+ completedFuture.cancel(true);
+ } else if (completedFuture.isCompletedExceptionally()
+ || completedFuture.isCancelled()) {
+ // This means task is done but completed with exception
+ // or was canceled. Add it to retry list.
+ ServerName sn = map.get(completedFuture);
+ failedServers.add(sn);
+ }
+ }
+ return failedServers;
+ }
+
+ private RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest getRequest(
+ List<InvalidateServerMetadataCacheRequest> requests) {
+ RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest.Builder builder =
+ RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest.newBuilder();
+ for (InvalidateServerMetadataCacheRequest request: requests) {
+ RegionServerEndpointProtos.InvalidateServerMetadataCache.Builder innerBuilder
+ = RegionServerEndpointProtos.InvalidateServerMetadataCache.newBuilder();
+ innerBuilder.setTenantId(ByteStringer.wrap(request.getTenantId()));
+ innerBuilder.setSchemaName(ByteStringer.wrap(request.getSchemaName()));
+ innerBuilder.setTableName(ByteStringer.wrap(request.getTableName()));
+ builder.addInvalidateServerMetadataCacheRequests(innerBuilder.build());
+ }
+ return builder.build();
+ }
}
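To make the control flow of invalidateServerMetadataCacheWithRetries easier to follow in isolation, here is a stripped-down sketch of the same pattern: one asynchronous task per server, a single CompletableFuture.allOf guard with a timeout, and exactly one retry restricted to the servers that did not finish. The String server type and the Consumer task body are placeholders; the real code issues the blocking coprocessor call service.invalidateServerMetadataCache(null, protoRequest) shown above, with the timeout taken from phoenix.metadata.cache.invalidation.timeoutMs.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Consumer;

    public class FanOutWithSingleRetrySketch {

        /** Runs task against every server, waits up to timeoutMs, retries failures once. */
        public static void runOnAll(List<String> servers, Consumer<String> task,
                                    long timeoutMs, boolean isRetry) throws Throwable {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            Map<CompletableFuture<Void>, String> owner = new HashMap<>();
            for (String server : servers) {
                CompletableFuture<Void> f = CompletableFuture.runAsync(() -> task.accept(server));
                futures.add(f);
                owner.put(f, server);
            }
            CompletableFuture<Void> all =
                    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
            try {
                all.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (Throwable t) {
                // Anything still running, cancelled, or failed goes on the retry list.
                List<String> failed = new ArrayList<>();
                for (CompletableFuture<Void> f : futures) {
                    if (!f.isDone()) {
                        failed.add(owner.get(f));
                        f.cancel(true); // does not interrupt a task that is already running
                    } else if (f.isCompletedExceptionally() || f.isCancelled()) {
                        failed.add(owner.get(f));
                    }
                }
                if (isRetry) {
                    // Second failure: give up, as the real method does.
                    throw t;
                }
                runOnAll(failed, task, timeoutMs, true);
            }
        }
    }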
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index dfe2a22874..62b6514f41 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.coprocessor.InvalidateServerMetadataCacheRequest;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
@@ -824,4 +825,10 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
public int getConnectionCount(boolean isInternal) {
return 0;
}
+
+ @Override
+ public void invalidateServerMetadataCache(List<InvalidateServerMetadataCacheRequest> requests)
+ throws Throwable {
+ // No-op
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 0278345948..674f0c5d5e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.coprocessor.InvalidateServerMetadataCacheRequest;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
@@ -432,4 +433,10 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
public int getConnectionCount(boolean isInternal) {
return getDelegate().getConnectionCount(isInternal);
}
+
+ @Override
+ public void invalidateServerMetadataCache(List<InvalidateServerMetadataCacheRequest> requests)
+ throws Throwable {
+ getDelegate().invalidateServerMetadataCache(requests);
+ }
}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 0316e18df9..71496f3630 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -186,6 +186,9 @@ public interface QueryServices extends SQLCloseable {
public static final String INDEX_PRIOIRTY_ATTRIB = "phoenix.index.rpc.priority";
public static final String METADATA_PRIOIRTY_ATTRIB = "phoenix.metadata.rpc.priority";
public static final String SERVER_SIDE_PRIOIRTY_ATTRIB = "phoenix.serverside.rpc.priority";
+ String INVALIDATE_METADATA_CACHE_PRIORITY_ATTRIB =
+ "phoenix.invalidate.metadata.cache.rpc.priority";
+
public static final String ALLOW_LOCAL_INDEX_ATTRIB = "phoenix.index.allowLocalIndex";
// Retries when doing server side writes to SYSTEM.CATALOG
@@ -251,6 +254,7 @@ public interface QueryServices extends SQLCloseable {
public static final String INDEX_HANDLER_COUNT_ATTRIB = "phoenix.rpc.index.handler.count";
public static final String METADATA_HANDLER_COUNT_ATTRIB = "phoenix.rpc.metadata.handler.count";
public static final String SERVER_SIDE_HANDLER_COUNT_ATTRIB = "phoenix.rpc.serverside.handler.count";
+ String INVALIDATE_CACHE_HANDLER_COUNT_ATTRIB = "phoenix.rpc.invalidate.cache.handler.count";
public static final String FORCE_ROW_KEY_ORDER_ATTRIB = "phoenix.query.force.rowkeyorder";
public static final String ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB = "phoenix.functions.allowUserDefinedFunctions";
@@ -438,7 +442,11 @@ public interface QueryServices extends SQLCloseable {
* Parameter to disable the server merges for hinted uncovered indexes
*/
String SERVER_MERGE_FOR_UNCOVERED_INDEX = "phoenix.query.global.server.merge.enable";
-
+ String PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS =
+ "phoenix.metadata.cache.invalidation.timeoutMs";
+ // Default to 10 seconds.
+ long PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS_DEFAULT = 10 * 1000;
+ String PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED = "phoenix.metadata.invalidate.cache.enabled";
/**
* Param to determine whether client can disable validation to figure out if any of the
* descendent views extend primary key of their parents. Since this is a bit of
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index dafeadf08a..673f60d69d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -239,10 +239,12 @@ public class QueryServicesOptions {
public static final int DEFAULT_SERVER_SIDE_PRIORITY = 500;
public static final int DEFAULT_INDEX_PRIORITY = 1000;
public static final int DEFAULT_METADATA_PRIORITY = 2000;
+ public static final int DEFAULT_INVALIDATE_METADATA_CACHE_PRIORITY = 3000;
public static final boolean DEFAULT_ALLOW_LOCAL_INDEX = true;
public static final int DEFAULT_INDEX_HANDLER_COUNT = 30;
public static final int DEFAULT_METADATA_HANDLER_COUNT = 30;
public static final int DEFAULT_SERVERSIDE_HANDLER_COUNT = 30;
+ public static final int DEFAULT_INVALIDATE_CACHE_HANDLER_COUNT = 10;
public static final int DEFAULT_SYSTEM_MAX_VERSIONS = 1;
public static final boolean DEFAULT_SYSTEM_KEEP_DELETED_CELLS = false;
diff --git a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerTest.java
similarity index 75%
rename from phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
rename to phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerTest.java
index 15b83e41b5..aa629c3118 100644
--- a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
+++ b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerTest.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.when;
import java.net.InetSocketAddress;
@@ -38,16 +39,9 @@ import org.mockito.Mockito;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
-/**
- * Test that the rpc scheduler schedules index writes to the index handler queue and sends
- * everything else to the standard queues
- */
-public class PhoenixIndexRpcSchedulerTest {
-
+public class PhoenixRpcSchedulerTest {
private static final Configuration conf = HBaseConfiguration.create();
private static final InetSocketAddress isa = new InetSocketAddress("localhost", 0);
-
-
private class AbortServer implements Abortable {
private boolean aborted = false;
@@ -62,25 +56,29 @@ public class PhoenixIndexRpcSchedulerTest {
}
}
+ /**
+ * Test that the rpc scheduler schedules index writes to the index handler queue and sends
+ * everything else to the standard queues
+ */
@Test
public void testIndexPriorityWritesToIndexHandler() throws Exception {
RpcScheduler mock = Mockito.mock(RpcScheduler.class);
PriorityFunction qosFunction = Mockito.mock(PriorityFunction.class);
Abortable abortable = new AbortServer();
- PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250, 225, qosFunction,abortable);
+ PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250, 225, 230, qosFunction,abortable);
BalancedQueueRpcExecutor executor = new BalancedQueueRpcExecutor("test-queue", 1, 1,qosFunction,conf,abortable);
scheduler.setIndexExecutorForTesting(executor);
dispatchCallWithPriority(scheduler, 200);
List<BlockingQueue<CallRunner>> queues = executor.getQueues();
assertEquals(1, queues.size());
BlockingQueue<CallRunner> queue = queues.get(0);
- queue.poll(20, TimeUnit.SECONDS);
+ assertNotNull(queue.poll(5, TimeUnit.SECONDS));
// try again, this time we tweak the ranges we support
- scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110, 105, qosFunction,abortable);
+ scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110, 105, 115, qosFunction,abortable);
scheduler.setIndexExecutorForTesting(executor);
dispatchCallWithPriority(scheduler, 101);
- queue.poll(20, TimeUnit.SECONDS);
+ assertNotNull(queue.poll(5, TimeUnit.SECONDS));
Mockito.verify(mock, Mockito.times(2)).init(Mockito.any(Context.class));
scheduler.stop();
@@ -92,7 +90,7 @@ public class PhoenixIndexRpcSchedulerTest {
RpcScheduler mock = Mockito.mock(RpcScheduler.class);
PriorityFunction qosFunction = Mockito.mock(PriorityFunction.class);
Abortable abortable = new AbortServer();
- PhoenixRpcScheduler scheduler1 = new PhoenixRpcScheduler(conf, mock, 200, 250, 100, qosFunction,abortable);
+ PhoenixRpcScheduler scheduler1 = new PhoenixRpcScheduler(conf, mock, 200, 250, 100, 300, qosFunction,abortable);
RpcExecutor executor1 = scheduler1.getServerSideExecutorForTesting();
for (int c = 0; c < 10; c++) {
dispatchCallWithPriority(scheduler1, 100);
@@ -103,7 +101,7 @@ public class PhoenixIndexRpcSchedulerTest {
if (queue1.size() > 0) {
numDispatches1 += queue1.size();
for (int i = 0; i < queue1.size(); i++) {
- queue1.poll(20, TimeUnit.SECONDS);
+ assertNotNull(queue1.poll(5, TimeUnit.SECONDS));
}
}
}
@@ -111,7 +109,7 @@ public class PhoenixIndexRpcSchedulerTest {
scheduler1.stop();
// try again, with the incorrect executor
- PhoenixRpcScheduler scheduler2 = new PhoenixRpcScheduler(conf, mock, 101, 110, 50, qosFunction,abortable);
+ PhoenixRpcScheduler scheduler2 = new PhoenixRpcScheduler(conf, mock, 101, 110, 50, 25, qosFunction,abortable);
RpcExecutor executor2 = scheduler2.getIndexExecutorForTesting();
dispatchCallWithPriority(scheduler2, 50);
List<BlockingQueue<CallRunner>> queues2 = executor2.getQueues();
@@ -119,7 +117,7 @@ public class PhoenixIndexRpcSchedulerTest {
for (BlockingQueue<CallRunner> queue2 : queues2) {
if (queue2.size() > 0) {
numDispatches2++;
- queue2.poll(20, TimeUnit.SECONDS);
+ assertNotNull(queue2.poll(5, TimeUnit.SECONDS));
}
}
assertEquals(0, numDispatches2);
@@ -140,12 +138,12 @@ public class PhoenixIndexRpcSchedulerTest {
PriorityFunction qosFunction = Mockito.mock(PriorityFunction.class);
Abortable abortable = new AbortServer();
RpcScheduler mock = Mockito.mock(RpcScheduler.class);
- PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250, 225, qosFunction,abortable);
+ PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250, 225, 275, qosFunction,abortable);
dispatchCallWithPriority(scheduler, 100);
dispatchCallWithPriority(scheduler, 251);
// try again, this time we tweak the ranges we support
- scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110, 105, qosFunction,abortable);
+ scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110, 105, 115, qosFunction,abortable);
dispatchCallWithPriority(scheduler, 200);
dispatchCallWithPriority(scheduler, 111);
@@ -154,6 +152,33 @@ public class PhoenixIndexRpcSchedulerTest {
scheduler.stop();
}
+ /**
+ * Test that the rpc scheduler schedules invalidate metadata cache RPC to
+ * the invalidate metadata cache executor.
+ */
+ @Test
+ public void testInvalidateMetadataCacheExecutor() throws Exception {
+ RpcScheduler mock = Mockito.mock(RpcScheduler.class);
+ PriorityFunction qosFunction = Mockito.mock(PriorityFunction.class);
+ Abortable abortable = new AbortServer();
+ // Set invalidate metadata cache priority to 230.
+ int invalidateMetadataCacheCallPriority = 230;
+ PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock,
+ 200, 250, 225, invalidateMetadataCacheCallPriority, qosFunction,abortable);
+ BalancedQueueRpcExecutor executor = new BalancedQueueRpcExecutor("test-queue",
+ 1, 1, qosFunction, conf, abortable);
+ scheduler.setInvalidateMetadataCacheExecutorForTesting(executor);
+ dispatchCallWithPriority(scheduler, invalidateMetadataCacheCallPriority);
+ List<BlockingQueue<CallRunner>> queues = executor.getQueues();
+ assertEquals(1, queues.size());
+ BlockingQueue<CallRunner> queue = queues.get(0);
+ assertEquals(1, queue.size());
+ assertNotNull(queue.poll(5, TimeUnit.SECONDS));
+ Mockito.verify(mock, Mockito.times(1)).init(Mockito.any(RpcScheduler.Context.class));
+ scheduler.stop();
+ executor.stop();
+ }
+
private void dispatchCallWithPriority(RpcScheduler scheduler, int priority) throws Exception {
CallRunner task = Mockito.mock(CallRunner.class);
RequestHeader header = RequestHeader.newBuilder().setPriority(priority).build();
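For orientation, the routing that testInvalidateMetadataCacheExecutor exercises can be pictured as below. This is a simplified sketch under assumed names, not the committed PhoenixRpcScheduler implementation: a call whose priority equals the invalidate-metadata-cache priority is dispatched to the dedicated executor, and everything else falls through to the delegate scheduler.

    import org.apache.hadoop.hbase.ipc.CallRunner;
    import org.apache.hadoop.hbase.ipc.RpcExecutor;
    import org.apache.hadoop.hbase.ipc.RpcScheduler;

    // Simplified routing sketch; names and structure are assumptions.
    final class PriorityRoutingSketch {
        private final int invalidateMetadataCachePriority;
        private final RpcExecutor invalidateMetadataCacheExecutor;
        private final RpcScheduler delegate;

        PriorityRoutingSketch(int invalidatePriority, RpcExecutor invalidateExecutor,
                              RpcScheduler delegate) {
            this.invalidateMetadataCachePriority = invalidatePriority;
            this.invalidateMetadataCacheExecutor = invalidateExecutor;
            this.delegate = delegate;
        }

        void dispatch(CallRunner callTask, int callPriority) throws Exception {
            if (callPriority == invalidateMetadataCachePriority) {
                // Matches the new priority band: use the dedicated handler pool.
                invalidateMetadataCacheExecutor.dispatch(callTask);
            } else {
                // Any other priority keeps its existing behavior.
                delegate.dispatch(callTask);
            }
        }
    }

Under these assumptions, dispatching a single call at priority 230 would leave exactly one CallRunner on the dedicated queue, which is what the assertEquals(1, queue.size()) and queue.poll(...) checks in the test verify.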
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java
index 3ba83ff2ee..11f85172ff 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.metrics.MetricsMetadataCachingSource;
import org.apache.phoenix.coprocessor.metrics.MetricsPhoenixCoprocessorSourceFactory;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.monitoring.GlobalClientMetrics;
@@ -46,8 +47,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -58,12 +57,14 @@ import java.util.Map;
import java.util.Properties;
import java.util.Random;
-import static org.apache.phoenix.coprocessor.MetaDataEndpointImpl.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED;
+import static org.apache.phoenix.query.ConnectionQueryServicesImpl.INVALIDATE_SERVER_METADATA_CACHE_EX_MESSAGE;
+import static org.apache.phoenix.query.QueryServices.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
@@ -75,7 +76,6 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
private final Random RANDOM = new Random(42);
private final long NEVER = (long) ConnectionProperty.UPDATE_CACHE_FREQUENCY.getValue("NEVER");
- private static final Logger LOGGER = LoggerFactory.getLogger(ServerMetadataCacheTest.class);
@BeforeClass
public static synchronized void doSetup() throws Exception {
@@ -1422,8 +1422,24 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
}
}
- //Helper methods
+ /*
+ Tests that invalidate server metadata cache fails on a non server connection.
+ */
+ @Test
+ public void testInvalidateMetadataCacheOnNonServerConnection() {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (PhoenixConnection conn = DriverManager.getConnection(getUrl(), props)
+ .unwrap(PhoenixConnection.class)) {
+ ConnectionQueryServices cqs = conn.getQueryServices();
+ cqs.invalidateServerMetadataCache(null);
+ fail("Shouldn't come here");
+ } catch (Throwable t) {
+ assertNotNull(t);
+ assertTrue(t.getMessage().contains(INVALIDATE_SERVER_METADATA_CACHE_EX_MESSAGE));
+ }
+ }
+ //Helper methods
private long getLastDDLTimestamp(String tableName) throws SQLException {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
// Need to use different connection than what is used for creating table or indexes.