You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ap...@apache.org on 2017/01/08 08:05:05 UTC

[1/6] phoenix git commit: PHOENIX-3563 Ensure we release ZooKeeper resources allocated by the Tephra client embedded in the Phoenix connection

Repository: phoenix
Updated Branches:
  refs/heads/4.9-HBase-0.98 4340dadb2 -> 3592782ae
  refs/heads/4.9-HBase-1.1 397fff999 -> 87cf11434
  refs/heads/4.9-HBase-1.2 fb504f2d7 -> f34a605a3
  refs/heads/4.x-HBase-0.98 91d1478cf -> b69b177b3
  refs/heads/4.x-HBase-1.1 e24ec6a8e -> cdc77dfcb
  refs/heads/master a164f0327 -> d8f459498


PHOENIX-3563 Ensure we release ZooKeeper resources allocated by the Tephra client embedded in the Phoenix connection


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d8f45949
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d8f45949
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d8f45949

Branch: refs/heads/master
Commit: d8f4594989c0b73945aaffec5649a0b62ac59724
Parents: a164f03
Author: Andrew Purtell <ap...@apache.org>
Authored: Wed Jan 4 16:48:44 2017 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Sat Jan 7 10:03:23 2017 -0800

----------------------------------------------------------------------
 .../query/ConnectionQueryServicesImpl.java      | 37 ++++++++++++--------
 1 file changed, 23 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d8f45949/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index f66b358..be34f66 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -254,6 +254,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     private final boolean returnSequenceValues ;
 
     private HConnection connection;
+    private ZKClientService txZKClientService;
     private TransactionServiceClient txServiceClient;
     private volatile boolean initialized;
     private volatile int nSequenceSaltBuckets;
@@ -370,15 +371,16 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
 
         int timeOut = props.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
         // Create instance of the tephra zookeeper client
-        ZKClientService tephraZKClientService = new TephraZKClientService(zkQuorumServersString, timeOut, null, ArrayListMultimap.<String, byte[]>create());
-
-        ZKClientService zkClientService = ZKClientServices.delegate(
-                ZKClients.reWatchOnExpire(
-                        ZKClients.retryOnFailure(tephraZKClientService, RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS))
-                        )
+        txZKClientService = ZKClientServices.delegate(
+            ZKClients.reWatchOnExpire(
+                ZKClients.retryOnFailure(
+                     new TephraZKClientService(zkQuorumServersString, timeOut, null,
+                             ArrayListMultimap.<String, byte[]>create()), 
+                         RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS))
+                     )
                 );
-        zkClientService.startAndWait();
-        ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(zkClientService);
+        txZKClientService.startAndWait();
+        ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(txZKClientService);
         PooledClientProvider pooledClientProvider = new PooledClientProvider(
                 config, zkDiscoveryService);
         this.txServiceClient = new TransactionServiceClient(config,pooledClientProvider);
@@ -389,11 +391,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             boolean transactionsEnabled = props.getBoolean(
                     QueryServices.TRANSACTIONS_ENABLED,
                     QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED);
-            // only initialize the tx service client if needed
+            this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
+            // only initialize the tx service client if needed and if we succeeded in getting a connection
+            // to HBase
             if (transactionsEnabled) {
                 initTxServiceClient();
             }
-            this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
         } catch (IOException e) {
             throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
             .setRootCause(e).build().buildException();
@@ -463,14 +466,20 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             } finally {
                 try {
                     childServices.clear();
-                    if (renewLeaseExecutor != null) {
-                        renewLeaseExecutor.shutdownNow();
-                    }
                     synchronized (latestMetaDataLock) {
                         latestMetaData = null;
                         latestMetaDataLock.notifyAll();
                     }
-                    if (connection != null) connection.close();
+                    try {
+                        // close the HBase connection
+                        if (connection != null) connection.close();
+                    } finally {
+                        if (renewLeaseExecutor != null) {
+                            renewLeaseExecutor.shutdownNow();
+                        }
+                        // shut down the tx client service if we created one to support transactions
+                        if (this.txZKClientService != null) this.txZKClientService.stopAndWait();
+                    }
                 } catch (IOException e) {
                     if (sqlE == null) {
                         sqlE = ServerUtil.parseServerException(e);


[5/6] phoenix git commit: PHOENIX-3563 Ensure we release ZooKeeper resources allocated by the Tephra client embedded in the Phoenix connection

Posted by ap...@apache.org.
PHOENIX-3563 Ensure we release ZooKeeper resources allocated by the Tephra client embedded in the Phoenix connection


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/87cf1143
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/87cf1143
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/87cf1143

Branch: refs/heads/4.9-HBase-1.1
Commit: 87cf11434823c4fe325f340f6d19f7604bbf27a6
Parents: 397fff9
Author: Andrew Purtell <ap...@apache.org>
Authored: Wed Jan 4 16:48:44 2017 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Sat Jan 7 18:53:38 2017 -0800

----------------------------------------------------------------------
 .../query/ConnectionQueryServicesImpl.java      | 37 ++++++++++++--------
 1 file changed, 23 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/87cf1143/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index eb26230..30d3a1a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -254,6 +254,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     private final boolean returnSequenceValues ;
 
     private HConnection connection;
+    private ZKClientService txZKClientService;
     private TransactionServiceClient txServiceClient;
     private volatile boolean initialized;
     private volatile int nSequenceSaltBuckets;
@@ -370,15 +371,16 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
 
         int timeOut = props.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
         // Create instance of the tephra zookeeper client
-        ZKClientService tephraZKClientService = new TephraZKClientService(zkQuorumServersString, timeOut, null, ArrayListMultimap.<String, byte[]>create());
-
-        ZKClientService zkClientService = ZKClientServices.delegate(
-                ZKClients.reWatchOnExpire(
-                        ZKClients.retryOnFailure(tephraZKClientService, RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS))
-                        )
+        txZKClientService = ZKClientServices.delegate(
+            ZKClients.reWatchOnExpire(
+                ZKClients.retryOnFailure(
+                     new TephraZKClientService(zkQuorumServersString, timeOut, null,
+                             ArrayListMultimap.<String, byte[]>create()), 
+                         RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS))
+                     )
                 );
-        zkClientService.startAndWait();
-        ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(zkClientService);
+        txZKClientService.startAndWait();
+        ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(txZKClientService);
         PooledClientProvider pooledClientProvider = new PooledClientProvider(
                 config, zkDiscoveryService);
         this.txServiceClient = new TransactionServiceClient(config,pooledClientProvider);
@@ -389,11 +391,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             boolean transactionsEnabled = props.getBoolean(
                     QueryServices.TRANSACTIONS_ENABLED,
                     QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED);
-            // only initialize the tx service client if needed
+            this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
+            // only initialize the tx service client if needed and if we succeeded in getting a connection
+            // to HBase
             if (transactionsEnabled) {
                 initTxServiceClient();
             }
-            this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
         } catch (IOException e) {
             throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
             .setRootCause(e).build().buildException();
@@ -463,14 +466,20 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             } finally {
                 try {
                     childServices.clear();
-                    if (renewLeaseExecutor != null) {
-                        renewLeaseExecutor.shutdownNow();
-                    }
                     synchronized (latestMetaDataLock) {
                         latestMetaData = null;
                         latestMetaDataLock.notifyAll();
                     }
-                    if (connection != null) connection.close();
+                    try {
+                        // close the HBase connection
+                        if (connection != null) connection.close();
+                    } finally {
+                        if (renewLeaseExecutor != null) {
+                            renewLeaseExecutor.shutdownNow();
+                        }
+                        // shut down the tx client service if we created one to support transactions
+                        if (this.txZKClientService != null) this.txZKClientService.stopAndWait();
+                    }
                 } catch (IOException e) {
                     if (sqlE == null) {
                         sqlE = ServerUtil.parseServerException(e);


[2/6] phoenix git commit: PHOENIX-3563 Ensure we release ZooKeeper resources allocated by the Tephra client embedded in the Phoenix connection

Posted by ap...@apache.org.
PHOENIX-3563 Ensure we release ZooKeeper resources allocated by the Tephra client embedded in the Phoenix connection


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/cdc77dfc
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/cdc77dfc
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/cdc77dfc

Branch: refs/heads/4.x-HBase-1.1
Commit: cdc77dfcb1a63f5ccb64cf95cdcee914c8803933
Parents: e24ec6a
Author: Andrew Purtell <ap...@apache.org>
Authored: Wed Jan 4 16:48:44 2017 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Sat Jan 7 10:04:22 2017 -0800

----------------------------------------------------------------------
 .../query/ConnectionQueryServicesImpl.java      | 37 ++++++++++++--------
 1 file changed, 23 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/cdc77dfc/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 16350e5..21acd5b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -254,6 +254,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     private final boolean returnSequenceValues ;
 
     private HConnection connection;
+    private ZKClientService txZKClientService;
     private TransactionServiceClient txServiceClient;
     private volatile boolean initialized;
     private volatile int nSequenceSaltBuckets;
@@ -370,15 +371,16 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
 
         int timeOut = props.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
         // Create instance of the tephra zookeeper client
-        ZKClientService tephraZKClientService = new TephraZKClientService(zkQuorumServersString, timeOut, null, ArrayListMultimap.<String, byte[]>create());
-
-        ZKClientService zkClientService = ZKClientServices.delegate(
-                ZKClients.reWatchOnExpire(
-                        ZKClients.retryOnFailure(tephraZKClientService, RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS))
-                        )
+        txZKClientService = ZKClientServices.delegate(
+            ZKClients.reWatchOnExpire(
+                ZKClients.retryOnFailure(
+                     new TephraZKClientService(zkQuorumServersString, timeOut, null,
+                             ArrayListMultimap.<String, byte[]>create()), 
+                         RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS))
+                     )
                 );
-        zkClientService.startAndWait();
-        ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(zkClientService);
+        txZKClientService.startAndWait();
+        ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(txZKClientService);
         PooledClientProvider pooledClientProvider = new PooledClientProvider(
                 config, zkDiscoveryService);
         this.txServiceClient = new TransactionServiceClient(config,pooledClientProvider);
@@ -389,11 +391,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             boolean transactionsEnabled = props.getBoolean(
                     QueryServices.TRANSACTIONS_ENABLED,
                     QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED);
-            // only initialize the tx service client if needed
+            this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
+            // only initialize the tx service client if needed and if we succeeded in getting a connection
+            // to HBase
             if (transactionsEnabled) {
                 initTxServiceClient();
             }
-            this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
         } catch (IOException e) {
             throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
             .setRootCause(e).build().buildException();
@@ -463,14 +466,20 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             } finally {
                 try {
                     childServices.clear();
-                    if (renewLeaseExecutor != null) {
-                        renewLeaseExecutor.shutdownNow();
-                    }
                     synchronized (latestMetaDataLock) {
                         latestMetaData = null;
                         latestMetaDataLock.notifyAll();
                     }
-                    if (connection != null) connection.close();
+                    try {
+                        // close the HBase connection
+                        if (connection != null) connection.close();
+                    } finally {
+                        if (renewLeaseExecutor != null) {
+                            renewLeaseExecutor.shutdownNow();
+                        }
+                        // shut down the tx client service if we created one to support transactions
+                        if (this.txZKClientService != null) this.txZKClientService.stopAndWait();
+                    }
                 } catch (IOException e) {
                     if (sqlE == null) {
                         sqlE = ServerUtil.parseServerException(e);


[6/6] phoenix git commit: PHOENIX-3563 Ensure we release ZooKeeper resources allocated by the Tephra client embedded in the Phoenix connection

Posted by ap...@apache.org.
PHOENIX-3563 Ensure we release ZooKeeper resources allocated by the Tephra client embedded in the Phoenix connection


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3592782a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3592782a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3592782a

Branch: refs/heads/4.9-HBase-0.98
Commit: 3592782ae2c337b696a879bc2f6610d280729dce
Parents: 4340dad
Author: Andrew Purtell <ap...@apache.org>
Authored: Wed Jan 4 16:48:44 2017 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Sat Jan 7 18:54:14 2017 -0800

----------------------------------------------------------------------
 .../query/ConnectionQueryServicesImpl.java      | 37 ++++++++++++--------
 1 file changed, 23 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/3592782a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index fc0925d..0a1333f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -255,6 +255,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     private final boolean returnSequenceValues ;
 
     private HConnection connection;
+    private ZKClientService txZKClientService;
     private TransactionServiceClient txServiceClient;
     private volatile boolean initialized;
     private volatile int nSequenceSaltBuckets;
@@ -371,15 +372,16 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
 
         int timeOut = props.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
         // Create instance of the tephra zookeeper client
-        ZKClientService tephraZKClientService = new TephraZKClientService(zkQuorumServersString, timeOut, null, ArrayListMultimap.<String, byte[]>create());
-
-        ZKClientService zkClientService = ZKClientServices.delegate(
-                ZKClients.reWatchOnExpire(
-                        ZKClients.retryOnFailure(tephraZKClientService, RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS))
-                        )
+        txZKClientService = ZKClientServices.delegate(
+            ZKClients.reWatchOnExpire(
+                ZKClients.retryOnFailure(
+                     new TephraZKClientService(zkQuorumServersString, timeOut, null,
+                             ArrayListMultimap.<String, byte[]>create()), 
+                         RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS))
+                     )
                 );
-        zkClientService.startAndWait();
-        ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(zkClientService);
+        txZKClientService.startAndWait();
+        ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(txZKClientService);
         PooledClientProvider pooledClientProvider = new PooledClientProvider(
                 config, zkDiscoveryService);
         this.txServiceClient = new TransactionServiceClient(config,pooledClientProvider);
@@ -390,11 +392,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             boolean transactionsEnabled = props.getBoolean(
                     QueryServices.TRANSACTIONS_ENABLED,
                     QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED);
-            // only initialize the tx service client if needed
+            this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
+            // only initialize the tx service client if needed and if we succeeded in getting a connection
+            // to HBase
             if (transactionsEnabled) {
                 initTxServiceClient();
             }
-            this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
         } catch (IOException e) {
             throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
             .setRootCause(e).build().buildException();
@@ -464,14 +467,20 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             } finally {
                 try {
                     childServices.clear();
-                    if (renewLeaseExecutor != null) {
-                        renewLeaseExecutor.shutdownNow();
-                    }
                     synchronized (latestMetaDataLock) {
                         latestMetaData = null;
                         latestMetaDataLock.notifyAll();
                     }
-                    if (connection != null) connection.close();
+                    try {
+                        // close the HBase connection
+                        if (connection != null) connection.close();
+                    } finally {
+                        if (renewLeaseExecutor != null) {
+                            renewLeaseExecutor.shutdownNow();
+                        }
+                        // shut down the tx client service if we created one to support transactions
+                        if (this.txZKClientService != null) this.txZKClientService.stopAndWait();
+                    }
                 } catch (IOException e) {
                     if (sqlE == null) {
                         sqlE = ServerUtil.parseServerException(e);


[4/6] phoenix git commit: PHOENIX-3563 Ensure we release ZooKeeper resources allocated by the Tephra client embedded in the Phoenix connection

Posted by ap...@apache.org.
PHOENIX-3563 Ensure we release ZooKeeper resources allocated by the Tephra client embedded in the Phoenix connection


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f34a605a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f34a605a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f34a605a

Branch: refs/heads/4.9-HBase-1.2
Commit: f34a605a3934cb41bec8e06244992a82fd895b71
Parents: fb504f2
Author: Andrew Purtell <ap...@apache.org>
Authored: Wed Jan 4 16:48:44 2017 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Sat Jan 7 18:53:33 2017 -0800

----------------------------------------------------------------------
 .../query/ConnectionQueryServicesImpl.java      | 37 ++++++++++++--------
 1 file changed, 23 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/f34a605a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index a2b32a8..f82e77f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -254,6 +254,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     private final boolean returnSequenceValues ;
 
     private HConnection connection;
+    private ZKClientService txZKClientService;
     private TransactionServiceClient txServiceClient;
     private volatile boolean initialized;
     private volatile int nSequenceSaltBuckets;
@@ -370,15 +371,16 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
 
         int timeOut = props.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
         // Create instance of the tephra zookeeper client
-        ZKClientService tephraZKClientService = new TephraZKClientService(zkQuorumServersString, timeOut, null, ArrayListMultimap.<String, byte[]>create());
-
-        ZKClientService zkClientService = ZKClientServices.delegate(
-                ZKClients.reWatchOnExpire(
-                        ZKClients.retryOnFailure(tephraZKClientService, RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS))
-                        )
+        txZKClientService = ZKClientServices.delegate(
+            ZKClients.reWatchOnExpire(
+                ZKClients.retryOnFailure(
+                     new TephraZKClientService(zkQuorumServersString, timeOut, null,
+                             ArrayListMultimap.<String, byte[]>create()), 
+                         RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS))
+                     )
                 );
-        zkClientService.startAndWait();
-        ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(zkClientService);
+        txZKClientService.startAndWait();
+        ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(txZKClientService);
         PooledClientProvider pooledClientProvider = new PooledClientProvider(
                 config, zkDiscoveryService);
         this.txServiceClient = new TransactionServiceClient(config,pooledClientProvider);
@@ -389,11 +391,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             boolean transactionsEnabled = props.getBoolean(
                     QueryServices.TRANSACTIONS_ENABLED,
                     QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED);
-            // only initialize the tx service client if needed
+            this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
+            // only initialize the tx service client if needed and if we succeeded in getting a connection
+            // to HBase
             if (transactionsEnabled) {
                 initTxServiceClient();
             }
-            this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
         } catch (IOException e) {
             throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
             .setRootCause(e).build().buildException();
@@ -463,14 +466,20 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             } finally {
                 try {
                     childServices.clear();
-                    if (renewLeaseExecutor != null) {
-                        renewLeaseExecutor.shutdownNow();
-                    }
                     synchronized (latestMetaDataLock) {
                         latestMetaData = null;
                         latestMetaDataLock.notifyAll();
                     }
-                    if (connection != null) connection.close();
+                    try {
+                        // close the HBase connection
+                        if (connection != null) connection.close();
+                    } finally {
+                        if (renewLeaseExecutor != null) {
+                            renewLeaseExecutor.shutdownNow();
+                        }
+                        // shut down the tx client service if we created one to support transactions
+                        if (this.txZKClientService != null) this.txZKClientService.stopAndWait();
+                    }
                 } catch (IOException e) {
                     if (sqlE == null) {
                         sqlE = ServerUtil.parseServerException(e);


[3/6] phoenix git commit: PHOENIX-3563 Ensure we release ZooKeeper resources allocated by the Tephra client embedded in the Phoenix connection

Posted by ap...@apache.org.
PHOENIX-3563 Ensure we release ZooKeeper resources allocated by the Tephra client embedded in the Phoenix connection


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b69b177b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b69b177b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b69b177b

Branch: refs/heads/4.x-HBase-0.98
Commit: b69b177b3f5e39d1fa1c3300acfed9290cbe5c52
Parents: 91d1478
Author: Andrew Purtell <ap...@apache.org>
Authored: Wed Jan 4 16:48:44 2017 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Sat Jan 7 18:52:59 2017 -0800

----------------------------------------------------------------------
 .../query/ConnectionQueryServicesImpl.java      | 37 ++++++++++++--------
 1 file changed, 23 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/b69b177b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index f1de0bd..c1688c4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -255,6 +255,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     private final boolean returnSequenceValues ;
 
     private HConnection connection;
+    private ZKClientService txZKClientService;
     private TransactionServiceClient txServiceClient;
     private volatile boolean initialized;
     private volatile int nSequenceSaltBuckets;
@@ -371,15 +372,16 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
 
         int timeOut = props.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
         // Create instance of the tephra zookeeper client
-        ZKClientService tephraZKClientService = new TephraZKClientService(zkQuorumServersString, timeOut, null, ArrayListMultimap.<String, byte[]>create());
-
-        ZKClientService zkClientService = ZKClientServices.delegate(
-                ZKClients.reWatchOnExpire(
-                        ZKClients.retryOnFailure(tephraZKClientService, RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS))
-                        )
+        txZKClientService = ZKClientServices.delegate(
+            ZKClients.reWatchOnExpire(
+                ZKClients.retryOnFailure(
+                     new TephraZKClientService(zkQuorumServersString, timeOut, null,
+                             ArrayListMultimap.<String, byte[]>create()), 
+                         RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS))
+                     )
                 );
-        zkClientService.startAndWait();
-        ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(zkClientService);
+        txZKClientService.startAndWait();
+        ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(txZKClientService);
         PooledClientProvider pooledClientProvider = new PooledClientProvider(
                 config, zkDiscoveryService);
         this.txServiceClient = new TransactionServiceClient(config,pooledClientProvider);
@@ -390,11 +392,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             boolean transactionsEnabled = props.getBoolean(
                     QueryServices.TRANSACTIONS_ENABLED,
                     QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED);
-            // only initialize the tx service client if needed
+            this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
+            // only initialize the tx service client if needed and if we succeeded in getting a connection
+            // to HBase
             if (transactionsEnabled) {
                 initTxServiceClient();
             }
-            this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
         } catch (IOException e) {
             throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
             .setRootCause(e).build().buildException();
@@ -464,14 +467,20 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             } finally {
                 try {
                     childServices.clear();
-                    if (renewLeaseExecutor != null) {
-                        renewLeaseExecutor.shutdownNow();
-                    }
                     synchronized (latestMetaDataLock) {
                         latestMetaData = null;
                         latestMetaDataLock.notifyAll();
                     }
-                    if (connection != null) connection.close();
+                    try {
+                        // close the HBase connection
+                        if (connection != null) connection.close();
+                    } finally {
+                        if (renewLeaseExecutor != null) {
+                            renewLeaseExecutor.shutdownNow();
+                        }
+                        // shut down the tx client service if we created one to support transactions
+                        if (this.txZKClientService != null) this.txZKClientService.stopAndWait();
+                    }
                 } catch (IOException e) {
                     if (sqlE == null) {
                         sqlE = ServerUtil.parseServerException(e);