You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2017/02/14 23:41:59 UTC
[10/50] [abbrv] phoenix git commit: PHOENIX-3611 Cache for client
connections will expire (and close) entries in LRU fashion.
PHOENIX-3611 Cache for client connections will expire (and close) entries in LRU fashion.
Signed-off-by: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/badb9b40
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/badb9b40
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/badb9b40
Branch: refs/heads/encodecolumns2
Commit: badb9b40b67e1dfc6b1bba1b368aa0ea461773f7
Parents: 2fd9b08
Author: Geoffrey <gj...@salesforce.com>
Authored: Thu Jan 19 16:08:20 2017 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Jan 20 16:18:02 2017 -0800
----------------------------------------------------------------------
.../org/apache/phoenix/jdbc/PhoenixDriver.java | 99 +++++++++++++-------
.../org/apache/phoenix/query/QueryServices.java | 5 +-
.../phoenix/query/QueryServicesOptions.java | 2 +
3 files changed, 70 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/badb9b40/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
index 1fb827c..ba06ed9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
@@ -23,20 +23,13 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.*;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.concurrent.GuardedBy;
+import com.google.common.cache.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -50,7 +43,6 @@ import org.apache.phoenix.query.QueryServicesOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -147,13 +139,43 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
}
// One entry per cluster here
- private final ConcurrentMap<ConnectionInfo,ConnectionQueryServices> connectionQueryServicesMap = new ConcurrentHashMap<ConnectionInfo,ConnectionQueryServices>(3);
+ private final Cache<ConnectionInfo, ConnectionQueryServices> connectionQueryServicesCache =
+ initializeConnectionCache();
public PhoenixDriver() { // for Squirrel
// Use production services implementation
super();
}
-
+
+ private Cache<ConnectionInfo, ConnectionQueryServices> initializeConnectionCache() {
+ Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+ int maxCacheSize = config.getInt(QueryServices.CLIENT_CONNECTION_CACHE_MAX_SIZE,
+ QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_CACHE_MAX_SIZE);
+ int maxCacheDuration = config.getInt(QueryServices.CLIENT_CONNECTION_CACHE_MAX_DURATION_MILLISECONDS,
+ QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION);
+ RemovalListener<ConnectionInfo, ConnectionQueryServices> cacheRemovalListener =
+ new RemovalListener<ConnectionInfo, ConnectionQueryServices>() {
+ @Override
+ public void onRemoval(RemovalNotification<ConnectionInfo, ConnectionQueryServices> notification) {
+ String connInfoIdentifier = notification.getKey().toString();
+ logger.debug("Expiring " + connInfoIdentifier + " because of "
+ + notification.getCause().name());
+
+ try {
+ notification.getValue().close();
+ }
+ catch (SQLException se) {
+ logger.error("Error while closing expired cache connection " + connInfoIdentifier, se);
+ }
+ }
+ };
+ return CacheBuilder.newBuilder()
+ .maximumSize(maxCacheSize)
+ .expireAfterAccess(maxCacheDuration, TimeUnit.MILLISECONDS)
+ .removalListener(cacheRemovalListener)
+ .build();
+ }
+
// writes guarded by "this"
private volatile QueryServices services;
@@ -206,38 +228,49 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
}
@Override
- protected ConnectionQueryServices getConnectionQueryServices(String url, Properties info) throws SQLException {
+ protected ConnectionQueryServices getConnectionQueryServices(String url, final Properties info) throws SQLException {
try {
lockInterruptibly(LockMode.READ);
checkClosed();
ConnectionInfo connInfo = ConnectionInfo.create(url);
- QueryServices services = getQueryServices();
- // Also performs the Kerberos login if the URL/properties request this
- ConnectionInfo normalizedConnInfo = connInfo.normalize(services.getProps(), info);
- ConnectionQueryServices connectionQueryServices = connectionQueryServicesMap.get(normalizedConnInfo);
- if (connectionQueryServices == null) {
- if (normalizedConnInfo.isConnectionless()) {
- connectionQueryServices = new ConnectionlessQueryServicesImpl(services, normalizedConnInfo, info);
- } else {
- connectionQueryServices = new ConnectionQueryServicesImpl(services, normalizedConnInfo, info);
- }
- ConnectionQueryServices prevValue = connectionQueryServicesMap.putIfAbsent(normalizedConnInfo, connectionQueryServices);
- if (prevValue != null) {
- connectionQueryServices = prevValue;
- }
- }
- boolean success = false;
SQLException sqlE = null;
+ boolean success = false;
+ final QueryServices services = getQueryServices();
+ ConnectionQueryServices connectionQueryServices = null;
+ // Also performs the Kerberos login if the URL/properties request this
+ final ConnectionInfo normalizedConnInfo = connInfo.normalize(services.getProps(), info);
try {
+ connectionQueryServices =
+ connectionQueryServicesCache.get(normalizedConnInfo, new Callable<ConnectionQueryServices>() {
+ @Override
+ public ConnectionQueryServices call() throws Exception {
+ ConnectionQueryServices connectionQueryServices;
+ if (normalizedConnInfo.isConnectionless()) {
+ connectionQueryServices = new ConnectionlessQueryServicesImpl(services, normalizedConnInfo, info);
+ } else {
+ connectionQueryServices = new ConnectionQueryServicesImpl(services, normalizedConnInfo, info);
+ }
+
+ return connectionQueryServices;
+ }
+ });
+
connectionQueryServices.init(url, info);
success = true;
- } catch (SQLException e) {
+ } catch (ExecutionException ee){
+ if (ee.getCause() instanceof SQLException) {
+ sqlE = (SQLException) ee.getCause();
+ } else {
+ throw new SQLException(ee);
+ }
+ }
+ catch (SQLException e) {
sqlE = e;
}
finally {
if (!success) {
// Remove from map, as initialization failed
- connectionQueryServicesMap.remove(normalizedConnInfo);
+ connectionQueryServicesCache.invalidate(normalizedConnInfo);
if (sqlE != null) {
throw sqlE;
}
@@ -320,8 +353,4 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
}
}
- @VisibleForTesting
- protected ConcurrentMap<ConnectionInfo,ConnectionQueryServices> getCachedConnections() {
- return this.connectionQueryServicesMap;
- }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/badb9b40/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 044768a..e77e01f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -232,7 +232,10 @@ public interface QueryServices extends SQLCloseable {
public static final String CLIENT_CACHE_ENCODING = "phoenix.table.client.cache.encoding";
public static final String AUTO_UPGRADE_ENABLED = "phoenix.autoupgrade.enabled";
-
+
+ public static final String CLIENT_CONNECTION_CACHE_MAX_SIZE = "phoenix.client.connection.cache.max.size";
+ public static final String CLIENT_CONNECTION_CACHE_MAX_DURATION_MILLISECONDS =
+ "phoenix.client.connection.max.duration";
/**
* Get executor service used for parallel scans
*/
http://git-wip-us.apache.org/repos/asf/phoenix/blob/badb9b40/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index a785436..13fb9ea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -259,6 +259,8 @@ public class QueryServicesOptions {
public static final String DEFAULT_CLIENT_CACHE_ENCODING = PTableRefFactory.Encoding.OBJECT.toString();
public static final boolean DEFAULT_AUTO_UPGRADE_ENABLED = true;
+ public static final int DEFAULT_CLIENT_CONNECTION_CACHE_MAX_SIZE = 100;
+ public static final int DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION = 86400000;
@SuppressWarnings("serial")
public static final Set<String> DEFAULT_QUERY_SERVER_SKIP_WORDS = new HashSet<String>() {