Posted to notifications@accumulo.apache.org by "Sean Busbey (JIRA)" <ji...@apache.org> on 2014/01/03 17:06:51 UTC
[jira] [Resolved] (ACCUMULO-2027) ZooKeeperInstance.close() not freeing resources in multithreaded env
[ https://issues.apache.org/jira/browse/ACCUMULO-2027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Busbey resolved ACCUMULO-2027.
-----------------------------------
Resolution: Won't Fix
Should be fixed by the reverts in ACCUMULO-2128.
> ZooKeeperInstance.close() not freeing resources in multithreaded env
> --------------------------------------------------------------------
>
> Key: ACCUMULO-2027
> URL: https://issues.apache.org/jira/browse/ACCUMULO-2027
> Project: Accumulo
> Issue Type: Bug
> Reporter: Keith Turner
> Assignee: William Slacum
> Priority: Critical
> Fix For: 1.4.5, 1.5.1, 1.6.0
>
>
> While looking at the changes related to ZooKeeperInstance.close() in the 1.4.5-SNAPSHOT branch, I noticed race conditions in which resources are not properly released. One type of race occurs when ZooKeeperInstance.close() is called while another thread is between a closed check in ZooKeeperInstance and a call to a ZooCache method. The following is an example situation:
> # Thread 1 uses ZooKeeperInstance1 to get a zoocache.
> # Thread 2 calls close() on ZooKeeperInstance1, which calls close() on the zoocache.
> # Thread 1 uses the zoocache it has reference to, causing a new zookeeper connection to be created.
> Below is an example program that triggers this behavior. For me it reliably shows a connected ZooKeeper session after all of the threads die. If I use 0 threads, it shows a closed ZooKeeper connection at the end.
> {code:java}
> static class WriteTask implements Runnable {
>   private BatchWriter writer;
>   private Random rand;
>
>   WriteTask(Connector conn) throws TableNotFoundException {
>     rand = new Random();
>     writer = conn.createBatchWriter("foo5", 10000000, 30000, 1);
>   }
>
>   @Override
>   public void run() {
>     try {
>       // write random mutations until an exception ends the loop
>       while (true) {
>         Mutation m1 = new Mutation(String.format("%06d", rand.nextInt(1000000)));
>         m1.put(String.format("%06d", rand.nextInt(100)), String.format("%06d", rand.nextInt(100)), String.format("%06d", rand.nextInt(1000000)));
>         writer.addMutation(m1);
>         writer.flush();
>       }
>     } catch (Exception e) {
>       System.out.println(e.getMessage());
>     }
>   }
> }
>
> static class ReadTask implements Runnable {
>   private Scanner scanner;
>
>   ReadTask(Connector conn) throws TableNotFoundException {
>     scanner = conn.createScanner("foo5", new Authorizations());
>   }
>
>   @Override
>   public void run() {
>     try {
>       // scan the table repeatedly until an exception ends the loop
>       while (true) {
>         for (Entry<Key,Value> entry : scanner) {
>           // read and discard every entry; only the scan activity matters here
>         }
>       }
>     } catch (Exception e) {
>       System.out.println(e.getMessage());
>     }
>   }
> }
>
> @Test(timeout = 30000)
> public void test2() throws Exception {
>   ZooKeeperInstance zki = new ZooKeeperInstance(accumulo.getInstanceName(), accumulo.getZooKeepers());
>   Connector conn = zki.getConnector("root", "superSecret");
>   conn.tableOperations().create("foo5");
>
>   ArrayList<Thread> threads = new ArrayList<Thread>();
>   int numThreads = 10;
>
>   for (int i = 0; i < numThreads; i++) {
>     Thread t = new Thread(new WriteTask(conn));
>     t.start();
>     threads.add(t);
>   }
>
>   for (int i = 0; i < numThreads; i++) {
>     Thread t = new Thread(new ReadTask(conn));
>     t.start();
>     threads.add(t);
>   }
>
>   // let threads get spun up
>   Thread.sleep(1000);
>   ZooSession.printSessions();
>
>   zki.close();
>
>   // wait for the threads to die
>   for (Thread thread : threads) {
>     thread.join();
>   }
>
>   ZooSession.printSessions();
> }
> {code}
> Below are some changes I made to ZooSession for debugging purposes.
> {noformat}
> diff --git a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
> index b3db26f..475a21d 100644
> --- a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
> +++ b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
> @@ -20,6 +20,8 @@
> import java.net.UnknownHostException;
> import java.util.HashMap;
> import java.util.Map;
> +import java.util.Map.Entry;
> +import java.util.Set;
>
> import org.apache.accumulo.core.util.UtilWaitThread;
> import org.apache.log4j.Logger;
> @@ -29,7 +31,7 @@
> import org.apache.zookeeper.ZooKeeper;
> import org.apache.zookeeper.ZooKeeper.States;
>
> -class ZooSession {
> +public class ZooSession {
>
> private static final Logger log = Logger.getLogger(ZooSession.class);
>
> @@ -121,6 +123,8 @@
>
> ZooSessionInfo zsi = sessions.get(sessionKey);
> if (zsi != null && zsi.zooKeeper.getState() == States.CLOSED) {
> + System.out.println("Removing closed session ");
> + new Exception().printStackTrace();
> if (auth != null && sessions.get(readOnlySessionKey) == zsi)
> sessions.remove(readOnlySessionKey);
> zsi = null;
> @@ -137,4 +141,13 @@
> }
> return zsi.zooKeeper;
> }
> +
> + public static synchronized void printSessions() {
> + Set<Entry<String,ZooSessionInfo>> es = sessions.entrySet();
> +
> + for (Entry<String,ZooSessionInfo> entry : es) {
> + System.out.println(entry.getKey() + " " + entry.getValue().zooKeeper.getState());
> + }
> + }
> +
> }
> {noformat}
> With the above changes, a stack trace like the following is printed when one of the race conditions occurs.
> {noformat}
> Removing closed session
> java.lang.Exception
> at org.apache.accumulo.core.zookeeper.ZooSession.getSession(ZooSession.java:127)
> at org.apache.accumulo.core.zookeeper.ZooReader.getSession(ZooReader.java:37)
> at org.apache.accumulo.core.zookeeper.ZooReader.getZooKeeper(ZooReader.java:41)
> at org.apache.accumulo.core.zookeeper.ZooCache.getZooKeeper(ZooCache.java:56)
> at org.apache.accumulo.core.zookeeper.ZooCache.retry(ZooCache.java:127)
> at org.apache.accumulo.core.zookeeper.ZooCache.get(ZooCache.java:233)
> at org.apache.accumulo.core.zookeeper.ZooCache.get(ZooCache.java:188)
> at org.apache.accumulo.core.client.ZooKeeperInstance.getInstanceID(ZooKeeperInstance.java:156)
> at org.apache.accumulo.core.client.impl.TabletLocator.getInstance(TabletLocator.java:96)
> at org.apache.accumulo.core.client.impl.ThriftScanner.scan(ThriftScanner.java:245)
> at org.apache.accumulo.core.client.impl.ScannerIterator$Reader.run(ScannerIterator.java:94)
> at org.apache.accumulo.core.client.impl.ScannerIterator.hasNext(ScannerIterator.java:176)
> at org.apache.accumulo.minicluster.MiniAccumuloClusterTest$ReadTask.run(MiniAccumuloClusterTest.java:109)
> at java.lang.Thread.run(Thread.java:662)
> {noformat}
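
The three-step interleaving described in the issue (closed check, close() from another thread, then use of the cached reference) can be reproduced deterministically with a small stand-alone sketch. Everything below is hypothetical and only models the reported behavior: FakeCache and FakeInstance are stand-ins for ZooCache and ZooKeeperInstance, openSessions stands in for the ZooSession table, and the two latches exist only to force the ordering that occurs by chance in the real client. It is not Accumulo code and does not use the Accumulo or ZooKeeper APIs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class CloseRaceSketch {

  // Hypothetical stand-in for the shared session table: counts open sessions.
  static final AtomicInteger openSessions = new AtomicInteger();

  // Hypothetical stand-in for ZooCache: any use while disconnected lazily reconnects.
  static class FakeCache {
    private boolean connected;

    synchronized String get(String path) {
      if (!connected) {
        connected = true;
        openSessions.incrementAndGet(); // a new session is created on demand
      }
      return "value-of-" + path;
    }

    synchronized void close() {
      if (connected) {
        connected = false;
        openSessions.decrementAndGet();
      }
    }
  }

  // Hypothetical stand-in for ZooKeeperInstance with a check-then-act closed check.
  static class FakeInstance {
    private final FakeCache cache = new FakeCache();
    private volatile boolean closed;

    FakeInstance() {
      cache.get("/init"); // opens the initial session
    }

    String getInstanceID(CountDownLatch passedCheck, CountDownLatch closeDone)
        throws InterruptedException {
      if (closed) {
        throw new IllegalStateException("instance is closed");
      }
      passedCheck.countDown();          // step 1: the closed check has passed
      closeDone.await();                // step 2 happens in this window
      return cache.get("/instance_id"); // step 3: the stale cache reconnects
    }

    void close() {
      closed = true;
      cache.close();
    }
  }

  // Runs the interleaving once and returns how many sessions remain open.
  static int openSessionsAfterRace() {
    openSessions.set(0);
    FakeInstance instance = new FakeInstance();
    CountDownLatch passedCheck = new CountDownLatch(1);
    CountDownLatch closeDone = new CountDownLatch(1);

    Thread reader = new Thread(() -> {
      try {
        instance.getInstanceID(passedCheck, closeDone);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    reader.start();

    try {
      passedCheck.await();   // wait until the reader is past the closed check
      instance.close();      // close while the reader still holds the cache
      closeDone.countDown(); // let the reader continue
      reader.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return openSessions.get();
  }

  public static void main(String[] args) {
    // prints "open sessions after close(): 1"
    System.out.println("open sessions after close(): " + openSessionsAfterRace());
  }
}
```

In this model the closed flag and the cache call are not covered by a common lock, so close() can always land between them. The session reopened in step 3 is then owned by nobody, which matches the connected session that the example program reports after all threads have died.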