You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2017/02/14 23:41:59 UTC

[10/50] [abbrv] phoenix git commit: PHOENIX-3611 Cache for client connections will expire (and close) entries in LRU fashion.

PHOENIX-3611 Cache for client connections will expire (and close) entries in LRU fashion.

Signed-off-by: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/badb9b40
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/badb9b40
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/badb9b40

Branch: refs/heads/encodecolumns2
Commit: badb9b40b67e1dfc6b1bba1b368aa0ea461773f7
Parents: 2fd9b08
Author: Geoffrey <gj...@salesforce.com>
Authored: Thu Jan 19 16:08:20 2017 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Jan 20 16:18:02 2017 -0800

----------------------------------------------------------------------
 .../org/apache/phoenix/jdbc/PhoenixDriver.java  | 99 +++++++++++++-------
 .../org/apache/phoenix/query/QueryServices.java |  5 +-
 .../phoenix/query/QueryServicesOptions.java     |  2 +
 3 files changed, 70 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/badb9b40/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
index 1fb827c..ba06ed9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
@@ -23,20 +23,13 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.*;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.annotation.concurrent.GuardedBy;
 
+import com.google.common.cache.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -50,7 +43,6 @@ import org.apache.phoenix.query.QueryServicesOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 
@@ -147,13 +139,43 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
     }
 
     // One entry per cluster here
-    private final ConcurrentMap<ConnectionInfo,ConnectionQueryServices> connectionQueryServicesMap = new ConcurrentHashMap<ConnectionInfo,ConnectionQueryServices>(3);
+    private final Cache<ConnectionInfo, ConnectionQueryServices> connectionQueryServicesCache =
+        initializeConnectionCache();
 
     public PhoenixDriver() { // for Squirrel
         // Use production services implementation
         super();
     }
-    
+
+    private Cache<ConnectionInfo, ConnectionQueryServices> initializeConnectionCache() {
+        Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+        int maxCacheSize = config.getInt(QueryServices.CLIENT_CONNECTION_CACHE_MAX_SIZE,
+            QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_CACHE_MAX_SIZE);
+        int maxCacheDuration = config.getInt(QueryServices.CLIENT_CONNECTION_CACHE_MAX_DURATION_MILLISECONDS,
+            QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION);
+        RemovalListener<ConnectionInfo, ConnectionQueryServices> cacheRemovalListener =
+            new RemovalListener<ConnectionInfo, ConnectionQueryServices>() {
+                @Override
+                public void onRemoval(RemovalNotification<ConnectionInfo, ConnectionQueryServices> notification) {
+                    String connInfoIdentifier = notification.getKey().toString();
+                    logger.debug("Expiring " + connInfoIdentifier + " because of "
+                        + notification.getCause().name());
+
+                    try {
+                        notification.getValue().close();
+                    }
+                    catch (SQLException se) {
+                        logger.error("Error while closing expired cache connection " + connInfoIdentifier, se);
+                    }
+                }
+            };
+        return CacheBuilder.newBuilder()
+            .maximumSize(maxCacheSize)
+            .expireAfterAccess(maxCacheDuration, TimeUnit.MILLISECONDS)
+            .removalListener(cacheRemovalListener)
+            .build();
+    }
+
     // writes guarded by "this"
     private volatile QueryServices services;
     
@@ -206,38 +228,49 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
     }
     
     @Override
-    protected ConnectionQueryServices getConnectionQueryServices(String url, Properties info) throws SQLException {
+    protected ConnectionQueryServices getConnectionQueryServices(String url, final Properties info) throws SQLException {
         try {
             lockInterruptibly(LockMode.READ);
             checkClosed();
             ConnectionInfo connInfo = ConnectionInfo.create(url);
-            QueryServices services = getQueryServices();
-            // Also performs the Kerberos login if the URL/properties request this
-            ConnectionInfo normalizedConnInfo = connInfo.normalize(services.getProps(), info);
-            ConnectionQueryServices connectionQueryServices = connectionQueryServicesMap.get(normalizedConnInfo);
-            if (connectionQueryServices == null) {
-                if (normalizedConnInfo.isConnectionless()) {
-                    connectionQueryServices = new ConnectionlessQueryServicesImpl(services, normalizedConnInfo, info);
-                } else {
-                    connectionQueryServices = new ConnectionQueryServicesImpl(services, normalizedConnInfo, info);
-                }
-                ConnectionQueryServices prevValue = connectionQueryServicesMap.putIfAbsent(normalizedConnInfo, connectionQueryServices);
-                if (prevValue != null) {
-                    connectionQueryServices = prevValue;
-                }
-            }
-            boolean success = false;
             SQLException sqlE = null;
+            boolean success = false;
+            final QueryServices services = getQueryServices();
+            ConnectionQueryServices connectionQueryServices = null;
+            // Also performs the Kerberos login if the URL/properties request this
+            final ConnectionInfo normalizedConnInfo = connInfo.normalize(services.getProps(), info);
             try {
+                connectionQueryServices =
+                    connectionQueryServicesCache.get(normalizedConnInfo, new Callable<ConnectionQueryServices>() {
+                        @Override
+                        public ConnectionQueryServices call() throws Exception {
+                            ConnectionQueryServices connectionQueryServices;
+                            if (normalizedConnInfo.isConnectionless()) {
+                                connectionQueryServices = new ConnectionlessQueryServicesImpl(services, normalizedConnInfo, info);
+                            } else {
+                                connectionQueryServices = new ConnectionQueryServicesImpl(services, normalizedConnInfo, info);
+                            }
+
+                            return connectionQueryServices;
+                        }
+                    });
+
                 connectionQueryServices.init(url, info);
                 success = true;
-            } catch (SQLException e) {
+            } catch (ExecutionException ee){
+                if (ee.getCause() instanceof  SQLException) {
+                    sqlE = (SQLException) ee.getCause();
+                } else {
+                    throw new SQLException(ee);
+                }
+            }
+              catch (SQLException e) {
                 sqlE = e;
             }
             finally {
                 if (!success) {
                     // Remove from map, as initialization failed
-                    connectionQueryServicesMap.remove(normalizedConnInfo);
+                    connectionQueryServicesCache.invalidate(normalizedConnInfo);
                     if (sqlE != null) {
                         throw sqlE;
                     }
@@ -320,8 +353,4 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
         }
     }
 
-    @VisibleForTesting
-    protected ConcurrentMap<ConnectionInfo,ConnectionQueryServices> getCachedConnections() {
-        return this.connectionQueryServicesMap;
-    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/badb9b40/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 044768a..e77e01f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -232,7 +232,10 @@ public interface QueryServices extends SQLCloseable {
     
     public static final String CLIENT_CACHE_ENCODING = "phoenix.table.client.cache.encoding";
     public static final String AUTO_UPGRADE_ENABLED = "phoenix.autoupgrade.enabled";
-	
+
+    public static final String CLIENT_CONNECTION_CACHE_MAX_SIZE = "phoenix.client.connection.cache.max.size";
+    public static final String CLIENT_CONNECTION_CACHE_MAX_DURATION_MILLISECONDS =
+        "phoenix.client.connection.max.duration";
     /**
      * Get executor service used for parallel scans
      */

http://git-wip-us.apache.org/repos/asf/phoenix/blob/badb9b40/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index a785436..13fb9ea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -259,6 +259,8 @@ public class QueryServicesOptions {
     
     public static final String DEFAULT_CLIENT_CACHE_ENCODING = PTableRefFactory.Encoding.OBJECT.toString();
     public static final boolean DEFAULT_AUTO_UPGRADE_ENABLED = true;
+    public static final int DEFAULT_CLIENT_CONNECTION_CACHE_MAX_SIZE = 100;
+    public static final int DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION = 86400000;
 
     @SuppressWarnings("serial")
     public static final Set<String> DEFAULT_QUERY_SERVER_SKIP_WORDS = new HashSet<String>() {