You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2014/01/07 03:07:16 UTC

[1/6] git commit: ACCUMULO-2128 added waitForZooKeeperClientThreads method

Updated Branches:
  refs/heads/master b8bd259d0 -> 4fbba38ae


ACCUMULO-2128 added waitForZooKeeperClientThreads method

Signed-off-by: Keith Turner <kt...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c94a73f4
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c94a73f4
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c94a73f4

Branch: refs/heads/master
Commit: c94a73f478c91a24e35583d41bb39102461c54fa
Parents: 715825b
Author: Jared Winick <ja...@koverse.com>
Authored: Thu Jan 2 22:30:59 2014 -0700
Committer: Keith Turner <kt...@apache.org>
Committed: Mon Jan 6 20:17:58 2014 -0500

----------------------------------------------------------------------
 .../org/apache/accumulo/core/util/CleanUp.java  | 32 ++++++++++++++++++--
 1 file changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/c94a73f4/src/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java b/src/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java
index ba02f0b..5b2a4b9 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java
@@ -16,20 +16,48 @@
  */
 package org.apache.accumulo.core.util;
 
+import java.util.Set;
+
 import org.apache.accumulo.core.client.impl.ThriftTransportPool;
 import org.apache.accumulo.core.zookeeper.ZooSession;
+import org.apache.log4j.Logger;
 
 /**
  * 
  */
 public class CleanUp {
+  
+  private static final Logger log = Logger.getLogger(CleanUp.class);
+  
   /**
    * kills all threads created by internal Accumulo singleton resources. After this method is called, no accumulo client will work in the current classloader.
    */
   public static void shutdownNow() {
     ThriftTransportPool.getInstance().shutdown();
     ZooSession.shutdown();
-    // need to get code from jared w
-    // waitForZooKeeperClientThreads();
+    waitForZooKeeperClientThreads();
+  }
+  
+  /**
+   * As documented in https://issues.apache.org/jira/browse/ZOOKEEPER-1816, ZooKeeper.close()
+   * is a non-blocking call. This method will wait on the ZooKeeper internal threads to exit.
+   */
+  private static void waitForZooKeeperClientThreads() {
+    Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
+    for (Thread thread : threadSet) {    
+      // find ZooKeeper threads that were created in the same ClassLoader as the current thread.
+      if (thread.getClass().getName().startsWith("org.apache.zookeeper.ClientCnxn") &&
+          thread.getContextClassLoader().equals(Thread.currentThread().getContextClassLoader())) {
+
+        // wait for the thread to die
+        while (thread.isAlive()) {
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException e) {
+            log.error(e.getMessage(), e);
+          }
+        }
+      }
+    }
   }
 }


[4/6] git commit: Merge branch '1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT

Posted by kt...@apache.org.
Merge branch '1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT

Conflicts:
	core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
	fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/400b991f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/400b991f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/400b991f

Branch: refs/heads/master
Commit: 400b991fb80114f6672e18496f1b2359b2e22c3d
Parents: a91ee4d 8f9fe41
Author: Keith Turner <kt...@apache.org>
Authored: Mon Jan 6 20:48:26 2014 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Mon Jan 6 20:48:26 2014 -0500

----------------------------------------------------------------------
 .../core/client/impl/ThriftTransportPool.java   | 100 +++++++++---
 .../org/apache/accumulo/core/util/CleanUp.java  |  63 ++++++++
 .../accumulo/fate/zookeeper/ZooSession.java     |  23 ++-
 .../accumulo/test/functional/CleanUpTest.java   | 153 +++++++++++++++++++
 test/system/auto/simple/cleanup.py              |  30 ++++
 5 files changed, 350 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/400b991f/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index ceeab21,0000000..f123289
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@@ -1,607 -1,0 +1,671 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client.impl;
 +
 +import java.io.IOException;
 +import java.net.InetSocketAddress;
 +import java.security.SecurityPermission;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.LinkedList;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Random;
 +import java.util.Set;
++import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.util.AddressUtil;
 +import org.apache.accumulo.core.util.Daemon;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.util.TTimeoutTransport;
 +import org.apache.accumulo.core.util.ThriftUtil;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.transport.TTransport;
 +import org.apache.thrift.transport.TTransportException;
 +
 +public class ThriftTransportPool {
 +  private static SecurityPermission TRANSPORT_POOL_PERMISSION = new SecurityPermission("transportPoolPermission");
 +  
 +  private static final Random random = new Random();
 +  private long killTime = 1000 * 3;
 +  
 +  private Map<ThriftTransportKey,List<CachedConnection>> cache = new HashMap<ThriftTransportKey,List<CachedConnection>>();
 +  private Map<ThriftTransportKey,Long> errorCount = new HashMap<ThriftTransportKey,Long>();
 +  private Map<ThriftTransportKey,Long> errorTime = new HashMap<ThriftTransportKey,Long>();
 +  private Set<ThriftTransportKey> serversWarnedAbout = new HashSet<ThriftTransportKey>();
++
++  private CountDownLatch closerExitLatch;
 +  
 +  private static final Logger log = Logger.getLogger(ThriftTransportPool.class);
 +  
 +  private static final Long ERROR_THRESHOLD = 20l;
 +  private static final int STUCK_THRESHOLD = 2 * 60 * 1000;
 +  
 +  private static class CachedConnection {
 +    
 +    public CachedConnection(CachedTTransport t) {
 +      this.transport = t;
 +    }
 +    
 +    void setReserved(boolean reserved) {
 +      this.transport.setReserved(reserved);
 +    }
 +    
 +    boolean isReserved() {
 +      return this.transport.reserved;
 +    }
 +    
 +    CachedTTransport transport;
 +    
 +    long lastReturnTime;
 +  }
 +  
++  public static class TransportPoolShutdownException extends RuntimeException {
++    private static final long serialVersionUID = 1L;
++  }
++
 +  private static class Closer implements Runnable {
 +    final ThriftTransportPool pool;
++    private CountDownLatch closerExitLatch;
 +    
-     public Closer(ThriftTransportPool pool) {
++    public Closer(ThriftTransportPool pool, CountDownLatch closerExitLatch) {
 +      this.pool = pool;
++      this.closerExitLatch = closerExitLatch;
 +    }
 +    
-     public void run() {
++    private void closeConnections() {
 +      while (true) {
 +        
 +        ArrayList<CachedConnection> connectionsToClose = new ArrayList<CachedConnection>();
 +        
 +        synchronized (pool) {
-           for (List<CachedConnection> ccl : pool.cache.values()) {
++          for (List<CachedConnection> ccl : pool.getCache().values()) {
 +            Iterator<CachedConnection> iter = ccl.iterator();
 +            while (iter.hasNext()) {
 +              CachedConnection cachedConnection = iter.next();
 +              
 +              if (!cachedConnection.isReserved() && System.currentTimeMillis() - cachedConnection.lastReturnTime > pool.killTime) {
 +                connectionsToClose.add(cachedConnection);
 +                iter.remove();
 +              }
 +            }
 +          }
 +          
-           for (List<CachedConnection> ccl : pool.cache.values()) {
++          for (List<CachedConnection> ccl : pool.getCache().values()) {
 +            for (CachedConnection cachedConnection : ccl) {
 +              cachedConnection.transport.checkForStuckIO(STUCK_THRESHOLD);
 +            }
 +          }
 +          
 +          Iterator<Entry<ThriftTransportKey,Long>> iter = pool.errorTime.entrySet().iterator();
 +          while (iter.hasNext()) {
 +            Entry<ThriftTransportKey,Long> entry = iter.next();
 +            long delta = System.currentTimeMillis() - entry.getValue();
 +            if (delta >= STUCK_THRESHOLD) {
 +              pool.errorCount.remove(entry.getKey());
 +              iter.remove();
 +            }
 +          }
 +        }
 +        
 +        // close connections outside of sync block
 +        for (CachedConnection cachedConnection : connectionsToClose) {
 +          cachedConnection.transport.close();
 +        }
 +        
 +        try {
 +          Thread.sleep(500);
 +        } catch (InterruptedException e) {
 +          e.printStackTrace();
 +        }
 +      }
 +    }
++
++    public void run() {
++      try {
++        closeConnections();
++      } catch (TransportPoolShutdownException e) {
++      } finally {
++        closerExitLatch.countDown();
++      }
++    }
 +  }
 +  
 +  static class CachedTTransport extends TTransport {
 +    
 +    private ThriftTransportKey cacheKey;
 +    private TTransport wrappedTransport;
 +    private boolean sawError = false;
 +    
 +    private volatile String ioThreadName = null;
 +    private volatile long ioStartTime = 0;
 +    private volatile boolean reserved = false;
 +    
 +    private String stuckThreadName = null;
 +    
 +    int ioCount = 0;
 +    int lastIoCount = -1;
 +    
 +    private void sawError(Exception e) {
 +      sawError = true;
 +    }
 +    
 +    final void setReserved(boolean reserved) {
 +      this.reserved = reserved;
 +      if (reserved) {
 +        ioThreadName = Thread.currentThread().getName();
 +        ioCount = 0;
 +        lastIoCount = -1;
 +      } else {
 +        if ((ioCount & 1) == 1) {
 +          // connection unreserved, but it seems io may still be
 +          // happening
 +          log.warn("Connection returned to thrift connection pool that may still be in use " + ioThreadName + " " + Thread.currentThread().getName(),
 +              new Exception());
 +        }
 +        
 +        ioCount = 0;
 +        lastIoCount = -1;
 +        ioThreadName = null;
 +      }
 +      checkForStuckIO(STUCK_THRESHOLD);
 +    }
 +    
 +    final void checkForStuckIO(long threshold) {
 +      /*
 +       * checking for stuck io needs to be light weight.
 +       * 
 +       * Tried to call System.currentTimeMillis() and Thread.currentThread() before every io operation.... this dramatically slowed things down. So switched to
 +       * incrementing a counter before and after each io operation.
 +       */
 +      
 +      if ((ioCount & 1) == 1) {
 +        // when ioCount is odd, it means I/O is currently happening
 +        if (ioCount == lastIoCount) {
 +          // still doing same I/O operation as last time this
 +          // function was called
 +          long delta = System.currentTimeMillis() - ioStartTime;
 +          if (delta >= threshold && stuckThreadName == null) {
 +            stuckThreadName = ioThreadName;
 +            log.warn("Thread \"" + ioThreadName + "\" stuck on IO  to " + cacheKey + " for at least " + delta + " ms");
 +          }
 +        } else {
 +          // remember this ioCount and the time we saw it, need to see
 +          // if it changes
 +          lastIoCount = ioCount;
 +          ioStartTime = System.currentTimeMillis();
 +          
 +          if (stuckThreadName != null) {
 +            // doing I/O, but ioCount changed so no longer stuck
 +            log.info("Thread \"" + stuckThreadName + "\" no longer stuck on IO  to " + cacheKey + " sawError = " + sawError);
 +            stuckThreadName = null;
 +          }
 +        }
 +      } else {
 +        // I/O is not currently happening
 +        if (stuckThreadName != null) {
 +          // no longer stuck, and was stuck in the past
 +          log.info("Thread \"" + stuckThreadName + "\" no longer stuck on IO  to " + cacheKey + " sawError = " + sawError);
 +          stuckThreadName = null;
 +        }
 +      }
 +    }
 +    
 +    public CachedTTransport(TTransport transport, ThriftTransportKey cacheKey2) {
 +      this.wrappedTransport = transport;
 +      this.cacheKey = cacheKey2;
 +    }
 +    
 +    public boolean isOpen() {
 +      return wrappedTransport.isOpen();
 +    }
 +    
 +    public void open() throws TTransportException {
 +      try {
 +        ioCount++;
 +        wrappedTransport.open();
 +      } catch (TTransportException tte) {
 +        sawError(tte);
 +        throw tte;
 +      } finally {
 +        ioCount++;
 +      }
 +    }
 +    
 +    public int read(byte[] arg0, int arg1, int arg2) throws TTransportException {
 +      try {
 +        ioCount++;
 +        return wrappedTransport.read(arg0, arg1, arg2);
 +      } catch (TTransportException tte) {
 +        sawError(tte);
 +        throw tte;
 +      } finally {
 +        ioCount++;
 +      }
 +    }
 +    
 +    public int readAll(byte[] arg0, int arg1, int arg2) throws TTransportException {
 +      try {
 +        ioCount++;
 +        return wrappedTransport.readAll(arg0, arg1, arg2);
 +      } catch (TTransportException tte) {
 +        sawError(tte);
 +        throw tte;
 +      } finally {
 +        ioCount++;
 +      }
 +    }
 +    
 +    public void write(byte[] arg0, int arg1, int arg2) throws TTransportException {
 +      try {
 +        ioCount++;
 +        wrappedTransport.write(arg0, arg1, arg2);
 +      } catch (TTransportException tte) {
 +        sawError(tte);
 +        throw tte;
 +      } finally {
 +        ioCount++;
 +      }
 +    }
 +    
 +    public void write(byte[] arg0) throws TTransportException {
 +      try {
 +        ioCount++;
 +        wrappedTransport.write(arg0);
 +      } catch (TTransportException tte) {
 +        sawError(tte);
 +        throw tte;
 +      } finally {
 +        ioCount++;
 +      }
 +    }
 +    
 +    public void close() {
 +      try {
 +        ioCount++;
 +        wrappedTransport.close();
 +      } finally {
 +        ioCount++;
 +      }
 +      
 +    }
 +    
 +    public void flush() throws TTransportException {
 +      try {
 +        ioCount++;
 +        wrappedTransport.flush();
 +      } catch (TTransportException tte) {
 +        sawError(tte);
 +        throw tte;
 +      } finally {
 +        ioCount++;
 +      }
 +    }
 +    
 +    public boolean peek() {
 +      try {
 +        ioCount++;
 +        return wrappedTransport.peek();
 +      } finally {
 +        ioCount++;
 +      }
 +    }
 +    
 +    public byte[] getBuffer() {
 +      try {
 +        ioCount++;
 +        return wrappedTransport.getBuffer();
 +      } finally {
 +        ioCount++;
 +      }
 +    }
 +    
 +    public int getBufferPosition() {
 +      try {
 +        ioCount++;
 +        return wrappedTransport.getBufferPosition();
 +      } finally {
 +        ioCount++;
 +      }
 +    }
 +    
 +    public int getBytesRemainingInBuffer() {
 +      try {
 +        ioCount++;
 +        return wrappedTransport.getBytesRemainingInBuffer();
 +      } finally {
 +        ioCount++;
 +      }
 +    }
 +    
 +    public void consumeBuffer(int len) {
 +      try {
 +        ioCount++;
 +        wrappedTransport.consumeBuffer(len);
 +      } finally {
 +        ioCount++;
 +      }
 +    }
 +    
 +    public ThriftTransportKey getCacheKey() {
 +      return cacheKey;
 +    }
 +    
 +  }
 +  
 +  private ThriftTransportPool() {}
 +  
 +  public TTransport getTransport(String location, int port) throws TTransportException {
 +    return getTransport(location, port, 0);
 +  }
 +  
 +  public TTransport getTransportWithDefaultTimeout(InetSocketAddress addr, AccumuloConfiguration conf) throws TTransportException {
 +    return getTransport(addr.getAddress().getHostAddress(), addr.getPort(), conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
 +  }
 +  
 +  public TTransport getTransport(InetSocketAddress addr, long timeout) throws TTransportException {
 +    return getTransport(addr.getAddress().getHostAddress(), addr.getPort(), timeout);
 +  }
 +  
 +  public TTransport getTransportWithDefaultTimeout(String location, int port, AccumuloConfiguration conf) throws TTransportException {
 +    return getTransport(location, port, conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
 +  }
 +  
 +  Pair<String,TTransport> getAnyTransport(List<ThriftTransportKey> servers, boolean preferCachedConnection) throws TTransportException {
 +    
 +    servers = new ArrayList<ThriftTransportKey>(servers);
 +    
 +    if (preferCachedConnection) {
 +      HashSet<ThriftTransportKey> serversSet = new HashSet<ThriftTransportKey>(servers);
 +      
 +      synchronized (this) {
 +        
 +        // randomly pick a server from the connection cache
-         serversSet.retainAll(cache.keySet());
++        serversSet.retainAll(getCache().keySet());
 +        
 +        if (serversSet.size() > 0) {
 +          ArrayList<ThriftTransportKey> cachedServers = new ArrayList<ThriftTransportKey>(serversSet);
 +          Collections.shuffle(cachedServers, random);
 +          
 +          for (ThriftTransportKey ttk : cachedServers) {
-             for (CachedConnection cachedConnection : cache.get(ttk)) {
++            for (CachedConnection cachedConnection : getCache().get(ttk)) {
 +              if (!cachedConnection.isReserved()) {
 +                cachedConnection.setReserved(true);
 +                if (log.isTraceEnabled())
 +                  log.trace("Using existing connection to " + ttk.getLocation() + ":" + ttk.getPort());
 +                return new Pair<String,TTransport>(ttk.getLocation() + ":" + ttk.getPort(), cachedConnection.transport);
 +              }
 +            }
 +          }
 +        }
 +      }
 +    }
 +    
 +    int retryCount = 0;
 +    while (servers.size() > 0 && retryCount < 10) {
 +      int index = random.nextInt(servers.size());
 +      ThriftTransportKey ttk = servers.get(index);
 +      
 +      if (!preferCachedConnection) {
 +        synchronized (this) {
-           List<CachedConnection> cachedConnList = cache.get(ttk);
++          List<CachedConnection> cachedConnList = getCache().get(ttk);
 +          if (cachedConnList != null) {
 +            for (CachedConnection cachedConnection : cachedConnList) {
 +              if (!cachedConnection.isReserved()) {
 +                cachedConnection.setReserved(true);
 +                if (log.isTraceEnabled())
 +                  log.trace("Using existing connection to " + ttk.getLocation() + ":" + ttk.getPort() + " timeout " + ttk.getTimeout());
 +                return new Pair<String,TTransport>(ttk.getLocation() + ":" + ttk.getPort(), cachedConnection.transport);
 +              }
 +            }
 +          }
 +        }
 +      }
 +
 +      try {
 +        return new Pair<String,TTransport>(ttk.getLocation() + ":" + ttk.getPort(), createNewTransport(ttk));
 +      } catch (TTransportException tte) {
 +        log.debug("Failed to connect to " + servers.get(index), tte);
 +        servers.remove(index);
 +        retryCount++;
 +      }
 +    }
 +    
 +    throw new TTransportException("Failed to connect to a server");
 +  }
 +  
 +  public TTransport getTransport(String location, int port, long milliseconds) throws TTransportException {
 +    return getTransport(new ThriftTransportKey(location, port, milliseconds));
 +  }
 +  
 +  private TTransport getTransport(ThriftTransportKey cacheKey) throws TTransportException {
 +    synchronized (this) {
 +      // atomically reserve location if it exists in cache
-       List<CachedConnection> ccl = cache.get(cacheKey);
++      List<CachedConnection> ccl = getCache().get(cacheKey);
 +      
 +      if (ccl == null) {
 +        ccl = new LinkedList<CachedConnection>();
-         cache.put(cacheKey, ccl);
++        getCache().put(cacheKey, ccl);
 +      }
 +      
 +      for (CachedConnection cachedConnection : ccl) {
 +        if (!cachedConnection.isReserved()) {
 +          cachedConnection.setReserved(true);
 +          if (log.isTraceEnabled())
 +            log.trace("Using existing connection to " + cacheKey.getLocation() + ":" + cacheKey.getPort());
 +          return cachedConnection.transport;
 +        }
 +      }
 +    }
 +    
 +    return createNewTransport(cacheKey);
 +  }
 +  
 +  private TTransport createNewTransport(ThriftTransportKey cacheKey) throws TTransportException {
 +    TTransport transport;
 +    if (cacheKey.getTimeout() == 0) {
 +      transport = AddressUtil.createTSocket(cacheKey.getLocation(), cacheKey.getPort());
 +    } else {
 +      try {
 +        transport = TTimeoutTransport.create(AddressUtil.parseAddress(cacheKey.getLocation(), cacheKey.getPort()), cacheKey.getTimeout());
 +      } catch (IOException ex) {
 +        throw new TTransportException(ex);
 +      }
 +    }
 +    transport = ThriftUtil.transportFactory().getTransport(transport);
 +    transport.open();
 +    
 +    if (log.isTraceEnabled())
 +      log.trace("Creating new connection to connection to " + cacheKey.getLocation() + ":" + cacheKey.getPort());
 +    
 +    CachedTTransport tsc = new CachedTTransport(transport, cacheKey);
 +    
 +    CachedConnection cc = new CachedConnection(tsc);
 +    cc.setReserved(true);
 +    
-     synchronized (this) {
-       List<CachedConnection> ccl = cache.get(cacheKey);
++    try {
++      synchronized (this) {
++        List<CachedConnection> ccl = getCache().get(cacheKey);
++
++        if (ccl == null) {
++          ccl = new LinkedList<CachedConnection>();
++          getCache().put(cacheKey, ccl);
++        }
 +      
-       if (ccl == null) {
-         ccl = new LinkedList<CachedConnection>();
-         cache.put(cacheKey, ccl);
++        ccl.add(cc);
 +      }
-       
-       ccl.add(cc);
++    } catch (TransportPoolShutdownException e) {
++      cc.transport.close();
++      throw e;
 +    }
 +    return cc.transport;
 +  }
 +  
 +  public void returnTransport(TTransport tsc) {
 +    if (tsc == null) {
 +      return;
 +    }
 +    
 +    boolean existInCache = false;
 +    CachedTTransport ctsc = (CachedTTransport) tsc;
 +    
 +    ArrayList<CachedConnection> closeList = new ArrayList<ThriftTransportPool.CachedConnection>();
 +
 +    synchronized (this) {
-       List<CachedConnection> ccl = cache.get(ctsc.getCacheKey());
++      List<CachedConnection> ccl = getCache().get(ctsc.getCacheKey());
 +      for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();) {
 +        CachedConnection cachedConnection = iterator.next();
 +        if (cachedConnection.transport == tsc) {
 +          if (ctsc.sawError) {
 +            closeList.add(cachedConnection);
 +            iterator.remove();
 +            
 +            if (log.isTraceEnabled())
 +              log.trace("Returned connection had error " + ctsc.getCacheKey());
 +            
 +            Long ecount = errorCount.get(ctsc.getCacheKey());
 +            if (ecount == null)
 +              ecount = 0l;
 +            ecount++;
 +            errorCount.put(ctsc.getCacheKey(), ecount);
 +            
 +            Long etime = errorTime.get(ctsc.getCacheKey());
 +            if (etime == null) {
 +              errorTime.put(ctsc.getCacheKey(), System.currentTimeMillis());
 +            }
 +            
 +            if (ecount >= ERROR_THRESHOLD && !serversWarnedAbout.contains(ctsc.getCacheKey())) {
 +              log.warn("Server " + ctsc.getCacheKey() + " had " + ecount + " failures in a short time period, will not complain anymore ");
 +              serversWarnedAbout.add(ctsc.getCacheKey());
 +            }
 +            
 +            cachedConnection.setReserved(false);
 +            
 +          } else {
 +            
 +            if (log.isTraceEnabled())
 +              log.trace("Returned connection " + ctsc.getCacheKey() + " ioCount : " + cachedConnection.transport.ioCount);
 +            
 +            cachedConnection.lastReturnTime = System.currentTimeMillis();
 +            cachedConnection.setReserved(false);
 +          }
 +          existInCache = true;
 +          break;
 +        }
 +      }
 +      
 +      // remove all unreserved cached connections when a server has an error, not just the connection that was returned
 +      if (ctsc.sawError) {
 +        for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();) {
 +          CachedConnection cachedConnection = iterator.next();
 +          if (!cachedConnection.isReserved()) {
 +            closeList.add(cachedConnection);
 +            iterator.remove();
 +          }
 +        }
 +      }
 +    }
 +    
 +    // close outside of sync block
 +    for (CachedConnection cachedConnection : closeList) {
 +      try {
 +        cachedConnection.transport.close();
 +      } catch (Exception e) {
 +        log.debug("Failed to close connection w/ errors", e);
 +      }
 +    }
 +    
 +    if (!existInCache) {
 +      log.warn("Returned tablet server connection to cache that did not come from cache");
 +      // close outside of sync block
 +      tsc.close();
 +    }
 +  }
 +  
 +  /**
 +   * Set the time after which idle connections should be closed
 +   * 
 +   * @param time
 +   */
 +  public synchronized void setIdleTime(long time) {
 +    this.killTime = time;
 +    log.debug("Set thrift transport pool idle time to " + time);
 +  }
 +
 +  private static ThriftTransportPool instance = new ThriftTransportPool();
 +  private static final AtomicBoolean daemonStarted = new AtomicBoolean(false);
 +  
 +  public static ThriftTransportPool getInstance() {
 +    SecurityManager sm = System.getSecurityManager();
 +    if (sm != null) {
 +      sm.checkPermission(TRANSPORT_POOL_PERMISSION);
 +    }
 +    
 +    if (daemonStarted.compareAndSet(false, true)) {
-       new Daemon(new Closer(instance), "Thrift Connection Pool Checker").start();
++      CountDownLatch closerExitLatch = new CountDownLatch(1);
++      new Daemon(new Closer(instance, closerExitLatch), "Thrift Connection Pool Checker").start();
++      instance.setCloserExitLatch(closerExitLatch);
 +    }
 +    return instance;
 +  }
++  
++  private synchronized void setCloserExitLatch(CountDownLatch closerExitLatch) {
++    this.closerExitLatch = closerExitLatch;
++  }
++
++  public void shutdown() {
++    synchronized (this) {
++      if (cache == null)
++        return;
++
++      // close any connections in the pool... even ones that are in use
++      for (List<CachedConnection> ccl : getCache().values()) {
++        Iterator<CachedConnection> iter = ccl.iterator();
++        while (iter.hasNext()) {
++          CachedConnection cc = iter.next();
++          try {
++            cc.transport.close();
++          } catch (Exception e) {
++            log.debug("Error closing transport during shutdown", e);
++          }
++        }
++      }
++
++      // this will render the pool unusable and cause the background thread to exit
++      this.cache = null;
++    }
++
++    try {
++      closerExitLatch.await();
++    } catch (InterruptedException e) {
++      throw new RuntimeException(e);
++    }
++  }
++
++  private Map<ThriftTransportKey,List<CachedConnection>> getCache() {
++    if (cache == null)
++      throw new TransportPoolShutdownException();
++    return cache;
++  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/400b991f/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/util/CleanUp.java
index 0000000,0000000..be5a41a
new file mode 100644
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java
@@@ -1,0 -1,0 +1,63 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.accumulo.core.util;
++
++import java.util.Set;
++
++import org.apache.accumulo.core.client.impl.ThriftTransportPool;
++import org.apache.accumulo.fate.zookeeper.ZooSession;
++import org.apache.log4j.Logger;
++
++/**
++ * 
++ */
++public class CleanUp {
++  
++  private static final Logger log = Logger.getLogger(CleanUp.class);
++  
++  /**
++   * kills all threads created by internal Accumulo singleton resources. After this method is called, no accumulo client will work in the current classloader.
++   */
++  public static void shutdownNow() {
++    ThriftTransportPool.getInstance().shutdown();
++    ZooSession.shutdown();
++    waitForZooKeeperClientThreads();
++  }
++  
++  /**
++   * As documented in https://issues.apache.org/jira/browse/ZOOKEEPER-1816, ZooKeeper.close()
++   * is a non-blocking call. This method will wait on the ZooKeeper internal threads to exit.
++   */
++  private static void waitForZooKeeperClientThreads() {
++    Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
++    for (Thread thread : threadSet) {    
++      // find ZooKeeper threads that were created in the same ClassLoader as the current thread.
++      if (thread.getClass().getName().startsWith("org.apache.zookeeper.ClientCnxn") &&
++          thread.getContextClassLoader().equals(Thread.currentThread().getContextClassLoader())) {
++
++        // wait for the thread to die
++        while (thread.isAlive()) {
++          try {
++            Thread.sleep(100);
++          } catch (InterruptedException e) {
++            log.error(e.getMessage(), e);
++          }
++        }
++      }
++    }
++  }
++}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/400b991f/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
----------------------------------------------------------------------
diff --cc fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
index 7258ff0,0000000..040b01d
mode 100644,000000..100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
@@@ -1,139 -1,0 +1,160 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.fate.zookeeper;
 +
 +import java.io.IOException;
 +import java.net.UnknownHostException;
 +import java.util.HashMap;
 +import java.util.Map;
 +
 +import org.apache.accumulo.fate.util.UtilWaitThread;
 +import org.apache.log4j.Logger;
 +import org.apache.zookeeper.WatchedEvent;
 +import org.apache.zookeeper.Watcher;
 +import org.apache.zookeeper.Watcher.Event.KeeperState;
 +import org.apache.zookeeper.ZooKeeper;
 +import org.apache.zookeeper.ZooKeeper.States;
 +
- class ZooSession {
++public class ZooSession {
 +  
++  public static class ZooSessionShutdownException extends RuntimeException {
++
++    private static final long serialVersionUID = 1L;
++
++  }
++
 +  private static final Logger log = Logger.getLogger(ZooSession.class);
 +  
 +  private static class ZooSessionInfo {
 +    public ZooSessionInfo(ZooKeeper zooKeeper, ZooWatcher watcher) {
 +      this.zooKeeper = zooKeeper;
 +    }
 +    
 +    ZooKeeper zooKeeper;
 +  }
 +  
 +  private static Map<String,ZooSessionInfo> sessions = new HashMap<String,ZooSessionInfo>();
 +  
 +  private static String sessionKey(String keepers, int timeout, String scheme, byte[] auth) {
 +    return keepers + ":" + timeout + ":" + (scheme == null ? "" : scheme) + ":" + (auth == null ? "" : new String(auth));
 +  }
 +  
 +  private static class ZooWatcher implements Watcher {
 +    
 +    public void process(WatchedEvent event) {
 +      if (event.getState() == KeeperState.Expired) {
 +        log.debug("Session expired, state of current session : " + event.getState());
 +      }
 +    }
 +    
 +  }
 +  
 +  public static ZooKeeper connect(String host, int timeout, String scheme, byte[] auth, Watcher watcher) {
 +    final int TIME_BETWEEN_CONNECT_CHECKS_MS = 100;
 +    final int TOTAL_CONNECT_TIME_WAIT_MS = 10 * 1000;
 +    boolean tryAgain = true;
 +    int sleepTime = 100;
 +    ZooKeeper zooKeeper = null;
 +    
 +    long startTime = System.currentTimeMillis();
 +
 +    while (tryAgain) {
 +      try {
 +        zooKeeper = new ZooKeeper(host, timeout, watcher);
 +        // it may take some time to get connected to zookeeper if some of the servers are down
 +        for (int i = 0; i < TOTAL_CONNECT_TIME_WAIT_MS / TIME_BETWEEN_CONNECT_CHECKS_MS && tryAgain; i++) {
 +          if (zooKeeper.getState().equals(States.CONNECTED)) {
 +            if (auth != null)
 +              zooKeeper.addAuthInfo(scheme, auth);
 +            tryAgain = false;
 +          } else
 +            UtilWaitThread.sleep(TIME_BETWEEN_CONNECT_CHECKS_MS);
 +        }
 +        
 +        if (System.currentTimeMillis() - startTime > 2 * timeout)
 +          throw new RuntimeException("Failed to connect to zookeeper (" + host + ") within 2x zookeeper timeout period " + timeout);
 +
 +      } catch (UnknownHostException uhe) {
 +        // do not expect to recover from this
 +        log.warn(uhe.getClass().getName() + " : " + uhe.getMessage());
 +        throw new RuntimeException(uhe);
 +      } catch (IOException e) {
 +        log.warn("Connection to zooKeeper failed, will try again in " + String.format("%.2f secs", sleepTime / 1000.0), e);
 +      } finally {
 +        if (tryAgain && zooKeeper != null)
 +          try {
 +            zooKeeper.close();
 +            zooKeeper = null;
 +          } catch (InterruptedException e) {
 +            log.warn("interrupted", e);
 +          }
 +      }
 +      
 +      if (tryAgain) {
 +        UtilWaitThread.sleep(sleepTime);
 +        if (sleepTime < 10000)
 +          sleepTime = (int) (sleepTime + sleepTime * Math.random());
 +      }
 +    }
 +    
 +    return zooKeeper;
 +  }
 +  
 +  public static synchronized ZooKeeper getSession(String zooKeepers, int timeout) {
 +    return getSession(zooKeepers, timeout, null, null);
 +  }
 +  
 +  public static synchronized ZooKeeper getSession(String zooKeepers, int timeout, String scheme, byte[] auth) {
 +    
++    if (sessions == null)
++      throw new ZooSessionShutdownException();
++
 +    String sessionKey = sessionKey(zooKeepers, timeout, scheme, auth);
 +    
 +    // a read-only session can use a session with authorizations, so cache a copy for it w/out auths
 +    String readOnlySessionKey = sessionKey(zooKeepers, timeout, null, null);
 +    ZooSessionInfo zsi = sessions.get(sessionKey);
 +    if (zsi != null && zsi.zooKeeper.getState() == States.CLOSED) {
 +      if (auth != null && sessions.get(readOnlySessionKey) == zsi)
 +        sessions.remove(readOnlySessionKey);
 +      zsi = null;
 +      sessions.remove(sessionKey);
 +    }
 +    
 +    if (zsi == null) {
 +      ZooWatcher watcher = new ZooWatcher();
 +      log.debug("Connecting to " + zooKeepers + " with timeout " + timeout + " with auth");
 +      zsi = new ZooSessionInfo(connect(zooKeepers, timeout, scheme, auth, watcher), watcher);
 +      sessions.put(sessionKey, zsi);
 +      if (auth != null && !sessions.containsKey(readOnlySessionKey))
 +        sessions.put(readOnlySessionKey, zsi);
 +    }
 +    return zsi.zooKeeper;
 +  }
++
++  public static synchronized void shutdown() {
++    for (ZooSessionInfo zsi : sessions.values()) {
++      try {
++        zsi.zooKeeper.close();
++      } catch (Exception e) {
++        log.debug("Error closing zookeeper during shutdown", e);
++      }
++    }
++
++    sessions = null;
++  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/400b991f/test/src/main/java/org/apache/accumulo/test/functional/CleanUpTest.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/CleanUpTest.java
index 0000000,0000000..5a0ca26
new file mode 100644
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CleanUpTest.java
@@@ -1,0 -1,0 +1,153 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.accumulo.test.functional;
++
++import java.util.Collections;
++import java.util.Iterator;
++import java.util.List;
++import java.util.Map;
++import java.util.Map.Entry;
++import java.util.Set;
++
++import org.apache.accumulo.core.client.BatchWriter;
++import org.apache.accumulo.core.client.Scanner;
++import org.apache.accumulo.core.data.Key;
++import org.apache.accumulo.core.data.Mutation;
++import org.apache.accumulo.core.data.Value;
++import org.apache.accumulo.core.security.Authorizations;
++import org.apache.accumulo.core.util.CleanUp;
++
++/**
++ * 
++ */
++public class CleanUpTest extends FunctionalTest {
++
++  @Override
++  public Map<String,String> getInitialConfig() {
++    return Collections.emptyMap();
++  }
++
++  @Override
++  public List<TableSetup> getTablesToCreate() {
++    return Collections.emptyList();
++  }
++
++  @Override
++  public void run() throws Exception {
++
++
++    getConnector().tableOperations().create("test");
++
++    BatchWriter bw = getConnector().createBatchWriter("test", 1000000, 60000, 1);
++
++    Mutation m1 = new Mutation("r1");
++    m1.put("cf1", "cq1", 1, "5");
++
++    bw.addMutation(m1);
++
++    bw.flush();
++
++    Scanner scanner = getConnector().createScanner("test", new Authorizations());
++
++    int count = 0;
++    for (Entry<Key,Value> entry : scanner) {
++      count++;
++      if (!entry.getValue().toString().equals("5")) {
++        throw new Exception("Unexpected value " + entry.getValue());
++      }
++    }
++
++    if (count != 1) {
++      throw new Exception("Unexpected count " + count);
++    }
++
++    if (countThreads() < 2) {
++      printThreadNames();
++      throw new Exception("Not seeing expected threads");
++    }
++
++    CleanUp.shutdownNow();
++
++    Mutation m2 = new Mutation("r2");
++    m2.put("cf1", "cq1", 1, "6");
++
++    try {
++      bw.addMutation(m1);
++      bw.flush();
++      throw new Exception("batch writer did not fail");
++    } catch (Exception e) {
++
++    }
++
++    try {
++      // expect this to fail also, want to clean up batch writer threads
++      bw.close();
++      throw new Exception("batch writer close not fail");
++    } catch (Exception e) {
++
++    }
++
++    try {
++      count = 0;
++      Iterator<Entry<Key,Value>> iter = scanner.iterator();
++      while (iter.hasNext()) {
++        iter.next();
++        count++;
++      }
++      throw new Exception("scanner did not fail");
++    } catch (Exception e) {
++
++    }
++
++    if (countThreads() > 0) {
++      printThreadNames();
++      throw new Exception("Threads did not go away");
++    }
++  }
++
++  private void printThreadNames() {
++    Set<Thread> threads = Thread.getAllStackTraces().keySet();
++    for (Thread thread : threads) {
++      System.out.println("thread name:" + thread.getName());
++      thread.getStackTrace();
++
++    }
++  }
++
++  /**
++   * count threads that should be cleaned up
++   * 
++   */
++  private int countThreads() {
++    int count = 0;
++    Set<Thread> threads = Thread.getAllStackTraces().keySet();
++    for (Thread thread : threads) {
++
++      if (thread.getName().toLowerCase().contains("sendthread") || thread.getName().toLowerCase().contains("eventthread"))
++        count++;
++
++      if (thread.getName().toLowerCase().contains("thrift") && thread.getName().toLowerCase().contains("pool"))
++        count++;
++    }
++
++    return count;
++  }
++
++  @Override
++  public void cleanup() throws Exception {}
++
++}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/400b991f/test/system/auto/simple/cleanup.py
----------------------------------------------------------------------
diff --cc test/system/auto/simple/cleanup.py
index 0000000,1ed8aff..03f7721
mode 000000,100755..100755
--- a/test/system/auto/simple/cleanup.py
+++ b/test/system/auto/simple/cleanup.py
@@@ -1,0 -1,30 +1,30 @@@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements.  See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License.  You may obtain a copy of the License at
+ #
+ #     http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+ 
+ from JavaTest import JavaTest
+ 
+ import unittest
+ 
+ class CleanUpTest(JavaTest):
+     "Test clean up util"
+ 
+     order = 21
 -    testClass="org.apache.accumulo.server.test.functional.CleanUpTest"
++    testClass="org.apache.accumulo.test.functional.CleanUpTest"
+ 
+ 
+ def suite():
+     result = unittest.TestSuite()
+     result.addTest(CleanUpTest())
+     return result


[3/6] git commit: ACCUMULO-2128 added test for static clean up utility

Posted by kt...@apache.org.
ACCUMULO-2128 added test for static clean up utility


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8f9fe417
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8f9fe417
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8f9fe417

Branch: refs/heads/master
Commit: 8f9fe41751415ab66ddbce6d6dec058999afc1d3
Parents: c94a73f
Author: Keith Turner <kt...@apache.org>
Authored: Mon Jan 6 20:17:45 2014 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Mon Jan 6 20:17:59 2014 -0500

----------------------------------------------------------------------
 .../server/test/functional/CleanUpTest.java     | 153 +++++++++++++++++++
 test/system/auto/simple/cleanup.py              |  30 ++++
 2 files changed, 183 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/8f9fe417/src/server/src/main/java/org/apache/accumulo/server/test/functional/CleanUpTest.java
----------------------------------------------------------------------
diff --git a/src/server/src/main/java/org/apache/accumulo/server/test/functional/CleanUpTest.java b/src/server/src/main/java/org/apache/accumulo/server/test/functional/CleanUpTest.java
new file mode 100644
index 0000000..99fbcfe
--- /dev/null
+++ b/src/server/src/main/java/org/apache/accumulo/server/test/functional/CleanUpTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.test.functional;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.CleanUp;
+
+/**
+ * 
+ */
+public class CleanUpTest extends FunctionalTest {
+
+  @Override
+  public Map<String,String> getInitialConfig() {
+    return Collections.emptyMap();
+  }
+
+  @Override
+  public List<TableSetup> getTablesToCreate() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void run() throws Exception {
+
+
+    getConnector().tableOperations().create("test");
+
+    BatchWriter bw = getConnector().createBatchWriter("test", 1000000, 60000, 1);
+
+    Mutation m1 = new Mutation("r1");
+    m1.put("cf1", "cq1", 1, "5");
+
+    bw.addMutation(m1);
+
+    bw.flush();
+
+    Scanner scanner = getConnector().createScanner("test", new Authorizations());
+
+    int count = 0;
+    for (Entry<Key,Value> entry : scanner) {
+      count++;
+      if (!entry.getValue().toString().equals("5")) {
+        throw new Exception("Unexpected value " + entry.getValue());
+      }
+    }
+
+    if (count != 1) {
+      throw new Exception("Unexpected count " + count);
+    }
+
+    if (countThreads() < 2) {
+      printThreadNames();
+      throw new Exception("Not seeing expected threads");
+    }
+
+    CleanUp.shutdownNow();
+
+    Mutation m2 = new Mutation("r2");
+    m2.put("cf1", "cq1", 1, "6");
+
+    try {
+      bw.addMutation(m1);
+      bw.flush();
+      throw new Exception("batch writer did not fail");
+    } catch (Exception e) {
+
+    }
+
+    try {
+      // expect this to fail also, want to clean up batch writer threads
+      bw.close();
+      throw new Exception("batch writer close not fail");
+    } catch (Exception e) {
+
+    }
+
+    try {
+      count = 0;
+      Iterator<Entry<Key,Value>> iter = scanner.iterator();
+      while (iter.hasNext()) {
+        iter.next();
+        count++;
+      }
+      throw new Exception("scanner did not fail");
+    } catch (Exception e) {
+
+    }
+
+    if (countThreads() > 0) {
+      printThreadNames();
+      throw new Exception("Threads did not go away");
+    }
+  }
+
+  private void printThreadNames() {
+    Set<Thread> threads = Thread.getAllStackTraces().keySet();
+    for (Thread thread : threads) {
+      System.out.println("thread name:" + thread.getName());
+      thread.getStackTrace();
+
+    }
+  }
+
+  /**
+   * count threads that should be cleaned up
+   * 
+   */
+  private int countThreads() {
+    int count = 0;
+    Set<Thread> threads = Thread.getAllStackTraces().keySet();
+    for (Thread thread : threads) {
+
+      if (thread.getName().toLowerCase().contains("sendthread") || thread.getName().toLowerCase().contains("eventthread"))
+        count++;
+
+      if (thread.getName().toLowerCase().contains("thrift") && thread.getName().toLowerCase().contains("pool"))
+        count++;
+    }
+
+    return count;
+  }
+
+  @Override
+  public void cleanup() throws Exception {}
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8f9fe417/test/system/auto/simple/cleanup.py
----------------------------------------------------------------------
diff --git a/test/system/auto/simple/cleanup.py b/test/system/auto/simple/cleanup.py
new file mode 100755
index 0000000..1ed8aff
--- /dev/null
+++ b/test/system/auto/simple/cleanup.py
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from JavaTest import JavaTest
+
+import unittest
+
+class CleanUpTest(JavaTest):
+    "Test clean up util"
+
+    order = 21
+    testClass="org.apache.accumulo.server.test.functional.CleanUpTest"
+
+
+def suite():
+    result = unittest.TestSuite()
+    result.addTest(CleanUpTest())
+    return result


[5/6] git commit: Merge remote-tracking branch 'origin/1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT

Posted by kt...@apache.org.
Merge remote-tracking branch 'origin/1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT

Conflicts:
	core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9c092cad
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9c092cad
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9c092cad

Branch: refs/heads/master
Commit: 9c092cadde75250fa84e049eeae33687636dbc99
Parents: ee3ccb8 400b991
Author: Keith Turner <kt...@apache.org>
Authored: Mon Jan 6 21:11:02 2014 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Mon Jan 6 21:11:02 2014 -0500

----------------------------------------------------------------------
 .../core/client/impl/ThriftTransportPool.java   | 100 +++++++++++---
 .../org/apache/accumulo/core/util/CleanUp.java  |  63 +++++++++
 .../accumulo/fate/zookeeper/ZooSession.java     |  23 +++-
 .../accumulo/test/functional/CleanUpIT.java     | 137 +++++++++++++++++++
 4 files changed, 304 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c092cad/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index a553cc1,f123289..a5fecd5
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@@ -357,35 -376,20 +375,35 @@@ public class ThriftTransportPool 
    
    private ThriftTransportPool() {}
    
 -  public TTransport getTransport(String location, int port) throws TTransportException {
 -    return getTransport(location, port, 0);
 -  }
 -  
 -  public TTransport getTransportWithDefaultTimeout(InetSocketAddress addr, AccumuloConfiguration conf) throws TTransportException {
 -    return getTransport(addr.getAddress().getHostAddress(), addr.getPort(), conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
 +  public TTransport getTransportWithDefaultTimeout(HostAndPort addr, AccumuloConfiguration conf) throws TTransportException {
 +    return getTransport(String.format("%s:%d", addr.getHostText(), addr.getPort()), conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT), SslConnectionParams.forClient(conf));
    }
    
 -  public TTransport getTransport(InetSocketAddress addr, long timeout) throws TTransportException {
 -    return getTransport(addr.getAddress().getHostAddress(), addr.getPort(), timeout);
 +  public TTransport getTransport(String location, long milliseconds, SslConnectionParams sslParams) throws TTransportException {
 +    return getTransport(new ThriftTransportKey(location, milliseconds, sslParams));
    }
    
 -  public TTransport getTransportWithDefaultTimeout(String location, int port, AccumuloConfiguration conf) throws TTransportException {
 -    return getTransport(location, port, conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
 +  private TTransport getTransport(ThriftTransportKey cacheKey) throws TTransportException {
 +    synchronized (this) {
 +      // atomically reserve location if it exists in cache
-       List<CachedConnection> ccl = cache.get(cacheKey);
++      List<CachedConnection> ccl = getCache().get(cacheKey);
 +      
 +      if (ccl == null) {
 +        ccl = new LinkedList<CachedConnection>();
-         cache.put(cacheKey, ccl);
++        getCache().put(cacheKey, ccl);
 +      }
 +      
 +      for (CachedConnection cachedConnection : ccl) {
 +        if (!cachedConnection.isReserved()) {
 +          cachedConnection.setReserved(true);
 +          if (log.isTraceEnabled())
 +            log.trace("Using existing connection to " + cacheKey.getLocation() + ":" + cacheKey.getPort());
 +          return cachedConnection.transport;
 +        }
 +      }
 +    }
 +    
 +    return createNewTransport(cacheKey);
    }
    
    Pair<String,TTransport> getAnyTransport(List<ThriftTransportKey> servers, boolean preferCachedConnection) throws TTransportException {
@@@ -484,9 -531,9 +507,9 @@@
      CachedTTransport ctsc = (CachedTTransport) tsc;
      
      ArrayList<CachedConnection> closeList = new ArrayList<ThriftTransportPool.CachedConnection>();
 -
 +    
      synchronized (this) {
-       List<CachedConnection> ccl = cache.get(ctsc.getCacheKey());
+       List<CachedConnection> ccl = getCache().get(ctsc.getCacheKey());
        for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();) {
          CachedConnection cachedConnection = iterator.next();
          if (cachedConnection.transport == tsc) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c092cad/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c092cad/test/src/test/java/org/apache/accumulo/test/functional/CleanUpIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/CleanUpIT.java
index 0000000,0000000..ad5b2fd
new file mode 100644
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/functional/CleanUpIT.java
@@@ -1,0 -1,0 +1,137 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.accumulo.test.functional;
++
++import java.util.Iterator;
++import java.util.Map.Entry;
++import java.util.Set;
++
++import org.apache.accumulo.core.client.BatchWriter;
++import org.apache.accumulo.core.client.BatchWriterConfig;
++import org.apache.accumulo.core.client.Scanner;
++import org.apache.accumulo.core.data.Key;
++import org.apache.accumulo.core.data.Mutation;
++import org.apache.accumulo.core.data.Value;
++import org.apache.accumulo.core.security.Authorizations;
++import org.apache.accumulo.core.util.CleanUp;
++import org.junit.Test;
++
++/**
++ * 
++ */
++public class CleanUpIT extends SimpleMacIT {
++  @Test(timeout = 30 * 1000)
++  public void run() throws Exception {
++
++    String tableName = getTableNames(1)[0];
++    getConnector().tableOperations().create(tableName);
++
++    BatchWriter bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig());
++
++    Mutation m1 = new Mutation("r1");
++    m1.put("cf1", "cq1", 1, "5");
++
++    bw.addMutation(m1);
++
++    bw.flush();
++
++    Scanner scanner = getConnector().createScanner(tableName, new Authorizations());
++
++    int count = 0;
++    for (Entry<Key,Value> entry : scanner) {
++      count++;
++      if (!entry.getValue().toString().equals("5")) {
++        throw new Exception("Unexpected value " + entry.getValue());
++      }
++    }
++
++    if (count != 1) {
++      throw new Exception("Unexpected count " + count);
++    }
++
++    if (countThreads() < 2) {
++      printThreadNames();
++      throw new Exception("Not seeing expected threads");
++    }
++
++    CleanUp.shutdownNow();
++
++    Mutation m2 = new Mutation("r2");
++    m2.put("cf1", "cq1", 1, "6");
++
++    try {
++      bw.addMutation(m1);
++      bw.flush();
++      throw new Exception("batch writer did not fail");
++    } catch (Exception e) {
++
++    }
++
++    try {
++      // expect this to fail also, want to clean up batch writer threads
++      bw.close();
++      throw new Exception("batch writer close not fail");
++    } catch (Exception e) {
++
++    }
++
++    try {
++      count = 0;
++      Iterator<Entry<Key,Value>> iter = scanner.iterator();
++      while (iter.hasNext()) {
++        iter.next();
++        count++;
++      }
++      throw new Exception("scanner did not fail");
++    } catch (Exception e) {
++
++    }
++
++    if (countThreads() > 0) {
++      printThreadNames();
++      throw new Exception("Threads did not go away");
++    }
++  }
++
++  private void printThreadNames() {
++    Set<Thread> threads = Thread.getAllStackTraces().keySet();
++    for (Thread thread : threads) {
++      System.out.println("thread name:" + thread.getName());
++      thread.getStackTrace();
++
++    }
++  }
++
++  /**
++   * count threads that should be cleaned up
++   * 
++   */
++  private int countThreads() {
++    int count = 0;
++    Set<Thread> threads = Thread.getAllStackTraces().keySet();
++    for (Thread thread : threads) {
++
++      if (thread.getName().toLowerCase().contains("sendthread") || thread.getName().toLowerCase().contains("eventthread"))
++        count++;
++
++      if (thread.getName().toLowerCase().contains("thrift") && thread.getName().toLowerCase().contains("pool"))
++        count++;
++    }
++
++    return count;
++  }
++}


[6/6] git commit: Merge branch '1.6.0-SNAPSHOT'

Posted by kt...@apache.org.
Merge branch '1.6.0-SNAPSHOT'


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4fbba38a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4fbba38a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4fbba38a

Branch: refs/heads/master
Commit: 4fbba38aea78467b6299d2ecb2339d058f8f3ebf
Parents: b8bd259 9c092ca
Author: Keith Turner <kt...@apache.org>
Authored: Mon Jan 6 21:12:57 2014 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Mon Jan 6 21:12:57 2014 -0500

----------------------------------------------------------------------
 .../core/client/impl/ThriftTransportPool.java   | 100 +++++++++++---
 .../org/apache/accumulo/core/util/CleanUp.java  |  63 +++++++++
 .../accumulo/fate/zookeeper/ZooSession.java     |  23 +++-
 .../accumulo/test/functional/CleanUpIT.java     | 137 +++++++++++++++++++
 4 files changed, 304 insertions(+), 19 deletions(-)
----------------------------------------------------------------------



[2/6] git commit: ACCUMULO-2128 added utility to cleanup accumulo static resources

Posted by kt...@apache.org.
ACCUMULO-2128 added utility to cleanup accumulo static resources

Signed-off-by: Keith Turner <kt...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/715825b3
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/715825b3
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/715825b3

Branch: refs/heads/master
Commit: 715825b3299fd742d8570971a7f271178b932812
Parents: 71f150a
Author: Keith Turner <kt...@apache.org>
Authored: Thu Jan 2 21:03:55 2014 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Mon Jan 6 20:17:58 2014 -0500

----------------------------------------------------------------------
 .../core/client/impl/ThriftTransportPool.java   | 102 +++++++++++++++----
 .../org/apache/accumulo/core/util/CleanUp.java  |  35 +++++++
 .../accumulo/core/zookeeper/ZooSession.java     |  23 ++++-
 3 files changed, 140 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/715825b3/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java b/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index ef3724b..7468051 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -53,6 +54,8 @@ public class ThriftTransportPool {
   private Map<ThriftTransportKey,Long> errorCount = new HashMap<ThriftTransportKey,Long>();
   private Map<ThriftTransportKey,Long> errorTime = new HashMap<ThriftTransportKey,Long>();
   private Set<ThriftTransportKey> serversWarnedAbout = new HashSet<ThriftTransportKey>();
+
+  private CountDownLatch closerExitLatch;
   
   private static final Logger log = Logger.getLogger(ThriftTransportPool.class);
   
@@ -78,20 +81,26 @@ public class ThriftTransportPool {
     long lastReturnTime;
   }
   
+  public static class TransportPoolShutdownException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+  }
+
   private static class Closer implements Runnable {
     ThriftTransportPool pool;
-    
-    public Closer(ThriftTransportPool pool) {
+    private CountDownLatch closerExitLatch;
+
+    public Closer(ThriftTransportPool pool, CountDownLatch closerExitLatch) {
       this.pool = pool;
+      this.closerExitLatch = closerExitLatch;
     }
     
-    public void run() {
+    private void closeConnections() {
       while (true) {
         
         ArrayList<CachedConnection> connectionsToClose = new ArrayList<CachedConnection>();
         
         synchronized (pool) {
-          for (List<CachedConnection> ccl : pool.cache.values()) {
+          for (List<CachedConnection> ccl : pool.getCache().values()) {
             Iterator<CachedConnection> iter = ccl.iterator();
             while (iter.hasNext()) {
               CachedConnection cachedConnection = iter.next();
@@ -103,7 +112,7 @@ public class ThriftTransportPool {
             }
           }
           
-          for (List<CachedConnection> ccl : pool.cache.values()) {
+          for (List<CachedConnection> ccl : pool.getCache().values()) {
             for (CachedConnection cachedConnection : ccl) {
               cachedConnection.transport.checkForStuckIO(STUCK_THRESHOLD);
             }
@@ -132,6 +141,15 @@ public class ThriftTransportPool {
         }
       }
     }
+
+    public void run() {
+      try {
+        closeConnections();
+      } catch (TransportPoolShutdownException e) {
+      } finally {
+        closerExitLatch.countDown();
+      }
+    }
   }
   
   static class CachedTTransport extends TTransport {
@@ -384,14 +402,14 @@ public class ThriftTransportPool {
       synchronized (this) {
         
         // randomly pick a server from the connection cache
-        serversSet.retainAll(cache.keySet());
+        serversSet.retainAll(getCache().keySet());
         
         if (serversSet.size() > 0) {
           ArrayList<ThriftTransportKey> cachedServers = new ArrayList<ThriftTransportKey>(serversSet);
           Collections.shuffle(cachedServers, random);
           
           for (ThriftTransportKey ttk : cachedServers) {
-            for (CachedConnection cachedConnection : cache.get(ttk)) {
+            for (CachedConnection cachedConnection : getCache().get(ttk)) {
               if (!cachedConnection.isReserved()) {
                 cachedConnection.setReserved(true);
                 if (log.isTraceEnabled())
@@ -411,7 +429,7 @@ public class ThriftTransportPool {
       
       if (!preferCachedConnection) {
         synchronized (this) {
-          List<CachedConnection> cachedConnList = cache.get(ttk);
+          List<CachedConnection> cachedConnList = getCache().get(ttk);
           if (cachedConnList != null) {
             for (CachedConnection cachedConnection : cachedConnList) {
               if (!cachedConnection.isReserved()) {
@@ -444,11 +462,11 @@ public class ThriftTransportPool {
   private TTransport getTransport(ThriftTransportKey cacheKey) throws TTransportException {
     synchronized (this) {
       // atomically reserve location if it exist in cache
-      List<CachedConnection> ccl = cache.get(cacheKey);
+      List<CachedConnection> ccl = getCache().get(cacheKey);
       
       if (ccl == null) {
         ccl = new LinkedList<CachedConnection>();
-        cache.put(cacheKey, ccl);
+        getCache().put(cacheKey, ccl);
       }
       
       for (CachedConnection cachedConnection : ccl) {
@@ -486,15 +504,20 @@ public class ThriftTransportPool {
     CachedConnection cc = new CachedConnection(tsc);
     cc.setReserved(true);
     
-    synchronized (this) {
-      List<CachedConnection> ccl = cache.get(cacheKey);
+    try {
+      synchronized (this) {
+        List<CachedConnection> ccl = getCache().get(cacheKey);
+
+        if (ccl == null) {
+          ccl = new LinkedList<CachedConnection>();
+          getCache().put(cacheKey, ccl);
+        }
       
-      if (ccl == null) {
-        ccl = new LinkedList<CachedConnection>();
-        cache.put(cacheKey, ccl);
+        ccl.add(cc);
       }
-      
-      ccl.add(cc);
+    } catch (TransportPoolShutdownException e) {
+      cc.transport.close();
+      throw e;
     }
     return cc.transport;
   }
@@ -510,7 +533,7 @@ public class ThriftTransportPool {
     ArrayList<CachedConnection> closeList = new ArrayList<ThriftTransportPool.CachedConnection>();
 
     synchronized (this) {
-      List<CachedConnection> ccl = cache.get(ctsc.getCacheKey());
+      List<CachedConnection> ccl = getCache().get(ctsc.getCacheKey());
       for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();) {
         CachedConnection cachedConnection = iterator.next();
         if (cachedConnection.transport == tsc) {
@@ -600,8 +623,49 @@ public class ThriftTransportPool {
     }
     
     if (daemonStarted.compareAndSet(false, true)) {
-      new Daemon(new Closer(instance), "Thrift Connection Pool Checker").start();
+      CountDownLatch closerExitLatch = new CountDownLatch(1);
+      new Daemon(new Closer(instance, closerExitLatch), "Thrift Connection Pool Checker").start();
+      instance.setCloserExitLatch(closerExitLatch);
     }
     return instance;
   }
+  
+  private synchronized void setCloserExitLatch(CountDownLatch closerExitLatch) {
+    this.closerExitLatch = closerExitLatch;
+  }
+
+  public void shutdown() {
+    synchronized (this) {
+      if (cache == null)
+        return;
+
+      // close any connections in the pool... even ones that are in use
+      for (List<CachedConnection> ccl : getCache().values()) {
+        Iterator<CachedConnection> iter = ccl.iterator();
+        while (iter.hasNext()) {
+          CachedConnection cc = iter.next();
+          try {
+            cc.transport.close();
+          } catch (Exception e) {
+            log.debug("Error closing transport during shutdown", e);
+          }
+        }
+      }
+
+      // this will render the pool unusable and cause the background thread to exit
+      this.cache = null;
+    }
+
+    try {
+      closerExitLatch.await();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Map<ThriftTransportKey,List<CachedConnection>> getCache() {
+    if (cache == null)
+      throw new TransportPoolShutdownException();
+    return cache;
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/715825b3/src/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java b/src/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java
new file mode 100644
index 0000000..ba02f0b
--- /dev/null
+++ b/src/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import org.apache.accumulo.core.client.impl.ThriftTransportPool;
+import org.apache.accumulo.core.zookeeper.ZooSession;
+
+/**
+ * 
+ */
+public class CleanUp {
+  /**
+   * kills all threads created by internal Accumulo singleton resources. After this method is called, no accumulo client will work in the current classloader.
+   */
+  public static void shutdownNow() {
+    ThriftTransportPool.getInstance().shutdown();
+    ZooSession.shutdown();
+    // need to get code from jared w
+    // waitForZooKeeperClientThreads();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/715825b3/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
index b3db26f..e64f0c5 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
@@ -29,8 +29,14 @@ import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper.States;
 
-class ZooSession {
+public class ZooSession {
   
+  public static class ZooSessionShutdownException extends RuntimeException {
+
+    private static final long serialVersionUID = 1L;
+
+  }
+
   private static final Logger log = Logger.getLogger(ZooSession.class);
   
   private static class ZooSessionInfo {
@@ -114,6 +120,9 @@ class ZooSession {
   
   public static synchronized ZooKeeper getSession(String zooKeepers, int timeout, String auth) {
     
+    if (sessions == null)
+      throw new ZooSessionShutdownException();
+
     String sessionKey = sessionKey(zooKeepers, timeout, auth);
     
     // a read-only session can use a session with authorizations, so cache a copy for it w/out auths
@@ -137,4 +146,16 @@ class ZooSession {
     }
     return zsi.zooKeeper;
   }
+
+  public static synchronized void shutdown() {
+    for (ZooSessionInfo zsi : sessions.values()) {
+      try {
+        zsi.zooKeeper.close();
+      } catch (Exception e) {
+        log.debug("Error closing zookeeper during shutdown", e);
+      }
+    }
+
+    sessions = null;
+  }
 }