You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ap...@apache.org on 2017/01/08 08:05:07 UTC
[3/6] phoenix git commit: PHOENIX-3563 Ensure we release ZooKeeper
resources allocated by the Tephra client embedded in the Phoenix connection
PHOENIX-3563 Ensure we release ZooKeeper resources allocated by the Tephra client embedded in the Phoenix connection
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b69b177b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b69b177b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b69b177b
Branch: refs/heads/4.x-HBase-0.98
Commit: b69b177b3f5e39d1fa1c3300acfed9290cbe5c52
Parents: 91d1478
Author: Andrew Purtell <ap...@apache.org>
Authored: Wed Jan 4 16:48:44 2017 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Sat Jan 7 18:52:59 2017 -0800
----------------------------------------------------------------------
.../query/ConnectionQueryServicesImpl.java | 37 ++++++++++++--------
1 file changed, 23 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b69b177b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index f1de0bd..c1688c4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -255,6 +255,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
private final boolean returnSequenceValues ;
private HConnection connection;
+ private ZKClientService txZKClientService;
private TransactionServiceClient txServiceClient;
private volatile boolean initialized;
private volatile int nSequenceSaltBuckets;
@@ -371,15 +372,16 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
int timeOut = props.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
// Create instance of the tephra zookeeper client
- ZKClientService tephraZKClientService = new TephraZKClientService(zkQuorumServersString, timeOut, null, ArrayListMultimap.<String, byte[]>create());
-
- ZKClientService zkClientService = ZKClientServices.delegate(
- ZKClients.reWatchOnExpire(
- ZKClients.retryOnFailure(tephraZKClientService, RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS))
- )
+ txZKClientService = ZKClientServices.delegate(
+ ZKClients.reWatchOnExpire(
+ ZKClients.retryOnFailure(
+ new TephraZKClientService(zkQuorumServersString, timeOut, null,
+ ArrayListMultimap.<String, byte[]>create()),
+ RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS))
+ )
);
- zkClientService.startAndWait();
- ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(zkClientService);
+ txZKClientService.startAndWait();
+ ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(txZKClientService);
PooledClientProvider pooledClientProvider = new PooledClientProvider(
config, zkDiscoveryService);
this.txServiceClient = new TransactionServiceClient(config,pooledClientProvider);
@@ -390,11 +392,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
boolean transactionsEnabled = props.getBoolean(
QueryServices.TRANSACTIONS_ENABLED,
QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED);
- // only initialize the tx service client if needed
+ this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
+ // only initialize the tx service client if needed and if we succeeded in getting a connection
+ // to HBase
if (transactionsEnabled) {
initTxServiceClient();
}
- this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
} catch (IOException e) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
.setRootCause(e).build().buildException();
@@ -464,14 +467,20 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
} finally {
try {
childServices.clear();
- if (renewLeaseExecutor != null) {
- renewLeaseExecutor.shutdownNow();
- }
synchronized (latestMetaDataLock) {
latestMetaData = null;
latestMetaDataLock.notifyAll();
}
- if (connection != null) connection.close();
+ try {
+ // close the HBase connection
+ if (connection != null) connection.close();
+ } finally {
+ if (renewLeaseExecutor != null) {
+ renewLeaseExecutor.shutdownNow();
+ }
+ // shut down the tx client service if we created one to support transactions
+ if (this.txZKClientService != null) this.txZKClientService.stopAndWait();
+ }
} catch (IOException e) {
if (sqlE == null) {
sqlE = ServerUtil.parseServerException(e);