You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2014/01/07 03:05:21 UTC
[1/5] git commit: ACCUMULO-2128 added waitForZooKeeperClientThreads
method
Updated Branches:
refs/heads/1.6.0-SNAPSHOT ee3ccb82d -> 9c092cadd
ACCUMULO-2128 added waitForZooKeeperClientThreads method
Signed-off-by: Keith Turner <kt...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c94a73f4
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c94a73f4
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c94a73f4
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: c94a73f478c91a24e35583d41bb39102461c54fa
Parents: 715825b
Author: Jared Winick <ja...@koverse.com>
Authored: Thu Jan 2 22:30:59 2014 -0700
Committer: Keith Turner <kt...@apache.org>
Committed: Mon Jan 6 20:17:58 2014 -0500
----------------------------------------------------------------------
.../org/apache/accumulo/core/util/CleanUp.java | 32 ++++++++++++++++++--
1 file changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c94a73f4/src/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java b/src/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java
index ba02f0b..5b2a4b9 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java
@@ -16,20 +16,48 @@
*/
package org.apache.accumulo.core.util;
+import java.util.Set;
+
import org.apache.accumulo.core.client.impl.ThriftTransportPool;
import org.apache.accumulo.core.zookeeper.ZooSession;
+import org.apache.log4j.Logger;
/**
*
*/
public class CleanUp {
+
+ private static final Logger log = Logger.getLogger(CleanUp.class);
+
/**
* Kills all threads created by internal Accumulo singleton resources. After this method is called, no Accumulo client will work in the current classloader.
*/
public static void shutdownNow() {
ThriftTransportPool.getInstance().shutdown();
ZooSession.shutdown();
- // need to get code from jared w
- // waitForZooKeeperClientThreads();
+ waitForZooKeeperClientThreads();
+ }
+
+ /**
+ * As documented in https://issues.apache.org/jira/browse/ZOOKEEPER-1816, ZooKeeper.close()
+ * is a non-blocking call. This method will wait on the ZooKeeper internal threads to exit.
+ */
+ private static void waitForZooKeeperClientThreads() {
+ Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
+ for (Thread thread : threadSet) {
+ // find ZooKeeper threads that were created in the same ClassLoader as the current thread.
+ if (thread.getClass().getName().startsWith("org.apache.zookeeper.ClientCnxn") &&
+ thread.getContextClassLoader().equals(Thread.currentThread().getContextClassLoader())) {
+
+ // wait for the thread to die
+ while (thread.isAlive()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ log.error(e.getMessage(), e);
+ }
+ }
+ }
+ }
}
}
[5/5] git commit: Merge remote-tracking branch
'origin/1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT
Posted by kt...@apache.org.
Merge remote-tracking branch 'origin/1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT
Conflicts:
core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9c092cad
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9c092cad
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9c092cad
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 9c092cadde75250fa84e049eeae33687636dbc99
Parents: ee3ccb8 400b991
Author: Keith Turner <kt...@apache.org>
Authored: Mon Jan 6 21:11:02 2014 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Mon Jan 6 21:11:02 2014 -0500
----------------------------------------------------------------------
.../core/client/impl/ThriftTransportPool.java | 100 +++++++++++---
.../org/apache/accumulo/core/util/CleanUp.java | 63 +++++++++
.../accumulo/fate/zookeeper/ZooSession.java | 23 +++-
.../accumulo/test/functional/CleanUpIT.java | 137 +++++++++++++++++++
4 files changed, 304 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c092cad/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index a553cc1,f123289..a5fecd5
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@@ -357,35 -376,20 +375,35 @@@ public class ThriftTransportPool
private ThriftTransportPool() {}
- public TTransport getTransport(String location, int port) throws TTransportException {
- return getTransport(location, port, 0);
- }
-
- public TTransport getTransportWithDefaultTimeout(InetSocketAddress addr, AccumuloConfiguration conf) throws TTransportException {
- return getTransport(addr.getAddress().getHostAddress(), addr.getPort(), conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
+ public TTransport getTransportWithDefaultTimeout(HostAndPort addr, AccumuloConfiguration conf) throws TTransportException {
+ return getTransport(String.format("%s:%d", addr.getHostText(), addr.getPort()), conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT), SslConnectionParams.forClient(conf));
}
- public TTransport getTransport(InetSocketAddress addr, long timeout) throws TTransportException {
- return getTransport(addr.getAddress().getHostAddress(), addr.getPort(), timeout);
+ public TTransport getTransport(String location, long milliseconds, SslConnectionParams sslParams) throws TTransportException {
+ return getTransport(new ThriftTransportKey(location, milliseconds, sslParams));
}
- public TTransport getTransportWithDefaultTimeout(String location, int port, AccumuloConfiguration conf) throws TTransportException {
- return getTransport(location, port, conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
+ private TTransport getTransport(ThriftTransportKey cacheKey) throws TTransportException {
+ synchronized (this) {
+ // atomically reserve location if it exists in cache
- List<CachedConnection> ccl = cache.get(cacheKey);
++ List<CachedConnection> ccl = getCache().get(cacheKey);
+
+ if (ccl == null) {
+ ccl = new LinkedList<CachedConnection>();
- cache.put(cacheKey, ccl);
++ getCache().put(cacheKey, ccl);
+ }
+
+ for (CachedConnection cachedConnection : ccl) {
+ if (!cachedConnection.isReserved()) {
+ cachedConnection.setReserved(true);
+ if (log.isTraceEnabled())
+ log.trace("Using existing connection to " + cacheKey.getLocation() + ":" + cacheKey.getPort());
+ return cachedConnection.transport;
+ }
+ }
+ }
+
+ return createNewTransport(cacheKey);
}
Pair<String,TTransport> getAnyTransport(List<ThriftTransportKey> servers, boolean preferCachedConnection) throws TTransportException {
@@@ -484,9 -531,9 +507,9 @@@
CachedTTransport ctsc = (CachedTTransport) tsc;
ArrayList<CachedConnection> closeList = new ArrayList<ThriftTransportPool.CachedConnection>();
-
+
synchronized (this) {
- List<CachedConnection> ccl = cache.get(ctsc.getCacheKey());
+ List<CachedConnection> ccl = getCache().get(ctsc.getCacheKey());
for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();) {
CachedConnection cachedConnection = iterator.next();
if (cachedConnection.transport == tsc) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c092cad/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c092cad/test/src/test/java/org/apache/accumulo/test/functional/CleanUpIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/CleanUpIT.java
index 0000000,0000000..ad5b2fd
new file mode 100644
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/functional/CleanUpIT.java
@@@ -1,0 -1,0 +1,137 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.accumulo.test.functional;
++
++import java.util.Iterator;
++import java.util.Map.Entry;
++import java.util.Set;
++
++import org.apache.accumulo.core.client.BatchWriter;
++import org.apache.accumulo.core.client.BatchWriterConfig;
++import org.apache.accumulo.core.client.Scanner;
++import org.apache.accumulo.core.data.Key;
++import org.apache.accumulo.core.data.Mutation;
++import org.apache.accumulo.core.data.Value;
++import org.apache.accumulo.core.security.Authorizations;
++import org.apache.accumulo.core.util.CleanUp;
++import org.junit.Test;
++
++/**
++ *
++ */
++public class CleanUpIT extends SimpleMacIT {
++ @Test(timeout = 30 * 1000)
++ public void run() throws Exception {
++
++ String tableName = getTableNames(1)[0];
++ getConnector().tableOperations().create(tableName);
++
++ BatchWriter bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig());
++
++ Mutation m1 = new Mutation("r1");
++ m1.put("cf1", "cq1", 1, "5");
++
++ bw.addMutation(m1);
++
++ bw.flush();
++
++ Scanner scanner = getConnector().createScanner(tableName, new Authorizations());
++
++ int count = 0;
++ for (Entry<Key,Value> entry : scanner) {
++ count++;
++ if (!entry.getValue().toString().equals("5")) {
++ throw new Exception("Unexpected value " + entry.getValue());
++ }
++ }
++
++ if (count != 1) {
++ throw new Exception("Unexpected count " + count);
++ }
++
++ if (countThreads() < 2) {
++ printThreadNames();
++ throw new Exception("Not seeing expected threads");
++ }
++
++ CleanUp.shutdownNow();
++
++ Mutation m2 = new Mutation("r2");
++ m2.put("cf1", "cq1", 1, "6");
++
++ try {
++ bw.addMutation(m1);
++ bw.flush();
++ throw new Exception("batch writer did not fail");
++ } catch (Exception e) {
++
++ }
++
++ try {
++ // expect this to fail also, want to clean up batch writer threads
++ bw.close();
++ throw new Exception("batch writer close not fail");
++ } catch (Exception e) {
++
++ }
++
++ try {
++ count = 0;
++ Iterator<Entry<Key,Value>> iter = scanner.iterator();
++ while (iter.hasNext()) {
++ iter.next();
++ count++;
++ }
++ throw new Exception("scanner did not fail");
++ } catch (Exception e) {
++
++ }
++
++ if (countThreads() > 0) {
++ printThreadNames();
++ throw new Exception("Threads did not go away");
++ }
++ }
++
++ private void printThreadNames() {
++ Set<Thread> threads = Thread.getAllStackTraces().keySet();
++ for (Thread thread : threads) {
++ System.out.println("thread name:" + thread.getName());
++ thread.getStackTrace();
++
++ }
++ }
++
++ /**
++ * count threads that should be cleaned up
++ *
++ */
++ private int countThreads() {
++ int count = 0;
++ Set<Thread> threads = Thread.getAllStackTraces().keySet();
++ for (Thread thread : threads) {
++
++ if (thread.getName().toLowerCase().contains("sendthread") || thread.getName().toLowerCase().contains("eventthread"))
++ count++;
++
++ if (thread.getName().toLowerCase().contains("thrift") && thread.getName().toLowerCase().contains("pool"))
++ count++;
++ }
++
++ return count;
++ }
++}
[4/5] git commit: Merge branch '1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT
Posted by kt...@apache.org.
Merge branch '1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT
Conflicts:
core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/400b991f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/400b991f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/400b991f
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 400b991fb80114f6672e18496f1b2359b2e22c3d
Parents: a91ee4d 8f9fe41
Author: Keith Turner <kt...@apache.org>
Authored: Mon Jan 6 20:48:26 2014 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Mon Jan 6 20:48:26 2014 -0500
----------------------------------------------------------------------
.../core/client/impl/ThriftTransportPool.java | 100 +++++++++---
.../org/apache/accumulo/core/util/CleanUp.java | 63 ++++++++
.../accumulo/fate/zookeeper/ZooSession.java | 23 ++-
.../accumulo/test/functional/CleanUpTest.java | 153 +++++++++++++++++++
test/system/auto/simple/cleanup.py | 30 ++++
5 files changed, 350 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/400b991f/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index ceeab21,0000000..f123289
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@@ -1,607 -1,0 +1,671 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.SecurityPermission;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
++import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.TTimeoutTransport;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.log4j.Logger;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+public class ThriftTransportPool {
+ private static SecurityPermission TRANSPORT_POOL_PERMISSION = new SecurityPermission("transportPoolPermission");
+
+ private static final Random random = new Random();
+ private long killTime = 1000 * 3;
+
+ private Map<ThriftTransportKey,List<CachedConnection>> cache = new HashMap<ThriftTransportKey,List<CachedConnection>>();
+ private Map<ThriftTransportKey,Long> errorCount = new HashMap<ThriftTransportKey,Long>();
+ private Map<ThriftTransportKey,Long> errorTime = new HashMap<ThriftTransportKey,Long>();
+ private Set<ThriftTransportKey> serversWarnedAbout = new HashSet<ThriftTransportKey>();
++
++ private CountDownLatch closerExitLatch;
+
+ private static final Logger log = Logger.getLogger(ThriftTransportPool.class);
+
+ private static final Long ERROR_THRESHOLD = 20l;
+ private static final int STUCK_THRESHOLD = 2 * 60 * 1000;
+
+ private static class CachedConnection {
+
+ public CachedConnection(CachedTTransport t) {
+ this.transport = t;
+ }
+
+ void setReserved(boolean reserved) {
+ this.transport.setReserved(reserved);
+ }
+
+ boolean isReserved() {
+ return this.transport.reserved;
+ }
+
+ CachedTTransport transport;
+
+ long lastReturnTime;
+ }
+
++ public static class TransportPoolShutdownException extends RuntimeException {
++ private static final long serialVersionUID = 1L;
++ }
++
+ private static class Closer implements Runnable {
+ final ThriftTransportPool pool;
++ private CountDownLatch closerExitLatch;
+
- public Closer(ThriftTransportPool pool) {
++ public Closer(ThriftTransportPool pool, CountDownLatch closerExitLatch) {
+ this.pool = pool;
++ this.closerExitLatch = closerExitLatch;
+ }
+
- public void run() {
++ private void closeConnections() {
+ while (true) {
+
+ ArrayList<CachedConnection> connectionsToClose = new ArrayList<CachedConnection>();
+
+ synchronized (pool) {
- for (List<CachedConnection> ccl : pool.cache.values()) {
++ for (List<CachedConnection> ccl : pool.getCache().values()) {
+ Iterator<CachedConnection> iter = ccl.iterator();
+ while (iter.hasNext()) {
+ CachedConnection cachedConnection = iter.next();
+
+ if (!cachedConnection.isReserved() && System.currentTimeMillis() - cachedConnection.lastReturnTime > pool.killTime) {
+ connectionsToClose.add(cachedConnection);
+ iter.remove();
+ }
+ }
+ }
+
- for (List<CachedConnection> ccl : pool.cache.values()) {
++ for (List<CachedConnection> ccl : pool.getCache().values()) {
+ for (CachedConnection cachedConnection : ccl) {
+ cachedConnection.transport.checkForStuckIO(STUCK_THRESHOLD);
+ }
+ }
+
+ Iterator<Entry<ThriftTransportKey,Long>> iter = pool.errorTime.entrySet().iterator();
+ while (iter.hasNext()) {
+ Entry<ThriftTransportKey,Long> entry = iter.next();
+ long delta = System.currentTimeMillis() - entry.getValue();
+ if (delta >= STUCK_THRESHOLD) {
+ pool.errorCount.remove(entry.getKey());
+ iter.remove();
+ }
+ }
+ }
+
+ // close connections outside of sync block
+ for (CachedConnection cachedConnection : connectionsToClose) {
+ cachedConnection.transport.close();
+ }
+
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
++
++ public void run() {
++ try {
++ closeConnections();
++ } catch (TransportPoolShutdownException e) {
++ } finally {
++ closerExitLatch.countDown();
++ }
++ }
+ }
+
+ static class CachedTTransport extends TTransport {
+
+ private ThriftTransportKey cacheKey;
+ private TTransport wrappedTransport;
+ private boolean sawError = false;
+
+ private volatile String ioThreadName = null;
+ private volatile long ioStartTime = 0;
+ private volatile boolean reserved = false;
+
+ private String stuckThreadName = null;
+
+ int ioCount = 0;
+ int lastIoCount = -1;
+
+ private void sawError(Exception e) {
+ sawError = true;
+ }
+
+ final void setReserved(boolean reserved) {
+ this.reserved = reserved;
+ if (reserved) {
+ ioThreadName = Thread.currentThread().getName();
+ ioCount = 0;
+ lastIoCount = -1;
+ } else {
+ if ((ioCount & 1) == 1) {
+ // connection unreserved, but it seems io may still be
+ // happening
+ log.warn("Connection returned to thrift connection pool that may still be in use " + ioThreadName + " " + Thread.currentThread().getName(),
+ new Exception());
+ }
+
+ ioCount = 0;
+ lastIoCount = -1;
+ ioThreadName = null;
+ }
+ checkForStuckIO(STUCK_THRESHOLD);
+ }
+
+ final void checkForStuckIO(long threshold) {
+ /*
+ * checking for stuck io needs to be light weight.
+ *
+ * Tried to call System.currentTimeMillis() and Thread.currentThread() before every io operation.... this dramatically slowed things down. So switched to
+ * incrementing a counter before and after each io operation.
+ */
+
+ if ((ioCount & 1) == 1) {
+ // when ioCount is odd, it means I/O is currently happening
+ if (ioCount == lastIoCount) {
+ // still doing same I/O operation as last time this
+ // function was called
+ long delta = System.currentTimeMillis() - ioStartTime;
+ if (delta >= threshold && stuckThreadName == null) {
+ stuckThreadName = ioThreadName;
+ log.warn("Thread \"" + ioThreadName + "\" stuck on IO to " + cacheKey + " for at least " + delta + " ms");
+ }
+ } else {
+ // remember this ioCount and the time we saw it, need to see
+ // if it changes
+ lastIoCount = ioCount;
+ ioStartTime = System.currentTimeMillis();
+
+ if (stuckThreadName != null) {
+ // doing I/O, but ioCount changed so no longer stuck
+ log.info("Thread \"" + stuckThreadName + "\" no longer stuck on IO to " + cacheKey + " sawError = " + sawError);
+ stuckThreadName = null;
+ }
+ }
+ } else {
+ // I/O is not currently happening
+ if (stuckThreadName != null) {
+ // no longer stuck, and was stuck in the past
+ log.info("Thread \"" + stuckThreadName + "\" no longer stuck on IO to " + cacheKey + " sawError = " + sawError);
+ stuckThreadName = null;
+ }
+ }
+ }
+
+ public CachedTTransport(TTransport transport, ThriftTransportKey cacheKey2) {
+ this.wrappedTransport = transport;
+ this.cacheKey = cacheKey2;
+ }
+
+ public boolean isOpen() {
+ return wrappedTransport.isOpen();
+ }
+
+ public void open() throws TTransportException {
+ try {
+ ioCount++;
+ wrappedTransport.open();
+ } catch (TTransportException tte) {
+ sawError(tte);
+ throw tte;
+ } finally {
+ ioCount++;
+ }
+ }
+
+ public int read(byte[] arg0, int arg1, int arg2) throws TTransportException {
+ try {
+ ioCount++;
+ return wrappedTransport.read(arg0, arg1, arg2);
+ } catch (TTransportException tte) {
+ sawError(tte);
+ throw tte;
+ } finally {
+ ioCount++;
+ }
+ }
+
+ public int readAll(byte[] arg0, int arg1, int arg2) throws TTransportException {
+ try {
+ ioCount++;
+ return wrappedTransport.readAll(arg0, arg1, arg2);
+ } catch (TTransportException tte) {
+ sawError(tte);
+ throw tte;
+ } finally {
+ ioCount++;
+ }
+ }
+
+ public void write(byte[] arg0, int arg1, int arg2) throws TTransportException {
+ try {
+ ioCount++;
+ wrappedTransport.write(arg0, arg1, arg2);
+ } catch (TTransportException tte) {
+ sawError(tte);
+ throw tte;
+ } finally {
+ ioCount++;
+ }
+ }
+
+ public void write(byte[] arg0) throws TTransportException {
+ try {
+ ioCount++;
+ wrappedTransport.write(arg0);
+ } catch (TTransportException tte) {
+ sawError(tte);
+ throw tte;
+ } finally {
+ ioCount++;
+ }
+ }
+
+ public void close() {
+ try {
+ ioCount++;
+ wrappedTransport.close();
+ } finally {
+ ioCount++;
+ }
+
+ }
+
+ public void flush() throws TTransportException {
+ try {
+ ioCount++;
+ wrappedTransport.flush();
+ } catch (TTransportException tte) {
+ sawError(tte);
+ throw tte;
+ } finally {
+ ioCount++;
+ }
+ }
+
+ public boolean peek() {
+ try {
+ ioCount++;
+ return wrappedTransport.peek();
+ } finally {
+ ioCount++;
+ }
+ }
+
+ public byte[] getBuffer() {
+ try {
+ ioCount++;
+ return wrappedTransport.getBuffer();
+ } finally {
+ ioCount++;
+ }
+ }
+
+ public int getBufferPosition() {
+ try {
+ ioCount++;
+ return wrappedTransport.getBufferPosition();
+ } finally {
+ ioCount++;
+ }
+ }
+
+ public int getBytesRemainingInBuffer() {
+ try {
+ ioCount++;
+ return wrappedTransport.getBytesRemainingInBuffer();
+ } finally {
+ ioCount++;
+ }
+ }
+
+ public void consumeBuffer(int len) {
+ try {
+ ioCount++;
+ wrappedTransport.consumeBuffer(len);
+ } finally {
+ ioCount++;
+ }
+ }
+
+ public ThriftTransportKey getCacheKey() {
+ return cacheKey;
+ }
+
+ }
+
+ private ThriftTransportPool() {}
+
+ public TTransport getTransport(String location, int port) throws TTransportException {
+ return getTransport(location, port, 0);
+ }
+
+ public TTransport getTransportWithDefaultTimeout(InetSocketAddress addr, AccumuloConfiguration conf) throws TTransportException {
+ return getTransport(addr.getAddress().getHostAddress(), addr.getPort(), conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
+ }
+
+ public TTransport getTransport(InetSocketAddress addr, long timeout) throws TTransportException {
+ return getTransport(addr.getAddress().getHostAddress(), addr.getPort(), timeout);
+ }
+
+ public TTransport getTransportWithDefaultTimeout(String location, int port, AccumuloConfiguration conf) throws TTransportException {
+ return getTransport(location, port, conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
+ }
+
+ Pair<String,TTransport> getAnyTransport(List<ThriftTransportKey> servers, boolean preferCachedConnection) throws TTransportException {
+
+ servers = new ArrayList<ThriftTransportKey>(servers);
+
+ if (preferCachedConnection) {
+ HashSet<ThriftTransportKey> serversSet = new HashSet<ThriftTransportKey>(servers);
+
+ synchronized (this) {
+
+ // randomly pick a server from the connection cache
- serversSet.retainAll(cache.keySet());
++ serversSet.retainAll(getCache().keySet());
+
+ if (serversSet.size() > 0) {
+ ArrayList<ThriftTransportKey> cachedServers = new ArrayList<ThriftTransportKey>(serversSet);
+ Collections.shuffle(cachedServers, random);
+
+ for (ThriftTransportKey ttk : cachedServers) {
- for (CachedConnection cachedConnection : cache.get(ttk)) {
++ for (CachedConnection cachedConnection : getCache().get(ttk)) {
+ if (!cachedConnection.isReserved()) {
+ cachedConnection.setReserved(true);
+ if (log.isTraceEnabled())
+ log.trace("Using existing connection to " + ttk.getLocation() + ":" + ttk.getPort());
+ return new Pair<String,TTransport>(ttk.getLocation() + ":" + ttk.getPort(), cachedConnection.transport);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ int retryCount = 0;
+ while (servers.size() > 0 && retryCount < 10) {
+ int index = random.nextInt(servers.size());
+ ThriftTransportKey ttk = servers.get(index);
+
+ if (!preferCachedConnection) {
+ synchronized (this) {
- List<CachedConnection> cachedConnList = cache.get(ttk);
++ List<CachedConnection> cachedConnList = getCache().get(ttk);
+ if (cachedConnList != null) {
+ for (CachedConnection cachedConnection : cachedConnList) {
+ if (!cachedConnection.isReserved()) {
+ cachedConnection.setReserved(true);
+ if (log.isTraceEnabled())
+ log.trace("Using existing connection to " + ttk.getLocation() + ":" + ttk.getPort() + " timeout " + ttk.getTimeout());
+ return new Pair<String,TTransport>(ttk.getLocation() + ":" + ttk.getPort(), cachedConnection.transport);
+ }
+ }
+ }
+ }
+ }
+
+ try {
+ return new Pair<String,TTransport>(ttk.getLocation() + ":" + ttk.getPort(), createNewTransport(ttk));
+ } catch (TTransportException tte) {
+ log.debug("Failed to connect to " + servers.get(index), tte);
+ servers.remove(index);
+ retryCount++;
+ }
+ }
+
+ throw new TTransportException("Failed to connect to a server");
+ }
+
+ public TTransport getTransport(String location, int port, long milliseconds) throws TTransportException {
+ return getTransport(new ThriftTransportKey(location, port, milliseconds));
+ }
+
+ private TTransport getTransport(ThriftTransportKey cacheKey) throws TTransportException {
+ synchronized (this) {
+ // atomically reserve location if it exists in cache
- List<CachedConnection> ccl = cache.get(cacheKey);
++ List<CachedConnection> ccl = getCache().get(cacheKey);
+
+ if (ccl == null) {
+ ccl = new LinkedList<CachedConnection>();
- cache.put(cacheKey, ccl);
++ getCache().put(cacheKey, ccl);
+ }
+
+ for (CachedConnection cachedConnection : ccl) {
+ if (!cachedConnection.isReserved()) {
+ cachedConnection.setReserved(true);
+ if (log.isTraceEnabled())
+ log.trace("Using existing connection to " + cacheKey.getLocation() + ":" + cacheKey.getPort());
+ return cachedConnection.transport;
+ }
+ }
+ }
+
+ return createNewTransport(cacheKey);
+ }
+
+ private TTransport createNewTransport(ThriftTransportKey cacheKey) throws TTransportException {
+ TTransport transport;
+ if (cacheKey.getTimeout() == 0) {
+ transport = AddressUtil.createTSocket(cacheKey.getLocation(), cacheKey.getPort());
+ } else {
+ try {
+ transport = TTimeoutTransport.create(AddressUtil.parseAddress(cacheKey.getLocation(), cacheKey.getPort()), cacheKey.getTimeout());
+ } catch (IOException ex) {
+ throw new TTransportException(ex);
+ }
+ }
+ transport = ThriftUtil.transportFactory().getTransport(transport);
+ transport.open();
+
+ if (log.isTraceEnabled())
+ log.trace("Creating new connection to connection to " + cacheKey.getLocation() + ":" + cacheKey.getPort());
+
+ CachedTTransport tsc = new CachedTTransport(transport, cacheKey);
+
+ CachedConnection cc = new CachedConnection(tsc);
+ cc.setReserved(true);
+
- synchronized (this) {
- List<CachedConnection> ccl = cache.get(cacheKey);
++ try {
++ synchronized (this) {
++ List<CachedConnection> ccl = getCache().get(cacheKey);
++
++ if (ccl == null) {
++ ccl = new LinkedList<CachedConnection>();
++ getCache().put(cacheKey, ccl);
++ }
+
- if (ccl == null) {
- ccl = new LinkedList<CachedConnection>();
- cache.put(cacheKey, ccl);
++ ccl.add(cc);
+ }
-
- ccl.add(cc);
++ } catch (TransportPoolShutdownException e) {
++ cc.transport.close();
++ throw e;
+ }
+ return cc.transport;
+ }
+
+ public void returnTransport(TTransport tsc) {
+ if (tsc == null) {
+ return;
+ }
+
+ boolean existInCache = false;
+ CachedTTransport ctsc = (CachedTTransport) tsc;
+
+ ArrayList<CachedConnection> closeList = new ArrayList<ThriftTransportPool.CachedConnection>();
+
+ synchronized (this) {
- List<CachedConnection> ccl = cache.get(ctsc.getCacheKey());
++ List<CachedConnection> ccl = getCache().get(ctsc.getCacheKey());
+ for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();) {
+ CachedConnection cachedConnection = iterator.next();
+ if (cachedConnection.transport == tsc) {
+ if (ctsc.sawError) {
+ closeList.add(cachedConnection);
+ iterator.remove();
+
+ if (log.isTraceEnabled())
+ log.trace("Returned connection had error " + ctsc.getCacheKey());
+
+ Long ecount = errorCount.get(ctsc.getCacheKey());
+ if (ecount == null)
+ ecount = 0l;
+ ecount++;
+ errorCount.put(ctsc.getCacheKey(), ecount);
+
+ Long etime = errorTime.get(ctsc.getCacheKey());
+ if (etime == null) {
+ errorTime.put(ctsc.getCacheKey(), System.currentTimeMillis());
+ }
+
+ if (ecount >= ERROR_THRESHOLD && !serversWarnedAbout.contains(ctsc.getCacheKey())) {
+ log.warn("Server " + ctsc.getCacheKey() + " had " + ecount + " failures in a short time period, will not complain anymore ");
+ serversWarnedAbout.add(ctsc.getCacheKey());
+ }
+
+ cachedConnection.setReserved(false);
+
+ } else {
+
+ if (log.isTraceEnabled())
+ log.trace("Returned connection " + ctsc.getCacheKey() + " ioCount : " + cachedConnection.transport.ioCount);
+
+ cachedConnection.lastReturnTime = System.currentTimeMillis();
+ cachedConnection.setReserved(false);
+ }
+ existInCache = true;
+ break;
+ }
+ }
+
+ // remove all unreserved cached connections when a server has an error, not just the connection that was returned
+ if (ctsc.sawError) {
+ for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();) {
+ CachedConnection cachedConnection = iterator.next();
+ if (!cachedConnection.isReserved()) {
+ closeList.add(cachedConnection);
+ iterator.remove();
+ }
+ }
+ }
+ }
+
+ // close outside of sync block
+ for (CachedConnection cachedConnection : closeList) {
+ try {
+ cachedConnection.transport.close();
+ } catch (Exception e) {
+ log.debug("Failed to close connection w/ errors", e);
+ }
+ }
+
+ if (!existInCache) {
+ log.warn("Returned tablet server connection to cache that did not come from cache");
+ // close outside of sync block
+ tsc.close();
+ }
+ }
+
+ /**
+ * Set the time after which idle connections should be closed
+ *
+ * @param time
+ */
+ public synchronized void setIdleTime(long time) {
+ this.killTime = time;
+ log.debug("Set thrift transport pool idle time to " + time);
+ }
+
+ private static ThriftTransportPool instance = new ThriftTransportPool();
+ private static final AtomicBoolean daemonStarted = new AtomicBoolean(false);
+
+ public static ThriftTransportPool getInstance() {
+ SecurityManager sm = System.getSecurityManager();
+ if (sm != null) {
+ sm.checkPermission(TRANSPORT_POOL_PERMISSION);
+ }
+
+ if (daemonStarted.compareAndSet(false, true)) {
- new Daemon(new Closer(instance), "Thrift Connection Pool Checker").start();
++ CountDownLatch closerExitLatch = new CountDownLatch(1);
++ new Daemon(new Closer(instance, closerExitLatch), "Thrift Connection Pool Checker").start();
++ instance.setCloserExitLatch(closerExitLatch);
+ }
+ return instance;
+ }
++
++ private synchronized void setCloserExitLatch(CountDownLatch closerExitLatch) {
++ this.closerExitLatch = closerExitLatch;
++ }
++
++ public void shutdown() {
++ synchronized (this) {
++ if (cache == null)
++ return;
++
++ // close any connections in the pool... even ones that are in use
++ for (List<CachedConnection> ccl : getCache().values()) {
++ Iterator<CachedConnection> iter = ccl.iterator();
++ while (iter.hasNext()) {
++ CachedConnection cc = iter.next();
++ try {
++ cc.transport.close();
++ } catch (Exception e) {
++ log.debug("Error closing transport during shutdown", e);
++ }
++ }
++ }
++
++ // this will render the pool unusable and cause the background thread to exit
++ this.cache = null;
++ }
++
++ try {
++ closerExitLatch.await();
++ } catch (InterruptedException e) {
++ throw new RuntimeException(e);
++ }
++ }
++
++ private Map<ThriftTransportKey,List<CachedConnection>> getCache() {
++ if (cache == null)
++ throw new TransportPoolShutdownException();
++ return cache;
++ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/400b991f/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/util/CleanUp.java
index 0000000,0000000..be5a41a
new file mode 100644
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java
@@@ -1,0 -1,0 +1,63 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.accumulo.core.util;
++
++import java.util.Set;
++
++import org.apache.accumulo.core.client.impl.ThriftTransportPool;
++import org.apache.accumulo.fate.zookeeper.ZooSession;
++import org.apache.log4j.Logger;
++
++/**
++ *
++ */
++public class CleanUp {
++
++ private static final Logger log = Logger.getLogger(CleanUp.class);
++
++ /**
++ * kills all threads created by internal Accumulo singleton resources. After this method is called, no accumulo client will work in the current classloader.
++ */
++ public static void shutdownNow() {
++ ThriftTransportPool.getInstance().shutdown();
++ ZooSession.shutdown();
++ waitForZooKeeperClientThreads();
++ }
++
++ /**
++ * As documented in https://issues.apache.org/jira/browse/ZOOKEEPER-1816, ZooKeeper.close()
++ * is a non-blocking call. This method will wait on the ZooKeeper internal threads to exit.
++ */
++ private static void waitForZooKeeperClientThreads() {
++ Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
++ for (Thread thread : threadSet) {
++ // find ZooKeeper threads that were created in the same ClassLoader as the current thread.
++ if (thread.getClass().getName().startsWith("org.apache.zookeeper.ClientCnxn") &&
++ thread.getContextClassLoader().equals(Thread.currentThread().getContextClassLoader())) {
++
++ // wait for the thread the die
++ while (thread.isAlive()) {
++ try {
++ Thread.sleep(100);
++ } catch (InterruptedException e) {
++ log.error(e.getMessage(), e);
++ }
++ }
++ }
++ }
++ }
++}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/400b991f/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
----------------------------------------------------------------------
diff --cc fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
index 7258ff0,0000000..040b01d
mode 100644,000000..100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
@@@ -1,139 -1,0 +1,160 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+
- class ZooSession {
++public class ZooSession {
+
++ public static class ZooSessionShutdownException extends RuntimeException {
++
++ private static final long serialVersionUID = 1L;
++
++ }
++
+ private static final Logger log = Logger.getLogger(ZooSession.class);
+
+ private static class ZooSessionInfo {
+ public ZooSessionInfo(ZooKeeper zooKeeper, ZooWatcher watcher) {
+ this.zooKeeper = zooKeeper;
+ }
+
+ ZooKeeper zooKeeper;
+ }
+
+ private static Map<String,ZooSessionInfo> sessions = new HashMap<String,ZooSessionInfo>();
+
+ private static String sessionKey(String keepers, int timeout, String scheme, byte[] auth) {
+ return keepers + ":" + timeout + ":" + (scheme == null ? "" : scheme) + ":" + (auth == null ? "" : new String(auth));
+ }
+
+ private static class ZooWatcher implements Watcher {
+
+ public void process(WatchedEvent event) {
+ if (event.getState() == KeeperState.Expired) {
+ log.debug("Session expired, state of current session : " + event.getState());
+ }
+ }
+
+ }
+
+ public static ZooKeeper connect(String host, int timeout, String scheme, byte[] auth, Watcher watcher) {
+ final int TIME_BETWEEN_CONNECT_CHECKS_MS = 100;
+ final int TOTAL_CONNECT_TIME_WAIT_MS = 10 * 1000;
+ boolean tryAgain = true;
+ int sleepTime = 100;
+ ZooKeeper zooKeeper = null;
+
+ long startTime = System.currentTimeMillis();
+
+ while (tryAgain) {
+ try {
+ zooKeeper = new ZooKeeper(host, timeout, watcher);
+ // it may take some time to get connected to zookeeper if some of the servers are down
+ for (int i = 0; i < TOTAL_CONNECT_TIME_WAIT_MS / TIME_BETWEEN_CONNECT_CHECKS_MS && tryAgain; i++) {
+ if (zooKeeper.getState().equals(States.CONNECTED)) {
+ if (auth != null)
+ zooKeeper.addAuthInfo(scheme, auth);
+ tryAgain = false;
+ } else
+ UtilWaitThread.sleep(TIME_BETWEEN_CONNECT_CHECKS_MS);
+ }
+
+ if (System.currentTimeMillis() - startTime > 2 * timeout)
+ throw new RuntimeException("Failed to connect to zookeeper (" + host + ") within 2x zookeeper timeout period " + timeout);
+
+ } catch (UnknownHostException uhe) {
+ // do not expect to recover from this
+ log.warn(uhe.getClass().getName() + " : " + uhe.getMessage());
+ throw new RuntimeException(uhe);
+ } catch (IOException e) {
+ log.warn("Connection to zooKeeper failed, will try again in " + String.format("%.2f secs", sleepTime / 1000.0), e);
+ } finally {
+ if (tryAgain && zooKeeper != null)
+ try {
+ zooKeeper.close();
+ zooKeeper = null;
+ } catch (InterruptedException e) {
+ log.warn("interrupted", e);
+ }
+ }
+
+ if (tryAgain) {
+ UtilWaitThread.sleep(sleepTime);
+ if (sleepTime < 10000)
+ sleepTime = (int) (sleepTime + sleepTime * Math.random());
+ }
+ }
+
+ return zooKeeper;
+ }
+
+ public static synchronized ZooKeeper getSession(String zooKeepers, int timeout) {
+ return getSession(zooKeepers, timeout, null, null);
+ }
+
+ public static synchronized ZooKeeper getSession(String zooKeepers, int timeout, String scheme, byte[] auth) {
+
++ if (sessions == null)
++ throw new ZooSessionShutdownException();
++
+ String sessionKey = sessionKey(zooKeepers, timeout, scheme, auth);
+
+ // a read-only session can use a session with authorizations, so cache a copy for it w/out auths
+ String readOnlySessionKey = sessionKey(zooKeepers, timeout, null, null);
+ ZooSessionInfo zsi = sessions.get(sessionKey);
+ if (zsi != null && zsi.zooKeeper.getState() == States.CLOSED) {
+ if (auth != null && sessions.get(readOnlySessionKey) == zsi)
+ sessions.remove(readOnlySessionKey);
+ zsi = null;
+ sessions.remove(sessionKey);
+ }
+
+ if (zsi == null) {
+ ZooWatcher watcher = new ZooWatcher();
+ log.debug("Connecting to " + zooKeepers + " with timeout " + timeout + " with auth");
+ zsi = new ZooSessionInfo(connect(zooKeepers, timeout, scheme, auth, watcher), watcher);
+ sessions.put(sessionKey, zsi);
+ if (auth != null && !sessions.containsKey(readOnlySessionKey))
+ sessions.put(readOnlySessionKey, zsi);
+ }
+ return zsi.zooKeeper;
+ }
++
++ public static synchronized void shutdown() {
++ for (ZooSessionInfo zsi : sessions.values()) {
++ try {
++ zsi.zooKeeper.close();
++ } catch (Exception e) {
++ log.debug("Error closing zookeeper during shutdown", e);
++ }
++ }
++
++ sessions = null;
++ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/400b991f/test/src/main/java/org/apache/accumulo/test/functional/CleanUpTest.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/CleanUpTest.java
index 0000000,0000000..5a0ca26
new file mode 100644
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CleanUpTest.java
@@@ -1,0 -1,0 +1,153 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.accumulo.test.functional;
++
++import java.util.Collections;
++import java.util.Iterator;
++import java.util.List;
++import java.util.Map;
++import java.util.Map.Entry;
++import java.util.Set;
++
++import org.apache.accumulo.core.client.BatchWriter;
++import org.apache.accumulo.core.client.Scanner;
++import org.apache.accumulo.core.data.Key;
++import org.apache.accumulo.core.data.Mutation;
++import org.apache.accumulo.core.data.Value;
++import org.apache.accumulo.core.security.Authorizations;
++import org.apache.accumulo.core.util.CleanUp;
++
++/**
++ * Functional test checking that CleanUp.shutdownNow() makes the client's background threads go away and leaves existing client objects unusable.
++ */
++public class CleanUpTest extends FunctionalTest {
++
++  @Override
++  public Map<String,String> getInitialConfig() {
++    return Collections.emptyMap();
++  }
++
++  @Override
++  public List<TableSetup> getTablesToCreate() {
++    return Collections.emptyList();
++  }
++
++  @Override
++  public void run() throws Exception {
++
++    getConnector().tableOperations().create("test");
++
++    BatchWriter bw = getConnector().createBatchWriter("test", 1000000, 60000, 1);
++
++    Mutation m1 = new Mutation("r1");
++    m1.put("cf1", "cq1", 1, "5");
++
++    bw.addMutation(m1);
++
++    bw.flush();
++
++    Scanner scanner = getConnector().createScanner("test", new Authorizations());
++
++    int count = 0;
++    for (Entry<Key,Value> entry : scanner) {
++      count++;
++      if (!entry.getValue().toString().equals("5")) {
++        throw new Exception("Unexpected value " + entry.getValue());
++      }
++    }
++
++    if (count != 1) {
++      throw new Exception("Unexpected count " + count);
++    }
++
++    if (countThreads() < 2) {
++      printThreadNames();
++      throw new Exception("Not seeing expected threads");
++    }
++
++    CleanUp.shutdownNow();
++
++    Mutation m2 = new Mutation("r2");
++    m2.put("cf1", "cq1", 1, "6");
++
++    // Each "did not fail" exception is thrown outside its try block: thrown inside, the
++    // catch (Exception) meant for the expected failure would swallow it and the check could never fail.
++    boolean failed = false;
++    try {
++      bw.addMutation(m2);
++      bw.flush();
++    } catch (Exception e) {
++      // expected: the client resources were shut down
++      failed = true;
++    }
++    if (!failed) {
++      throw new Exception("batch writer did not fail");
++    }
++
++    failed = false;
++    try {
++      // expect this to fail also, want to clean up batch writer threads
++      bw.close();
++    } catch (Exception e) {
++      failed = true;
++    }
++    if (!failed) {
++      throw new Exception("batch writer close not fail");
++    }
++
++    failed = false;
++    try {
++      count = 0;
++      Iterator<Entry<Key,Value>> iter = scanner.iterator();
++      while (iter.hasNext()) {
++        iter.next();
++        count++;
++      }
++    } catch (Exception e) {
++      failed = true;
++    }
++    if (!failed) {
++      throw new Exception("scanner did not fail");
++    }
++
++    if (countThreads() > 0) {
++      printThreadNames();
++      throw new Exception("Threads did not go away");
++    }
++  }
++
++  private void printThreadNames() {
++    Set<Thread> threads = Thread.getAllStackTraces().keySet();
++    for (Thread thread : threads) {
++      System.out.println("thread name:" + thread.getName());
++      thread.getStackTrace();
++
++    }
++  }
++
++  /**
++   * count threads that should be cleaned up
++   *
++   * @return one count per thread whose name contains "sendthread" or "eventthread", plus one per thread whose name contains both "thrift" and "pool"
++   *         (names compared in lower case)
++   */
++  private int countThreads() {
++    int count = 0;
++    Set<Thread> threads = Thread.getAllStackTraces().keySet();
++    for (Thread thread : threads) {
++      String name = thread.getName().toLowerCase();
++
++      if (name.contains("sendthread") || name.contains("eventthread"))
++        count++;
++
++      if (name.contains("thrift") && name.contains("pool"))
++        count++;
++    }
++
++    return count;
++  }
++
++  @Override
++  public void cleanup() throws Exception {}
++
++}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/400b991f/test/system/auto/simple/cleanup.py
----------------------------------------------------------------------
diff --cc test/system/auto/simple/cleanup.py
index 0000000,1ed8aff..03f7721
mode 000000,100755..100755
--- a/test/system/auto/simple/cleanup.py
+++ b/test/system/auto/simple/cleanup.py
@@@ -1,0 -1,30 +1,30 @@@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ # http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+ from JavaTest import JavaTest
+
+ import unittest
+
+ class CleanUpTest(JavaTest):
+ "Test clean up util"
+
+ order = 21
- testClass="org.apache.accumulo.server.test.functional.CleanUpTest"
++ testClass="org.apache.accumulo.test.functional.CleanUpTest"
+
+
+ def suite():
+ result = unittest.TestSuite()
+ result.addTest(CleanUpTest())
+ return result
[3/5] git commit: ACCUMULO-2128 added test for static clean up utility
Posted by kt...@apache.org.
ACCUMULO-2128 added test for static clean up utility
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8f9fe417
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8f9fe417
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8f9fe417
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 8f9fe41751415ab66ddbce6d6dec058999afc1d3
Parents: c94a73f
Author: Keith Turner <kt...@apache.org>
Authored: Mon Jan 6 20:17:45 2014 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Mon Jan 6 20:17:59 2014 -0500
----------------------------------------------------------------------
.../server/test/functional/CleanUpTest.java | 153 +++++++++++++++++++
test/system/auto/simple/cleanup.py | 30 ++++
2 files changed, 183 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8f9fe417/src/server/src/main/java/org/apache/accumulo/server/test/functional/CleanUpTest.java
----------------------------------------------------------------------
diff --git a/src/server/src/main/java/org/apache/accumulo/server/test/functional/CleanUpTest.java b/src/server/src/main/java/org/apache/accumulo/server/test/functional/CleanUpTest.java
new file mode 100644
index 0000000..99fbcfe
--- /dev/null
+++ b/src/server/src/main/java/org/apache/accumulo/server/test/functional/CleanUpTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.test.functional;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.CleanUp;
+
+/**
+ * Functional test checking that CleanUp.shutdownNow() makes the client's background threads go away and leaves existing client objects unusable.
+ */
+public class CleanUpTest extends FunctionalTest {
+
+  @Override
+  public Map<String,String> getInitialConfig() {
+    return Collections.emptyMap();
+  }
+
+  @Override
+  public List<TableSetup> getTablesToCreate() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void run() throws Exception {
+
+    getConnector().tableOperations().create("test");
+
+    BatchWriter bw = getConnector().createBatchWriter("test", 1000000, 60000, 1);
+
+    Mutation m1 = new Mutation("r1");
+    m1.put("cf1", "cq1", 1, "5");
+
+    bw.addMutation(m1);
+
+    bw.flush();
+
+    Scanner scanner = getConnector().createScanner("test", new Authorizations());
+
+    int count = 0;
+    for (Entry<Key,Value> entry : scanner) {
+      count++;
+      if (!entry.getValue().toString().equals("5")) {
+        throw new Exception("Unexpected value " + entry.getValue());
+      }
+    }
+
+    if (count != 1) {
+      throw new Exception("Unexpected count " + count);
+    }
+
+    if (countThreads() < 2) {
+      printThreadNames();
+      throw new Exception("Not seeing expected threads");
+    }
+
+    CleanUp.shutdownNow();
+
+    Mutation m2 = new Mutation("r2");
+    m2.put("cf1", "cq1", 1, "6");
+
+    // Each "did not fail" exception is thrown outside its try block: thrown inside, the
+    // catch (Exception) meant for the expected failure would swallow it and the check could never fail.
+    boolean failed = false;
+    try {
+      bw.addMutation(m2);
+      bw.flush();
+    } catch (Exception e) {
+      // expected: the client resources were shut down
+      failed = true;
+    }
+    if (!failed) {
+      throw new Exception("batch writer did not fail");
+    }
+
+    failed = false;
+    try {
+      // expect this to fail also, want to clean up batch writer threads
+      bw.close();
+    } catch (Exception e) {
+      failed = true;
+    }
+    if (!failed) {
+      throw new Exception("batch writer close not fail");
+    }
+
+    failed = false;
+    try {
+      count = 0;
+      Iterator<Entry<Key,Value>> iter = scanner.iterator();
+      while (iter.hasNext()) {
+        iter.next();
+        count++;
+      }
+    } catch (Exception e) {
+      failed = true;
+    }
+    if (!failed) {
+      throw new Exception("scanner did not fail");
+    }
+
+    if (countThreads() > 0) {
+      printThreadNames();
+      throw new Exception("Threads did not go away");
+    }
+  }
+
+  private void printThreadNames() {
+    Set<Thread> threads = Thread.getAllStackTraces().keySet();
+    for (Thread thread : threads) {
+      System.out.println("thread name:" + thread.getName());
+      thread.getStackTrace();
+
+    }
+  }
+
+  /**
+   * count threads that should be cleaned up
+   *
+   * @return one count per thread whose name contains "sendthread" or "eventthread", plus one per thread whose name contains both "thrift" and "pool"
+   *         (names compared in lower case)
+   */
+  private int countThreads() {
+    int count = 0;
+    Set<Thread> threads = Thread.getAllStackTraces().keySet();
+    for (Thread thread : threads) {
+      String name = thread.getName().toLowerCase();
+
+      if (name.contains("sendthread") || name.contains("eventthread"))
+        count++;
+
+      if (name.contains("thrift") && name.contains("pool"))
+        count++;
+    }
+
+    return count;
+  }
+
+  @Override
+  public void cleanup() throws Exception {}
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8f9fe417/test/system/auto/simple/cleanup.py
----------------------------------------------------------------------
diff --git a/test/system/auto/simple/cleanup.py b/test/system/auto/simple/cleanup.py
new file mode 100755
index 0000000..1ed8aff
--- /dev/null
+++ b/test/system/auto/simple/cleanup.py
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from JavaTest import JavaTest
+
+import unittest
+
+class CleanUpTest(JavaTest):
+ "Test clean up util"
+
+ order = 21
+ testClass="org.apache.accumulo.server.test.functional.CleanUpTest"
+
+
+def suite():
+ result = unittest.TestSuite()
+ result.addTest(CleanUpTest())
+ return result
[2/5] git commit: ACCUMULO-2128 added utility to cleanup accumulo
static resources
Posted by kt...@apache.org.
ACCUMULO-2128 added utility to cleanup accumulo static resources
Signed-off-by: Keith Turner <kt...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/715825b3
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/715825b3
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/715825b3
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 715825b3299fd742d8570971a7f271178b932812
Parents: 71f150a
Author: Keith Turner <kt...@apache.org>
Authored: Thu Jan 2 21:03:55 2014 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Mon Jan 6 20:17:58 2014 -0500
----------------------------------------------------------------------
.../core/client/impl/ThriftTransportPool.java | 102 +++++++++++++++----
.../org/apache/accumulo/core/util/CleanUp.java | 35 +++++++
.../accumulo/core/zookeeper/ZooSession.java | 23 ++++-
3 files changed, 140 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/715825b3/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java b/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index ef3724b..7468051 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@ -30,6 +30,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -53,6 +54,8 @@ public class ThriftTransportPool {
private Map<ThriftTransportKey,Long> errorCount = new HashMap<ThriftTransportKey,Long>();
private Map<ThriftTransportKey,Long> errorTime = new HashMap<ThriftTransportKey,Long>();
private Set<ThriftTransportKey> serversWarnedAbout = new HashSet<ThriftTransportKey>();
+
+ private CountDownLatch closerExitLatch;
private static final Logger log = Logger.getLogger(ThriftTransportPool.class);
@@ -78,20 +81,26 @@ public class ThriftTransportPool {
long lastReturnTime;
}
+ public static class TransportPoolShutdownException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+ }
+
private static class Closer implements Runnable {
ThriftTransportPool pool;
-
- public Closer(ThriftTransportPool pool) {
+ private CountDownLatch closerExitLatch;
+
+ public Closer(ThriftTransportPool pool, CountDownLatch closerExitLatch) {
this.pool = pool;
+ this.closerExitLatch = closerExitLatch;
}
- public void run() {
+ private void closeConnections() {
while (true) {
ArrayList<CachedConnection> connectionsToClose = new ArrayList<CachedConnection>();
synchronized (pool) {
- for (List<CachedConnection> ccl : pool.cache.values()) {
+ for (List<CachedConnection> ccl : pool.getCache().values()) {
Iterator<CachedConnection> iter = ccl.iterator();
while (iter.hasNext()) {
CachedConnection cachedConnection = iter.next();
@@ -103,7 +112,7 @@ public class ThriftTransportPool {
}
}
- for (List<CachedConnection> ccl : pool.cache.values()) {
+ for (List<CachedConnection> ccl : pool.getCache().values()) {
for (CachedConnection cachedConnection : ccl) {
cachedConnection.transport.checkForStuckIO(STUCK_THRESHOLD);
}
@@ -132,6 +141,15 @@ public class ThriftTransportPool {
}
}
}
+
+ // Runs the connection-checking loop until the pool is shut down, then releases the exit latch.
+ public void run() {
+ try {
+ closeConnections();
+ } catch (TransportPoolShutdownException e) {
+ // intentionally empty: getCache() throws this once shutdown() has cleared the cache, which is how this thread is told to stop
+ } finally {
+ // always count down, even on an unexpected exception, so that a shutdown() waiting on the latch is released
+ closerExitLatch.countDown();
+ }
+ }
}
static class CachedTTransport extends TTransport {
@@ -384,14 +402,14 @@ public class ThriftTransportPool {
synchronized (this) {
// randomly pick a server from the connection cache
- serversSet.retainAll(cache.keySet());
+ serversSet.retainAll(getCache().keySet());
if (serversSet.size() > 0) {
ArrayList<ThriftTransportKey> cachedServers = new ArrayList<ThriftTransportKey>(serversSet);
Collections.shuffle(cachedServers, random);
for (ThriftTransportKey ttk : cachedServers) {
- for (CachedConnection cachedConnection : cache.get(ttk)) {
+ for (CachedConnection cachedConnection : getCache().get(ttk)) {
if (!cachedConnection.isReserved()) {
cachedConnection.setReserved(true);
if (log.isTraceEnabled())
@@ -411,7 +429,7 @@ public class ThriftTransportPool {
if (!preferCachedConnection) {
synchronized (this) {
- List<CachedConnection> cachedConnList = cache.get(ttk);
+ List<CachedConnection> cachedConnList = getCache().get(ttk);
if (cachedConnList != null) {
for (CachedConnection cachedConnection : cachedConnList) {
if (!cachedConnection.isReserved()) {
@@ -444,11 +462,11 @@ public class ThriftTransportPool {
private TTransport getTransport(ThriftTransportKey cacheKey) throws TTransportException {
synchronized (this) {
// atomically reserve location if it exist in cache
- List<CachedConnection> ccl = cache.get(cacheKey);
+ List<CachedConnection> ccl = getCache().get(cacheKey);
if (ccl == null) {
ccl = new LinkedList<CachedConnection>();
- cache.put(cacheKey, ccl);
+ getCache().put(cacheKey, ccl);
}
for (CachedConnection cachedConnection : ccl) {
@@ -486,15 +504,20 @@ public class ThriftTransportPool {
CachedConnection cc = new CachedConnection(tsc);
cc.setReserved(true);
- synchronized (this) {
- List<CachedConnection> ccl = cache.get(cacheKey);
+ try {
+ synchronized (this) {
+ List<CachedConnection> ccl = getCache().get(cacheKey);
+
+ if (ccl == null) {
+ ccl = new LinkedList<CachedConnection>();
+ getCache().put(cacheKey, ccl);
+ }
- if (ccl == null) {
- ccl = new LinkedList<CachedConnection>();
- cache.put(cacheKey, ccl);
+ ccl.add(cc);
}
-
- ccl.add(cc);
+ } catch (TransportPoolShutdownException e) {
+ cc.transport.close();
+ throw e;
}
return cc.transport;
}
@@ -510,7 +533,7 @@ public class ThriftTransportPool {
ArrayList<CachedConnection> closeList = new ArrayList<ThriftTransportPool.CachedConnection>();
synchronized (this) {
- List<CachedConnection> ccl = cache.get(ctsc.getCacheKey());
+ List<CachedConnection> ccl = getCache().get(ctsc.getCacheKey());
for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();) {
CachedConnection cachedConnection = iterator.next();
if (cachedConnection.transport == tsc) {
@@ -600,8 +623,49 @@ public class ThriftTransportPool {
}
if (daemonStarted.compareAndSet(false, true)) {
- new Daemon(new Closer(instance), "Thrift Connection Pool Checker").start();
+ CountDownLatch closerExitLatch = new CountDownLatch(1);
+ new Daemon(new Closer(instance, closerExitLatch), "Thrift Connection Pool Checker").start();
+ instance.setCloserExitLatch(closerExitLatch);
}
return instance;
}
+
+ /**
+  * Stores the latch that the background closer thread counts down when it exits.
+  *
+  * @param closerExitLatch
+  *          the latch that was handed to the Closer started for this pool
+  */
+ private void setCloserExitLatch(CountDownLatch closerExitLatch) {
+   synchronized (this) {
+     this.closerExitLatch = closerExitLatch;
+   }
+ }
+
+ /**
+  * Closes every transport held by this pool (including ones that are currently in use), marks the pool as shut down, and then blocks until the background
+  * closer thread has exited. Calling this method again after the pool has been shut down returns immediately.
+  *
+  * @throws RuntimeException
+  *           wrapping the InterruptedException if the calling thread is interrupted while waiting; the interrupt status is restored first
+  */
+ public void shutdown() {
+   CountDownLatch exitLatch;
+   synchronized (this) {
+     if (cache == null)
+       return;
+
+     // close any connections in the pool... even ones that are in use
+     for (List<CachedConnection> ccl : getCache().values()) {
+       for (CachedConnection cc : ccl) {
+         try {
+           cc.transport.close();
+         } catch (Exception e) {
+           log.debug("Error closing transport during shutdown", e);
+         }
+       }
+     }
+
+     // this will render the pool unusable and cause the background thread to exit
+     this.cache = null;
+
+     // read the latch under the same lock that setCloserExitLatch() writes it under
+     exitLatch = closerExitLatch;
+   }
+
+   // the latch is assigned only after the closer thread has been started; without it there is nothing to wait for
+   if (exitLatch == null)
+     return;
+
+   try {
+     exitLatch.await();
+   } catch (InterruptedException e) {
+     // restore the interrupt status so callers can still observe the interruption
+     Thread.currentThread().interrupt();
+     throw new RuntimeException(e);
+   }
+ }
+
+ /**
+  * Gives access to the connection cache while the pool is still usable.
+  *
+  * @return the connection cache
+  * @throws TransportPoolShutdownException
+  *           if the cache has been cleared because the pool was shut down
+  */
+ private Map<ThriftTransportKey,List<CachedConnection>> getCache() {
+   if (cache != null) {
+     return cache;
+   }
+   throw new TransportPoolShutdownException();
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/715825b3/src/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java b/src/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java
new file mode 100644
index 0000000..ba02f0b
--- /dev/null
+++ b/src/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import org.apache.accumulo.core.client.impl.ThriftTransportPool;
+import org.apache.accumulo.core.zookeeper.ZooSession;
+
+/**
+ * Utility for shutting down the static resources held by the Accumulo client code.
+ */
+public class CleanUp {
+ /**
+ * kills all threads created by internal Accumulo singleton resources. After this method is called, no accumulo client will work in the current classloader.
+ */
+ public static void shutdownNow() {
+ ThriftTransportPool.getInstance().shutdown();
+ ZooSession.shutdown();
+ // TODO: waiting for the ZooKeeper client threads to exit is not implemented yet (need to get code from jared w)
+ // waitForZooKeeperClientThreads();
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/715825b3/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
index b3db26f..e64f0c5 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
@@ -29,8 +29,14 @@ import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
-class ZooSession {
+public class ZooSession {
+ public static class ZooSessionShutdownException extends RuntimeException {
+
+ private static final long serialVersionUID = 1L;
+
+ }
+
private static final Logger log = Logger.getLogger(ZooSession.class);
private static class ZooSessionInfo {
@@ -114,6 +120,9 @@ class ZooSession {
public static synchronized ZooKeeper getSession(String zooKeepers, int timeout, String auth) {
+ if (sessions == null)
+ throw new ZooSessionShutdownException();
+
String sessionKey = sessionKey(zooKeepers, timeout, auth);
// a read-only session can use a session with authorizations, so cache a copy for it w/out auths
@@ -137,4 +146,16 @@ class ZooSession {
}
return zsi.zooKeeper;
}
+
+ /**
+  * Closes every cached ZooKeeper session and marks this class as shut down, after which getSession() throws ZooSessionShutdownException. Calling this
+  * method again once it has completed is a no-op.
+  */
+ public static synchronized void shutdown() {
+   // sessions is null after a previous shutdown; iterating it again would throw a NullPointerException
+   if (sessions == null)
+     return;
+
+   for (ZooSessionInfo zsi : sessions.values()) {
+     try {
+       zsi.zooKeeper.close();
+     } catch (Exception e) {
+       log.debug("Error closing zookeeper during shutdown", e);
+     }
+   }
+
+   sessions = null;
+ }
}