You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2014/01/07 03:05:22 UTC

[2/5] git commit: ACCUMULO-2128 added utility to cleanup accumulo static resources

ACCUMULO-2128 added utility to cleanup accumulo static resources

Signed-off-by: Keith Turner <kt...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/715825b3
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/715825b3
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/715825b3

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 715825b3299fd742d8570971a7f271178b932812
Parents: 71f150a
Author: Keith Turner <kt...@apache.org>
Authored: Thu Jan 2 21:03:55 2014 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Mon Jan 6 20:17:58 2014 -0500

----------------------------------------------------------------------
 .../core/client/impl/ThriftTransportPool.java   | 102 +++++++++++++++----
 .../org/apache/accumulo/core/util/CleanUp.java  |  35 +++++++
 .../accumulo/core/zookeeper/ZooSession.java     |  23 ++++-
 3 files changed, 140 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/715825b3/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java b/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index ef3724b..7468051 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -53,6 +54,8 @@ public class ThriftTransportPool {
   private Map<ThriftTransportKey,Long> errorCount = new HashMap<ThriftTransportKey,Long>();
   private Map<ThriftTransportKey,Long> errorTime = new HashMap<ThriftTransportKey,Long>();
   private Set<ThriftTransportKey> serversWarnedAbout = new HashSet<ThriftTransportKey>();
+
+  private CountDownLatch closerExitLatch;
   
   private static final Logger log = Logger.getLogger(ThriftTransportPool.class);
   
@@ -78,20 +81,26 @@ public class ThriftTransportPool {
     long lastReturnTime;
   }
   
+  public static class TransportPoolShutdownException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+  }
+
   private static class Closer implements Runnable {
     ThriftTransportPool pool;
-    
-    public Closer(ThriftTransportPool pool) {
+    private CountDownLatch closerExitLatch;
+
+    public Closer(ThriftTransportPool pool, CountDownLatch closerExitLatch) {
       this.pool = pool;
+      this.closerExitLatch = closerExitLatch;
     }
     
-    public void run() {
+    private void closeConnections() {
       while (true) {
         
         ArrayList<CachedConnection> connectionsToClose = new ArrayList<CachedConnection>();
         
         synchronized (pool) {
-          for (List<CachedConnection> ccl : pool.cache.values()) {
+          for (List<CachedConnection> ccl : pool.getCache().values()) {
             Iterator<CachedConnection> iter = ccl.iterator();
             while (iter.hasNext()) {
               CachedConnection cachedConnection = iter.next();
@@ -103,7 +112,7 @@ public class ThriftTransportPool {
             }
           }
           
-          for (List<CachedConnection> ccl : pool.cache.values()) {
+          for (List<CachedConnection> ccl : pool.getCache().values()) {
             for (CachedConnection cachedConnection : ccl) {
               cachedConnection.transport.checkForStuckIO(STUCK_THRESHOLD);
             }
@@ -132,6 +141,15 @@ public class ThriftTransportPool {
         }
       }
     }
+
+    public void run() {
+      try {
+        closeConnections();
+      } catch (TransportPoolShutdownException e) {
+      } finally {
+        closerExitLatch.countDown();
+      }
+    }
   }
   
   static class CachedTTransport extends TTransport {
@@ -384,14 +402,14 @@ public class ThriftTransportPool {
       synchronized (this) {
         
         // randomly pick a server from the connection cache
-        serversSet.retainAll(cache.keySet());
+        serversSet.retainAll(getCache().keySet());
         
         if (serversSet.size() > 0) {
           ArrayList<ThriftTransportKey> cachedServers = new ArrayList<ThriftTransportKey>(serversSet);
           Collections.shuffle(cachedServers, random);
           
           for (ThriftTransportKey ttk : cachedServers) {
-            for (CachedConnection cachedConnection : cache.get(ttk)) {
+            for (CachedConnection cachedConnection : getCache().get(ttk)) {
               if (!cachedConnection.isReserved()) {
                 cachedConnection.setReserved(true);
                 if (log.isTraceEnabled())
@@ -411,7 +429,7 @@ public class ThriftTransportPool {
       
       if (!preferCachedConnection) {
         synchronized (this) {
-          List<CachedConnection> cachedConnList = cache.get(ttk);
+          List<CachedConnection> cachedConnList = getCache().get(ttk);
           if (cachedConnList != null) {
             for (CachedConnection cachedConnection : cachedConnList) {
               if (!cachedConnection.isReserved()) {
@@ -444,11 +462,11 @@ public class ThriftTransportPool {
   private TTransport getTransport(ThriftTransportKey cacheKey) throws TTransportException {
     synchronized (this) {
       // atomically reserve location if it exist in cache
-      List<CachedConnection> ccl = cache.get(cacheKey);
+      List<CachedConnection> ccl = getCache().get(cacheKey);
       
       if (ccl == null) {
         ccl = new LinkedList<CachedConnection>();
-        cache.put(cacheKey, ccl);
+        getCache().put(cacheKey, ccl);
       }
       
       for (CachedConnection cachedConnection : ccl) {
@@ -486,15 +504,20 @@ public class ThriftTransportPool {
     CachedConnection cc = new CachedConnection(tsc);
     cc.setReserved(true);
     
-    synchronized (this) {
-      List<CachedConnection> ccl = cache.get(cacheKey);
+    try {
+      synchronized (this) {
+        List<CachedConnection> ccl = getCache().get(cacheKey);
+
+        if (ccl == null) {
+          ccl = new LinkedList<CachedConnection>();
+          getCache().put(cacheKey, ccl);
+        }
       
-      if (ccl == null) {
-        ccl = new LinkedList<CachedConnection>();
-        cache.put(cacheKey, ccl);
+        ccl.add(cc);
       }
-      
-      ccl.add(cc);
+    } catch (TransportPoolShutdownException e) {
+      cc.transport.close();
+      throw e;
     }
     return cc.transport;
   }
@@ -510,7 +533,7 @@ public class ThriftTransportPool {
     ArrayList<CachedConnection> closeList = new ArrayList<ThriftTransportPool.CachedConnection>();
 
     synchronized (this) {
-      List<CachedConnection> ccl = cache.get(ctsc.getCacheKey());
+      List<CachedConnection> ccl = getCache().get(ctsc.getCacheKey());
       for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();) {
         CachedConnection cachedConnection = iterator.next();
         if (cachedConnection.transport == tsc) {
@@ -600,8 +623,49 @@ public class ThriftTransportPool {
     }
     
     if (daemonStarted.compareAndSet(false, true)) {
-      new Daemon(new Closer(instance), "Thrift Connection Pool Checker").start();
+      CountDownLatch closerExitLatch = new CountDownLatch(1);
+      new Daemon(new Closer(instance, closerExitLatch), "Thrift Connection Pool Checker").start();
+      instance.setCloserExitLatch(closerExitLatch);
     }
     return instance;
   }
+  
+  private synchronized void setCloserExitLatch(CountDownLatch closerExitLatch) {
+    this.closerExitLatch = closerExitLatch;
+  }
+
+  public void shutdown() {
+    synchronized (this) {
+      if (cache == null)
+        return;
+
+      // close any connections in the pool... even ones that are in use
+      for (List<CachedConnection> ccl : getCache().values()) {
+        Iterator<CachedConnection> iter = ccl.iterator();
+        while (iter.hasNext()) {
+          CachedConnection cc = iter.next();
+          try {
+            cc.transport.close();
+          } catch (Exception e) {
+            log.debug("Error closing transport during shutdown", e);
+          }
+        }
+      }
+
+      // this will render the pool unusable and cause the background thread to exit
+      this.cache = null;
+    }
+
+    try {
+      closerExitLatch.await();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Map<ThriftTransportKey,List<CachedConnection>> getCache() {
+    if (cache == null)
+      throw new TransportPoolShutdownException();
+    return cache;
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/715825b3/src/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java b/src/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java
new file mode 100644
index 0000000..ba02f0b
--- /dev/null
+++ b/src/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import org.apache.accumulo.core.client.impl.ThriftTransportPool;
+import org.apache.accumulo.core.zookeeper.ZooSession;
+
+/**
+ * 
+ */
+public class CleanUp {
+  /**
+   * kills all threads created by internal Accumulo singleton resources. After this method is called, no accumulo client will work in the current classloader.
+   */
+  public static void shutdownNow() {
+    ThriftTransportPool.getInstance().shutdown();
+    ZooSession.shutdown();
+    // need to get code from jared w
+    // waitForZooKeeperClientThreads();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/715825b3/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
index b3db26f..e64f0c5 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
@@ -29,8 +29,14 @@ import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper.States;
 
-class ZooSession {
+public class ZooSession {
   
+  public static class ZooSessionShutdownException extends RuntimeException {
+
+    private static final long serialVersionUID = 1L;
+
+  }
+
   private static final Logger log = Logger.getLogger(ZooSession.class);
   
   private static class ZooSessionInfo {
@@ -114,6 +120,9 @@ class ZooSession {
   
   public static synchronized ZooKeeper getSession(String zooKeepers, int timeout, String auth) {
     
+    if (sessions == null)
+      throw new ZooSessionShutdownException();
+
     String sessionKey = sessionKey(zooKeepers, timeout, auth);
     
     // a read-only session can use a session with authorizations, so cache a copy for it w/out auths
@@ -137,4 +146,16 @@ class ZooSession {
     }
     return zsi.zooKeeper;
   }
+
+  public static synchronized void shutdown() {
+    for (ZooSessionInfo zsi : sessions.values()) {
+      try {
+        zsi.zooKeeper.close();
+      } catch (Exception e) {
+        log.debug("Error closing zookeeper during shutdown", e);
+      }
+    }
+
+    sessions = null;
+  }
 }