You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by uj...@apache.org on 2013/11/18 19:18:15 UTC
git commit: ACCUMULO-1858 Backport ZooKeeper clean up to 1.4 and 1.5.
Updated Branches:
refs/heads/1.4.5-SNAPSHOT a40a6d423 -> 79d686faa
ACCUMULO-1858 Backport ZooKeeper clean up to 1.4 and 1.5.
Fix cherry picks two commits:
ACCUMULO-1379 - Adding close() to Instance to assist in freeing up resources
(cherry picked from commit 7da1164d87227960d3e0cfc841f753067e2c0304)
Reason: bugfix
Author: John Vines <jv...@gmail.com>
Differs from original by path changes and leaving out ConditionalWriterTest, which is only in 1.6.0+
----
ACCUMULO-1379 Fix edge cases if error in closing ZooKeeperInstance
(cherry picked from commit 3f6c66ede52cb1fb5a122d7bad06d7978ff0a671)
Reason: bugfix
Author: Christopher Tubbs <ct...@apache.org>
Signed-off-by: Bill Slacum <uj...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/79d686fa
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/79d686fa
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/79d686fa
Branch: refs/heads/1.4.5-SNAPSHOT
Commit: 79d686faa1e477b9cbd80c6f833ece402050b490
Parents: a40a6d4
Author: Sean Busbey <bu...@clouderagovt.com>
Authored: Wed Nov 13 09:19:36 2013 -0600
Committer: Bill Slacum <uj...@apache.org>
Committed: Mon Nov 18 13:16:18 2013 -0500
----------------------------------------------------------------------
.../apache/accumulo/core/client/Instance.java | 7 ++
.../accumulo/core/client/ZooKeeperInstance.java | 109 +++++++++++++------
.../core/client/impl/ThriftTransportPool.java | 16 ++-
.../accumulo/core/client/mock/MockInstance.java | 5 +
.../apache/accumulo/core/util/ThriftUtil.java | 4 +
.../accumulo/core/zookeeper/ZooCache.java | 7 ++
.../accumulo/core/zookeeper/ZooReader.java | 4 +
.../core/client/impl/TabletLocatorImplTest.java | 5 +
.../accumulo/server/client/HdfsZooInstance.java | 9 ++
9 files changed, 128 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/client/Instance.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/Instance.java b/src/core/src/main/java/org/apache/accumulo/core/client/Instance.java
index b3d09ba..1820e7a 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/Instance.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/Instance.java
@@ -126,6 +126,13 @@ public interface Instance {
* when a user's credentials are invalid
*/
public abstract Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException;
+
+ /**
+ * Closes up the instance to free up all associated resources. You should try to reuse an Instance as much as you can because there is some location caching
+ * stored which will enhance performance.
+ * @throws AccumuloException
+ */
+ public abstract void close() throws AccumuloException;
/**
* Returns the AccumuloConfiguration to use when interacting with this instance.
http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java b/src/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
index f657c07..1dae711 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.impl.ConnectorImpl;
@@ -33,6 +34,7 @@ import org.apache.accumulo.core.util.ByteBufferUtil;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.OpTimer;
import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.accumulo.core.zookeeper.ZooCache;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.hadoop.fs.FileStatus;
@@ -57,18 +59,20 @@ import org.apache.log4j.Logger;
*/
public class ZooKeeperInstance implements Instance {
-
+
private static final Logger log = Logger.getLogger(ZooKeeperInstance.class);
-
+
private String instanceId = null;
private String instanceName = null;
-
+
private ZooCache zooCache;
-
+
private String zooKeepers;
-
+
private int zooKeepersSessionTimeOut;
-
+
+ private volatile boolean closed = false;
+
/**
*
* @param instanceName
@@ -76,11 +80,11 @@ public class ZooKeeperInstance implements Instance {
* @param zooKeepers
* A comma separated list of zoo keeper server locations. Each location can contain an optional port, of the format host:port.
*/
-
+
public ZooKeeperInstance(String instanceName, String zooKeepers) {
this(instanceName, zooKeepers, (int) AccumuloConfiguration.getDefaultConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT));
}
-
+
/**
*
* @param instanceName
@@ -90,7 +94,7 @@ public class ZooKeeperInstance implements Instance {
* @param sessionTimeout
* zoo keeper session time out in milliseconds.
*/
-
+
public ZooKeeperInstance(String instanceName, String zooKeepers, int sessionTimeout) {
ArgumentChecker.notNull(instanceName, zooKeepers);
this.instanceName = instanceName;
@@ -98,8 +102,9 @@ public class ZooKeeperInstance implements Instance {
this.zooKeepersSessionTimeOut = sessionTimeout;
zooCache = ZooCache.getInstance(zooKeepers, sessionTimeout);
getInstanceID();
+ clientInstances.incrementAndGet();
}
-
+
/**
*
* @param instanceId
@@ -107,11 +112,11 @@ public class ZooKeeperInstance implements Instance {
* @param zooKeepers
* A comma separated list of zoo keeper server locations. Each location can contain an optional port, of the format host:port.
*/
-
+
public ZooKeeperInstance(UUID instanceId, String zooKeepers) {
this(instanceId, zooKeepers, (int) AccumuloConfiguration.getDefaultConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT));
}
-
+
/**
*
* @param instanceId
@@ -121,17 +126,20 @@ public class ZooKeeperInstance implements Instance {
* @param sessionTimeout
* zoo keeper session time out in milliseconds.
*/
-
+
public ZooKeeperInstance(UUID instanceId, String zooKeepers, int sessionTimeout) {
ArgumentChecker.notNull(instanceId, zooKeepers);
this.instanceId = instanceId.toString();
this.zooKeepers = zooKeepers;
this.zooKeepersSessionTimeOut = sessionTimeout;
zooCache = ZooCache.getInstance(zooKeepers, sessionTimeout);
+ clientInstances.incrementAndGet();
}
-
+
@Override
public String getInstanceID() {
+ if (closed)
+ throw new RuntimeException("ZooKeeperInstance has been closed.");
if (instanceId == null) {
// want the instance id to be stable for the life of this instance object,
// so only get it once
@@ -143,95 +151,103 @@ public class ZooKeeperInstance implements Instance {
}
instanceId = new String(iidb);
}
-
+
if (zooCache.get(Constants.ZROOT + "/" + instanceId) == null) {
if (instanceName == null)
throw new RuntimeException("Instance id " + instanceId + " does not exist in zookeeper");
throw new RuntimeException("Instance id " + instanceId + " pointed to by the name " + instanceName + " does not exist in zookeeper");
}
-
+
return instanceId;
}
-
+
@Override
public List<String> getMasterLocations() {
+ if (closed)
+ throw new RuntimeException("ZooKeeperInstance has been closed.");
String masterLocPath = ZooUtil.getRoot(this) + Constants.ZMASTER_LOCK;
-
+
OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up master location in zoocache.");
byte[] loc = ZooUtil.getLockData(zooCache, masterLocPath);
opTimer.stop("Found master at " + (loc == null ? null : new String(loc)) + " in %DURATION%");
-
+
if (loc == null) {
return Collections.emptyList();
}
-
+
return Collections.singletonList(new String(loc));
}
-
+
@Override
public String getRootTabletLocation() {
+ if (closed)
+ throw new RuntimeException("ZooKeeperInstance has been closed.");
String zRootLocPath = ZooUtil.getRoot(this) + Constants.ZROOT_TABLET_LOCATION;
-
+
OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up root tablet location in zookeeper.");
byte[] loc = zooCache.get(zRootLocPath);
opTimer.stop("Found root tablet at " + (loc == null ? null : new String(loc)) + " in %DURATION%");
-
+
if (loc == null) {
return null;
}
-
+
return new String(loc).split("\\|")[0];
}
-
+
@Override
public String getInstanceName() {
+ if (closed)
+ throw new RuntimeException("ZooKeeperInstance has been closed.");
if (instanceName == null)
instanceName = lookupInstanceName(zooCache, UUID.fromString(getInstanceID()));
-
+
return instanceName;
}
-
+
@Override
public String getZooKeepers() {
return zooKeepers;
}
-
+
@Override
public int getZooKeepersSessionTimeOut() {
return zooKeepersSessionTimeOut;
}
-
+
@Override
public Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException {
return getConnector(user, TextUtil.getBytes(new Text(pass.toString())));
}
-
+
@Override
public Connector getConnector(String user, ByteBuffer pass) throws AccumuloException, AccumuloSecurityException {
return getConnector(user, ByteBufferUtil.toBytes(pass));
}
-
+
// Suppress deprecation, ConnectorImpl is deprecated to warn clients against using.
@SuppressWarnings("deprecation")
@Override
public Connector getConnector(String user, byte[] pass) throws AccumuloException, AccumuloSecurityException {
+ if (closed)
+ throw new RuntimeException("ZooKeeperInstance has been closed.");
return new ConnectorImpl(this, user, pass);
}
-
+
private AccumuloConfiguration conf = null;
-
+
@Override
public AccumuloConfiguration getConfiguration() {
if (conf == null)
conf = AccumuloConfiguration.getDefaultConfiguration();
return conf;
}
-
+
@Override
public void setConfiguration(AccumuloConfiguration conf) {
this.conf = conf;
}
-
+
/**
* Given a zooCache and instanceId, look up the instance name.
*
@@ -277,4 +293,27 @@ public class ZooKeeperInstance implements Instance {
public Connector getConnector(AuthInfo auth) throws AccumuloException, AccumuloSecurityException {
return getConnector(auth.user, auth.password);
}
+
+ static private final AtomicInteger clientInstances = new AtomicInteger(0);
+
+ @Override
+ public synchronized void close() throws AccumuloException {
+ if (!closed && clientInstances.decrementAndGet() == 0) {
+ try {
+ zooCache.close();
+ ThriftUtil.close();
+ } catch (InterruptedException e) {
+ clientInstances.incrementAndGet();
+ throw new AccumuloException("Issues closing ZooKeeper.");
+ }
+ closed = true;
+ }
+ }
+
+ @Override
+ public void finalize() {
+ // This method intentionally left blank. Users need to explicitly close Instances if they want things cleaned up nicely.
+ if (!closed)
+ log.warn("ZooKeeperInstance being cleaned up without being closed. Please remember to call close() before dereferencing to clean up threads.");
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java b/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index ef3724b..f969f28 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@ -80,13 +80,15 @@ public class ThriftTransportPool {
private static class Closer implements Runnable {
ThriftTransportPool pool;
+ final AtomicBoolean stop;
- public Closer(ThriftTransportPool pool) {
+ public Closer(ThriftTransportPool pool, AtomicBoolean stop) {
this.pool = pool;
+ this.stop = stop;
}
public void run() {
- while (true) {
+ while (!stop.get()) {
ArrayList<CachedConnection> connectionsToClose = new ArrayList<CachedConnection>();
@@ -592,6 +594,7 @@ public class ThriftTransportPool {
private static ThriftTransportPool instance = new ThriftTransportPool();
private static final AtomicBoolean daemonStarted = new AtomicBoolean(false);
+ private static AtomicBoolean stopDaemon;
public static ThriftTransportPool getInstance() {
SecurityManager sm = System.getSecurityManager();
@@ -600,8 +603,15 @@ public class ThriftTransportPool {
}
if (daemonStarted.compareAndSet(false, true)) {
- new Daemon(new Closer(instance), "Thrift Connection Pool Checker").start();
+ stopDaemon = new AtomicBoolean(false);
+ new Daemon(new Closer(instance, stopDaemon), "Thrift Connection Pool Checker").start();
}
return instance;
}
+
+ public static void close() {
+ if (daemonStarted.compareAndSet(true, false)) {
+ stopDaemon.set(true);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java b/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
index 2ff7b82..d8a15e0 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
@@ -140,4 +140,9 @@ public class MockInstance implements Instance {
public Connector getConnector(AuthInfo auth) throws AccumuloException, AccumuloSecurityException {
return getConnector(auth.user, auth.password);
}
+
+ @Override
+ public void close() throws AccumuloException {
+ // NOOP
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java b/src/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
index 1b1cdd7..3684ecd 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
@@ -165,4 +165,8 @@ public class ThriftUtil {
public static TProtocolFactory protocolFactory() {
return protocolFactory;
}
+
+ public static void close() {
+ ThriftTransportPool.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java
index f5bdd6b..0a36923 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java
@@ -307,4 +307,11 @@ public class ZooCache {
return zc;
}
+
+ public void close() throws InterruptedException {
+ cache.clear();
+ statCache.clear();
+ childrenCache.clear();
+ zReader.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooReader.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooReader.java b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooReader.java
index 47663ac..1bcd22b 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooReader.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooReader.java
@@ -107,4 +107,8 @@ public class ZooReader implements IZooReader {
public ZooReader(Instance instance) {
this(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
}
+
+ public void close() throws InterruptedException {
+ getZooKeeper().close();
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
----------------------------------------------------------------------
diff --git a/src/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java b/src/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
index 538cb6c..624a824 100644
--- a/src/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
+++ b/src/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
@@ -448,6 +448,11 @@ public class TabletLocatorImplTest extends TestCase {
public Connector getConnector(AuthInfo auth) throws AccumuloException, AccumuloSecurityException {
return getConnector(auth.user, auth.password);
}
+
+ @Override
+ public void close() throws AccumuloException {
+ // NOOP
+ }
}
static class TServers {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
----------------------------------------------------------------------
diff --git a/src/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java b/src/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
index e6cdb63..d68449d 100644
--- a/src/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
+++ b/src/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
@@ -177,6 +177,15 @@ public class HdfsZooInstance implements Instance {
System.out.println("ZooKeepers: " + instance.getZooKeepers());
System.out.println("Masters: " + StringUtil.join(instance.getMasterLocations(), ", "));
}
+
+ @Override
+ public void close() throws AccumuloException {
+ try {
+ zooCache.close();
+ } catch (InterruptedException e) {
+ throw new AccumuloException("Issues closing ZooKeeper, try again");
+ }
+ }
@Override
public Connector getConnector(AuthInfo auth) throws AccumuloException, AccumuloSecurityException {