You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/07/29 02:16:09 UTC

[1/3] git commit: CURATOR-126: Fix race condition in CuratorFrameworkImpl.close()

Repository: curator
Updated Branches:
  refs/heads/master 15a0aacec -> ccac7baac


CURATOR-126: Fix race condition in CuratorFrameworkImpl.close()


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/785e9f6c
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/785e9f6c
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/785e9f6c

Branch: refs/heads/master
Commit: 785e9f6c8a528d0c07652450471dcb71a5717776
Parents: 15a0aac
Author: Scott Blum <sc...@squareup.com>
Authored: Mon Jul 28 14:10:37 2014 -0400
Committer: Scott Blum <sc...@squareup.com>
Committed: Mon Jul 28 17:13:17 2014 -0400

----------------------------------------------------------------------
 .../framework/CuratorFrameworkFactory.java      | 17 +++++++++++++++
 .../framework/imps/CuratorFrameworkImpl.java    | 22 +++++++++++++++-----
 2 files changed, 34 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/785e9f6c/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index 2d21fb7..8ef2580 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -52,6 +52,7 @@ public class CuratorFrameworkFactory
     private static final DefaultZookeeperFactory    DEFAULT_ZOOKEEPER_FACTORY = new DefaultZookeeperFactory();
     private static final DefaultACLProvider         DEFAULT_ACL_PROVIDER = new DefaultACLProvider();
     private static final long                       DEFAULT_INACTIVE_THRESHOLD_MS = (int)TimeUnit.MINUTES.toMillis(3);
+    private static final int                        DEFAULT_CLOSE_WAIT_MS = (int)TimeUnit.SECONDS.toMillis(1);
 
     /**
      * Return a new builder that builds a CuratorFramework
@@ -101,6 +102,7 @@ public class CuratorFrameworkFactory
         private EnsembleProvider    ensembleProvider;
         private int                 sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT_MS;
         private int                 connectionTimeoutMs = DEFAULT_CONNECTION_TIMEOUT_MS;
+        private int                 maxCloseWaitMs = DEFAULT_CLOSE_WAIT_MS;
         private RetryPolicy         retryPolicy;
         private ThreadFactory       threadFactory = null;
         private String              namespace;
@@ -239,6 +241,16 @@ public class CuratorFrameworkFactory
         }
 
         /**
+         * @param maxCloseWaitMs time to wait during close to join background threads
+         * @return this
+         */
+        public Builder maxCloseWaitMs(int maxCloseWaitMs)
+        {
+            this.maxCloseWaitMs = maxCloseWaitMs;
+            return this;
+        }
+
+        /**
          * @param retryPolicy retry policy to use
          * @return this
          */
@@ -336,6 +348,11 @@ public class CuratorFrameworkFactory
             return connectionTimeoutMs;
         }
 
+        public int getMaxCloseWaitMs()
+        {
+            return maxCloseWaitMs;
+        }
+
         public RetryPolicy getRetryPolicy()
         {
             return retryPolicy;

http://git-wip-us.apache.org/repos/asf/curator/blob/785e9f6c/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 23a3248..7f7cc98 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -63,6 +63,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final ListenerContainer<CuratorListener> listeners;
     private final ListenerContainer<UnhandledErrorListener> unhandledErrorListeners;
     private final ThreadFactory threadFactory;
+    private final int maxCloseWaitMs;
     private final BlockingQueue<OperationAndData<?>> backgroundOperations;
     private final NamespaceImpl namespace;
     private final ConnectionStateManager connectionStateManager;
@@ -127,6 +128,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         backgroundOperations = new DelayQueue<OperationAndData<?>>();
         namespace = new NamespaceImpl(this, builder.getNamespace());
         threadFactory = getThreadFactory(builder);
+        maxCloseWaitMs = builder.getMaxCloseWaitMs();
         connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory());
         compressionProvider = builder.getCompressionProvider();
         aclProvider = builder.getAclProvider();
@@ -179,6 +181,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         listeners = parent.listeners;
         unhandledErrorListeners = parent.unhandledErrorListeners;
         threadFactory = parent.threadFactory;
+        maxCloseWaitMs = parent.maxCloseWaitMs;
         backgroundOperations = parent.backgroundOperations;
         connectionStateManager = parent.connectionStateManager;
         defaultData = parent.defaultData;
@@ -297,15 +300,24 @@ public class CuratorFrameworkImpl implements CuratorFramework
                     }
                 });
 
+            if ( executorService != null )
+            {
+                executorService.shutdownNow();
+                try
+                {
+                    executorService.awaitTermination(maxCloseWaitMs, TimeUnit.MILLISECONDS);
+                }
+                catch ( InterruptedException e )
+                {
+                    // Interrupted while interrupting; I give up.
+                    Thread.currentThread().interrupt();
+                }
+            }
             listeners.clear();
             unhandledErrorListeners.clear();
             connectionStateManager.close();
             client.close();
             namespaceWatcherMap.close();
-            if ( executorService != null )
-            {
-                executorService.shutdownNow();
-            }
         }
     }
 
@@ -759,7 +771,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
 
     private void backgroundOperationsLoop()
     {
-        while ( !Thread.interrupted() )
+        while ( !Thread.currentThread().isInterrupted() )
         {
             OperationAndData<?> operationAndData;
             try


[2/3] git commit: CURATOR-126 - Added a unit test to reproduce this case.

Posted by ra...@apache.org.
CURATOR-126 - Added a unit test to reproduce this case.

Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/a8a3e147
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/a8a3e147
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/a8a3e147

Branch: refs/heads/master
Commit: a8a3e14755ea69aaa186a40e67173ad7b686d9e3
Parents: 785e9f6
Author: Cameron McKenzie <ca...@unico.com.au>
Authored: Tue Jul 29 09:28:14 2014 +1000
Committer: Cameron McKenzie <ca...@unico.com.au>
Committed: Tue Jul 29 09:28:14 2014 +1000

----------------------------------------------------------------------
 .../framework/imps/TestFrameworkBackground.java | 86 ++++++++++++++++++++
 1 file changed, 86 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/a8a3e147/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
index 44792d9..f9fea4f 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
@@ -20,12 +20,15 @@
 package org.apache.curator.framework.imps;
 
 import com.google.common.collect.Lists;
+
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.framework.imps.OperationAndData.ErrorCallback;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryNTimes;
@@ -35,10 +38,12 @@ import org.apache.curator.test.Timing;
 import org.apache.zookeeper.KeeperException.Code;
 import org.testng.Assert;
 import org.testng.annotations.Test;
+
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -215,4 +220,85 @@ public class TestFrameworkBackground extends BaseClassForTests
         }
 
     }
+    
+    /**
+     * CURATOR-126
+     * Shutdown the Curator client while there are still background operations running.
+     */
+    @Test
+    public void testShutdown() throws Exception
+    {
+        final int MAX_CLOSE_WAIT_MS = 5000;
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).sessionTimeoutMs(timing.session()).
+            connectionTimeoutMs(timing.connection()).retryPolicy(new RetryOneTime(1)).maxCloseWaitMs(MAX_CLOSE_WAIT_MS).build();
+        try
+        {
+            client.start();
+
+            BackgroundCallback callback = new BackgroundCallback()
+            {
+                @Override
+                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+                {                
+                }
+            };
+            
+            final CountDownLatch operationReadyLatch = new CountDownLatch(1);
+            
+            //This gets called just before the operation is run.
+            ((CuratorFrameworkImpl)client).debugListener = new CuratorFrameworkImpl.DebugBackgroundListener()
+            {
+                @Override
+                public void listen(OperationAndData<?> data)
+                {
+                    operationReadyLatch.countDown();
+                    
+                    try {
+                        Thread.sleep(MAX_CLOSE_WAIT_MS / 2);
+                    } catch(InterruptedException e) {
+                    }
+                }
+            };            
+            
+            Assert.assertTrue(client.getZookeeperClient().blockUntilConnectedOrTimedOut(), "Failed to connect");
+
+            server.stop();
+            
+            BackgroundOperation<String> background = new BackgroundOperation<String>()
+            {
+
+                @Override
+                public void performBackgroundOperation(OperationAndData<String> data)
+                        throws Exception
+                {
+                }
+            };
+            
+            ErrorCallback<String> errorCallback = new ErrorCallback<String>()
+            {
+
+                @Override
+                public void retriesExhausted(
+                        OperationAndData<String> operationAndData)
+                {
+                }
+            };
+            
+            OperationAndData<String> operation = new OperationAndData<String>(background,
+                    "thedata", callback, errorCallback, null);
+            
+            ((CuratorFrameworkImpl)client).queueOperation(operation);
+            
+            operationReadyLatch.await();
+            
+            client.close();
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }        
+    }
+    
+    
 }


[3/3] git commit: Altered test to make it expose problem. Needed to add debugUnhandledErrorListener for this

Posted by ra...@apache.org.
Altered test to make it expose problem. Needed to add debugUnhandledErrorListener for this


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ccac7baa
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ccac7baa
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ccac7baa

Branch: refs/heads/master
Commit: ccac7baac38b9fe3dc5b322639ea409f7fa0f2b6
Parents: a8a3e14
Author: randgalt <ra...@apache.org>
Authored: Mon Jul 28 19:00:12 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jul 28 19:00:12 2014 -0500

----------------------------------------------------------------------
 .../framework/imps/CuratorFrameworkImpl.java    |   8 +-
 .../framework/imps/TestFrameworkBackground.java | 111 ++++++++-----------
 2 files changed, 53 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/ccac7baa/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 7f7cc98..01cacee 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -57,7 +57,6 @@ import java.util.concurrent.atomic.AtomicReference;
 
 public class CuratorFrameworkImpl implements CuratorFramework
 {
-
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final CuratorZookeeperClient client;
     private final ListenerContainer<CuratorListener> listeners;
@@ -86,6 +85,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     }
 
     volatile DebugBackgroundListener debugListener = null;
+    volatile UnhandledErrorListener debugUnhandledErrorListener = null;
 
     private final AtomicReference<CuratorFrameworkState> state;
 
@@ -313,6 +313,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
                     Thread.currentThread().interrupt();
                 }
             }
+
             listeners.clear();
             unhandledErrorListeners.clear();
             connectionStateManager.close();
@@ -566,6 +567,11 @@ public class CuratorFrameworkImpl implements CuratorFramework
                     return null;
                 }
             });
+
+        if ( debugUnhandledErrorListener != null )
+        {
+            debugUnhandledErrorListener.unhandledError(reason, e);
+        }
     }
 
     String unfixForNamespace(String path)

http://git-wip-us.apache.org/repos/asf/curator/blob/ccac7baa/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
index f9fea4f..3f1c41f 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
@@ -20,30 +20,26 @@
 package org.apache.curator.framework.imps;
 
 import com.google.common.collect.Lists;
-
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.UnhandledErrorListener;
-import org.apache.curator.framework.imps.OperationAndData.ErrorCallback;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryNTimes;
 import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
 import org.apache.zookeeper.KeeperException.Code;
 import org.testng.Assert;
 import org.testng.annotations.Test;
-
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -68,7 +64,8 @@ public class TestFrameworkBackground extends BaseClassForTests
                 @Override
                 public void stateChanged(CuratorFramework client, ConnectionState newState)
                 {
-                    if ( firstListenerAction.compareAndSet(true, false) ) {
+                    if ( firstListenerAction.compareAndSet(true, false) )
+                    {
                         firstListenerState.set(newState);
                         System.out.println("First listener state is " + newState);
                     }
@@ -185,11 +182,7 @@ public class TestFrameworkBackground extends BaseClassForTests
     public void testCuratorCallbackOnError() throws Exception
     {
         Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.builder()
-            .connectString(server.getConnectString())
-            .sessionTimeoutMs(timing.session())
-            .connectionTimeoutMs(timing.connection())
-            .retryPolicy(new RetryOneTime(1000)).build();
+        CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).sessionTimeoutMs(timing.session()).connectionTimeoutMs(timing.connection()).retryPolicy(new RetryOneTime(1000)).build();
         final CountDownLatch latch = new CountDownLatch(1);
         try
         {
@@ -198,8 +191,7 @@ public class TestFrameworkBackground extends BaseClassForTests
             {
 
                 @Override
-                public void processResult(CuratorFramework client, CuratorEvent event)
-                    throws Exception
+                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
                 {
                     if ( event.getResultCode() == Code.CONNECTIONLOSS.intValue() )
                     {
@@ -220,7 +212,7 @@ public class TestFrameworkBackground extends BaseClassForTests
         }
 
     }
-    
+
     /**
      * CURATOR-126
      * Shutdown the Curator client while there are still background operations running.
@@ -228,77 +220,66 @@ public class TestFrameworkBackground extends BaseClassForTests
     @Test
     public void testShutdown() throws Exception
     {
-        final int MAX_CLOSE_WAIT_MS = 5000;
         Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).sessionTimeoutMs(timing.session()).
-            connectionTimeoutMs(timing.connection()).retryPolicy(new RetryOneTime(1)).maxCloseWaitMs(MAX_CLOSE_WAIT_MS).build();
+        CuratorFramework client = CuratorFrameworkFactory
+            .builder()
+            .connectString(server.getConnectString())
+            .sessionTimeoutMs(timing.session())
+            .connectionTimeoutMs(timing.connection()).retryPolicy(new RetryOneTime(1))
+            .maxCloseWaitMs(timing.forWaiting().milliseconds())
+            .build();
         try
         {
-            client.start();
-
-            BackgroundCallback callback = new BackgroundCallback()
+            final AtomicBoolean hadIllegalStateException = new AtomicBoolean(false);
+            ((CuratorFrameworkImpl)client).debugUnhandledErrorListener = new UnhandledErrorListener()
             {
                 @Override
-                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
-                {                
+                public void unhandledError(String message, Throwable e)
+                {
+                    if ( e instanceof IllegalStateException )
+                    {
+                        hadIllegalStateException.set(true);
+                    }
                 }
             };
-            
+            client.start();
+
             final CountDownLatch operationReadyLatch = new CountDownLatch(1);
-            
-            //This gets called just before the operation is run.
             ((CuratorFrameworkImpl)client).debugListener = new CuratorFrameworkImpl.DebugBackgroundListener()
             {
                 @Override
                 public void listen(OperationAndData<?> data)
                 {
-                    operationReadyLatch.countDown();
-                    
-                    try {
-                        Thread.sleep(MAX_CLOSE_WAIT_MS / 2);
-                    } catch(InterruptedException e) {
+                    try
+                    {
+                        operationReadyLatch.await();
+                    }
+                    catch ( InterruptedException e )
+                    {
+                        Thread.currentThread().interrupt();
                     }
-                }
-            };            
-            
-            Assert.assertTrue(client.getZookeeperClient().blockUntilConnectedOrTimedOut(), "Failed to connect");
-
-            server.stop();
-            
-            BackgroundOperation<String> background = new BackgroundOperation<String>()
-            {
-
-                @Override
-                public void performBackgroundOperation(OperationAndData<String> data)
-                        throws Exception
-                {
                 }
             };
-            
-            ErrorCallback<String> errorCallback = new ErrorCallback<String>()
-            {
 
-                @Override
-                public void retriesExhausted(
-                        OperationAndData<String> operationAndData)
-                {
-                }
-            };
-            
-            OperationAndData<String> operation = new OperationAndData<String>(background,
-                    "thedata", callback, errorCallback, null);
-            
-            ((CuratorFrameworkImpl)client).queueOperation(operation);
-            
-            operationReadyLatch.await();
-            
+            // queue a background operation that will block due to the debugListener
+            client.create().inBackground().forPath("/hey");
+            timing.sleepABit();
+
+            // close the client while the background is still blocked
             client.close();
+
+            // unblock the background
+            operationReadyLatch.countDown();
+            timing.sleepABit();
+
+            // should not generate an exception
+            Assert.assertFalse(hadIllegalStateException.get());
         }
         finally
         {
             CloseableUtils.closeQuietly(client);
-        }        
+        }
     }
-    
-    
+
+
 }