You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by uj...@apache.org on 2013/11/18 19:18:15 UTC

git commit: ACCUMULO-1858 Backport ZooKeeper clean up to 1.4 and 1.5.

Updated Branches:
  refs/heads/1.4.5-SNAPSHOT a40a6d423 -> 79d686faa


ACCUMULO-1858 Backport ZooKeeper clean up to 1.4 and 1.5.

This fix cherry-picks two commits:

ACCUMULO-1379 - Adding close() to Instance to assist in freeing up resources

(cherry picked from commit 7da1164d87227960d3e0cfc841f753067e2c0304)

Reason: bugfix
Author: John Vines <jv...@gmail.com>

Differs from original by path changes and leaving out ConditionalWriterTest, which is only in 1.6.0+

----

ACCUMULO-1379 Fix edge cases if error in closing ZooKeeperInstance

(cherry picked from commit 3f6c66ede52cb1fb5a122d7bad06d7978ff0a671)

Reason: bugfix
Author: Christopher Tubbs <ct...@apache.org>

Signed-off-by: Bill Slacum <uj...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/79d686fa
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/79d686fa
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/79d686fa

Branch: refs/heads/1.4.5-SNAPSHOT
Commit: 79d686faa1e477b9cbd80c6f833ece402050b490
Parents: a40a6d4
Author: Sean Busbey <bu...@clouderagovt.com>
Authored: Wed Nov 13 09:19:36 2013 -0600
Committer: Bill Slacum <uj...@apache.org>
Committed: Mon Nov 18 13:16:18 2013 -0500

----------------------------------------------------------------------
 .../apache/accumulo/core/client/Instance.java   |   7 ++
 .../accumulo/core/client/ZooKeeperInstance.java | 109 +++++++++++++------
 .../core/client/impl/ThriftTransportPool.java   |  16 ++-
 .../accumulo/core/client/mock/MockInstance.java |   5 +
 .../apache/accumulo/core/util/ThriftUtil.java   |   4 +
 .../accumulo/core/zookeeper/ZooCache.java       |   7 ++
 .../accumulo/core/zookeeper/ZooReader.java      |   4 +
 .../core/client/impl/TabletLocatorImplTest.java |   5 +
 .../accumulo/server/client/HdfsZooInstance.java |   9 ++
 9 files changed, 128 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/client/Instance.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/Instance.java b/src/core/src/main/java/org/apache/accumulo/core/client/Instance.java
index b3d09ba..1820e7a 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/Instance.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/Instance.java
@@ -126,6 +126,13 @@ public interface Instance {
    *           when a user's credentials are invalid
    */
   public abstract Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException;
+
+  /**
+   * Closes up the instance to free up all associated resources. You should try to reuse an Instance as much as you can because there is some location caching
+   * stored which will enhance performance.
+   * @throws AccumuloException 
+   */
+  public abstract void close() throws AccumuloException;
   
   /**
    * Returns the AccumuloConfiguration to use when interacting with this instance.

http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java b/src/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
index f657c07..1dae711 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.impl.ConnectorImpl;
@@ -33,6 +34,7 @@ import org.apache.accumulo.core.util.ByteBufferUtil;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.OpTimer;
 import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.zookeeper.ZooCache;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.hadoop.fs.FileStatus;
@@ -57,18 +59,20 @@ import org.apache.log4j.Logger;
  */
 
 public class ZooKeeperInstance implements Instance {
-  
+
   private static final Logger log = Logger.getLogger(ZooKeeperInstance.class);
-  
+
   private String instanceId = null;
   private String instanceName = null;
-  
+
   private ZooCache zooCache;
-  
+
   private String zooKeepers;
-  
+
   private int zooKeepersSessionTimeOut;
-  
+
+  private volatile boolean closed = false;
+
   /**
    * 
    * @param instanceName
@@ -76,11 +80,11 @@ public class ZooKeeperInstance implements Instance {
    * @param zooKeepers
    *          A comma separated list of zoo keeper server locations. Each location can contain an optional port, of the format host:port.
    */
-  
+
   public ZooKeeperInstance(String instanceName, String zooKeepers) {
     this(instanceName, zooKeepers, (int) AccumuloConfiguration.getDefaultConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT));
   }
-  
+
   /**
    * 
    * @param instanceName
@@ -90,7 +94,7 @@ public class ZooKeeperInstance implements Instance {
    * @param sessionTimeout
    *          zoo keeper session time out in milliseconds.
    */
-  
+
   public ZooKeeperInstance(String instanceName, String zooKeepers, int sessionTimeout) {
     ArgumentChecker.notNull(instanceName, zooKeepers);
     this.instanceName = instanceName;
@@ -98,8 +102,9 @@ public class ZooKeeperInstance implements Instance {
     this.zooKeepersSessionTimeOut = sessionTimeout;
     zooCache = ZooCache.getInstance(zooKeepers, sessionTimeout);
     getInstanceID();
+    clientInstances.incrementAndGet();
   }
-  
+
   /**
    * 
    * @param instanceId
@@ -107,11 +112,11 @@ public class ZooKeeperInstance implements Instance {
    * @param zooKeepers
    *          A comma separated list of zoo keeper server locations. Each location can contain an optional port, of the format host:port.
    */
-  
+
   public ZooKeeperInstance(UUID instanceId, String zooKeepers) {
     this(instanceId, zooKeepers, (int) AccumuloConfiguration.getDefaultConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT));
   }
-  
+
   /**
    * 
    * @param instanceId
@@ -121,17 +126,20 @@ public class ZooKeeperInstance implements Instance {
    * @param sessionTimeout
    *          zoo keeper session time out in milliseconds.
    */
-  
+
   public ZooKeeperInstance(UUID instanceId, String zooKeepers, int sessionTimeout) {
     ArgumentChecker.notNull(instanceId, zooKeepers);
     this.instanceId = instanceId.toString();
     this.zooKeepers = zooKeepers;
     this.zooKeepersSessionTimeOut = sessionTimeout;
     zooCache = ZooCache.getInstance(zooKeepers, sessionTimeout);
+    clientInstances.incrementAndGet();
   }
-  
+
   @Override
   public String getInstanceID() {
+    if (closed)
+      throw new RuntimeException("ZooKeeperInstance has been closed.");
     if (instanceId == null) {
       // want the instance id to be stable for the life of this instance object,
       // so only get it once
@@ -143,95 +151,103 @@ public class ZooKeeperInstance implements Instance {
       }
       instanceId = new String(iidb);
     }
-    
+
     if (zooCache.get(Constants.ZROOT + "/" + instanceId) == null) {
       if (instanceName == null)
         throw new RuntimeException("Instance id " + instanceId + " does not exist in zookeeper");
       throw new RuntimeException("Instance id " + instanceId + " pointed to by the name " + instanceName + " does not exist in zookeeper");
     }
-    
+
     return instanceId;
   }
-  
+
   @Override
   public List<String> getMasterLocations() {
+    if (closed)
+      throw new RuntimeException("ZooKeeperInstance has been closed.");
     String masterLocPath = ZooUtil.getRoot(this) + Constants.ZMASTER_LOCK;
-    
+
     OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up master location in zoocache.");
     byte[] loc = ZooUtil.getLockData(zooCache, masterLocPath);
     opTimer.stop("Found master at " + (loc == null ? null : new String(loc)) + " in %DURATION%");
-    
+
     if (loc == null) {
       return Collections.emptyList();
     }
-    
+
     return Collections.singletonList(new String(loc));
   }
-  
+
   @Override
   public String getRootTabletLocation() {
+    if (closed)
+      throw new RuntimeException("ZooKeeperInstance has been closed.");
     String zRootLocPath = ZooUtil.getRoot(this) + Constants.ZROOT_TABLET_LOCATION;
-    
+
     OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up root tablet location in zookeeper.");
     byte[] loc = zooCache.get(zRootLocPath);
     opTimer.stop("Found root tablet at " + (loc == null ? null : new String(loc)) + " in %DURATION%");
-    
+
     if (loc == null) {
       return null;
     }
-    
+
     return new String(loc).split("\\|")[0];
   }
-  
+
   @Override
   public String getInstanceName() {
+    if (closed)
+      throw new RuntimeException("ZooKeeperInstance has been closed.");
     if (instanceName == null)
       instanceName = lookupInstanceName(zooCache, UUID.fromString(getInstanceID()));
-    
+
     return instanceName;
   }
-  
+
   @Override
   public String getZooKeepers() {
     return zooKeepers;
   }
-  
+
   @Override
   public int getZooKeepersSessionTimeOut() {
     return zooKeepersSessionTimeOut;
   }
-  
+
   @Override
   public Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException {
     return getConnector(user, TextUtil.getBytes(new Text(pass.toString())));
   }
-  
+
   @Override
   public Connector getConnector(String user, ByteBuffer pass) throws AccumuloException, AccumuloSecurityException {
     return getConnector(user, ByteBufferUtil.toBytes(pass));
   }
-  
+
   // Suppress deprecation, ConnectorImpl is deprecated to warn clients against using.
   @SuppressWarnings("deprecation")
   @Override
   public Connector getConnector(String user, byte[] pass) throws AccumuloException, AccumuloSecurityException {
+    if (closed)
+      throw new RuntimeException("ZooKeeperInstance has been closed.");
     return new ConnectorImpl(this, user, pass);
   }
-  
+
   private AccumuloConfiguration conf = null;
-  
+
   @Override
   public AccumuloConfiguration getConfiguration() {
     if (conf == null)
       conf = AccumuloConfiguration.getDefaultConfiguration();
     return conf;
   }
-  
+
   @Override
   public void setConfiguration(AccumuloConfiguration conf) {
     this.conf = conf;
   }
-  
+
   /**
    * Given a zooCache and instanceId, look up the instance name.
    * 
@@ -277,4 +293,27 @@ public class ZooKeeperInstance implements Instance {
   public Connector getConnector(AuthInfo auth) throws AccumuloException, AccumuloSecurityException {
     return getConnector(auth.user, auth.password);
   }
+
+  static private final AtomicInteger clientInstances = new AtomicInteger(0);
+
+  @Override
+  public synchronized void close() throws AccumuloException {
+    if (!closed && clientInstances.decrementAndGet() == 0) {
+      try {
+        zooCache.close();
+        ThriftUtil.close();
+      } catch (InterruptedException e) {
+        clientInstances.incrementAndGet();
+        throw new AccumuloException("Issues closing ZooKeeper.");
+      }
+      closed = true;
+    }
+  }
+
+  @Override
+  public void finalize() {
+    // This method intentionally left blank. Users need to explicitly close Instances if they want things cleaned up nicely.
+    if (!closed)
+      log.warn("ZooKeeperInstance being cleaned up without being closed. Please remember to call close() before dereferencing to clean up threads.");
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java b/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index ef3724b..f969f28 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@ -80,13 +80,15 @@ public class ThriftTransportPool {
   
   private static class Closer implements Runnable {
     ThriftTransportPool pool;
+    final AtomicBoolean stop;
     
-    public Closer(ThriftTransportPool pool) {
+    public Closer(ThriftTransportPool pool, AtomicBoolean stop) {
       this.pool = pool;
+      this.stop = stop;
     }
     
     public void run() {
-      while (true) {
+      while (!stop.get()) {
         
         ArrayList<CachedConnection> connectionsToClose = new ArrayList<CachedConnection>();
         
@@ -592,6 +594,7 @@ public class ThriftTransportPool {
 
   private static ThriftTransportPool instance = new ThriftTransportPool();
   private static final AtomicBoolean daemonStarted = new AtomicBoolean(false);
+  private static AtomicBoolean stopDaemon;
   
   public static ThriftTransportPool getInstance() {
     SecurityManager sm = System.getSecurityManager();
@@ -600,8 +603,15 @@ public class ThriftTransportPool {
     }
     
     if (daemonStarted.compareAndSet(false, true)) {
-      new Daemon(new Closer(instance), "Thrift Connection Pool Checker").start();
+      stopDaemon = new AtomicBoolean(false);
+      new Daemon(new Closer(instance, stopDaemon), "Thrift Connection Pool Checker").start();
     }
     return instance;
   }
+  
+  public static void close() {
+    if (daemonStarted.compareAndSet(true, false)) {
+      stopDaemon.set(true);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java b/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
index 2ff7b82..d8a15e0 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
@@ -140,4 +140,9 @@ public class MockInstance implements Instance {
   public Connector getConnector(AuthInfo auth) throws AccumuloException, AccumuloSecurityException {
     return getConnector(auth.user, auth.password);
   }
+
+  @Override
+  public void close() throws AccumuloException {
+    // NOOP
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java b/src/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
index 1b1cdd7..3684ecd 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
@@ -165,4 +165,8 @@ public class ThriftUtil {
   public static TProtocolFactory protocolFactory() {
     return protocolFactory;
   }
+  
+  public static void close() {
+    ThriftTransportPool.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java
index f5bdd6b..0a36923 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java
@@ -307,4 +307,11 @@ public class ZooCache {
     
     return zc;
   }
+  
+  public void close() throws InterruptedException {
+    cache.clear();
+    statCache.clear();
+    childrenCache.clear();
+    zReader.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooReader.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooReader.java b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooReader.java
index 47663ac..1bcd22b 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooReader.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooReader.java
@@ -107,4 +107,8 @@ public class ZooReader implements IZooReader {
   public ZooReader(Instance instance) {
     this(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
   }
+
+  public void close() throws InterruptedException {
+    getZooKeeper().close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
----------------------------------------------------------------------
diff --git a/src/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java b/src/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
index 538cb6c..624a824 100644
--- a/src/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
+++ b/src/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
@@ -448,6 +448,11 @@ public class TabletLocatorImplTest extends TestCase {
     public Connector getConnector(AuthInfo auth) throws AccumuloException, AccumuloSecurityException {
       return getConnector(auth.user, auth.password);
     }
+    
+    @Override
+    public void close() throws AccumuloException {
+      // NOOP
+    }
   }
   
   static class TServers {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
----------------------------------------------------------------------
diff --git a/src/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java b/src/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
index e6cdb63..d68449d 100644
--- a/src/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
+++ b/src/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
@@ -177,6 +177,15 @@ public class HdfsZooInstance implements Instance {
     System.out.println("ZooKeepers: " + instance.getZooKeepers());
     System.out.println("Masters: " + StringUtil.join(instance.getMasterLocations(), ", "));
   }
+
+  @Override
+  public void close() throws AccumuloException {
+    try {
+      zooCache.close();
+    } catch (InterruptedException e) {
+      throw new AccumuloException("Issues closing ZooKeeper, try again");
+    }
+  }
   
   @Override
   public Connector getConnector(AuthInfo auth) throws AccumuloException, AccumuloSecurityException {