You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2018/01/02 16:03:53 UTC

[1/7] curator git commit: To avoid massive spinning, background operations are paused for 1 second when there is no connection. However, this can hurt performance terribly if background operations are queued, for example, prior to initial connection. Thi

Repository: curator
Updated Branches:
  refs/heads/master cd365c414 -> 1fade17a2


To avoid massive spinning, background operations are paused for 1 second when there is no connection. However, this can hurt performance terribly if background operations are queued, for example, prior to initial connection. This changes the behavior so that the sleeps are cleared when the connection is re-established. A separate queue of "forced sleep" operations is kept while the connection is down. The operations in this queue then have their sleeps cleared when the connection is re-established.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/bfdb790b
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/bfdb790b
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/bfdb790b

Branch: refs/heads/master
Commit: bfdb790bc69022b0eef74558e9511e6ed2b665e9
Parents: d502dde
Author: randgalt <ra...@apache.org>
Authored: Tue Nov 21 18:11:01 2017 -0800
Committer: randgalt <ra...@apache.org>
Committed: Tue Nov 21 18:11:01 2017 -0800

----------------------------------------------------------------------
 .../framework/imps/CuratorFrameworkImpl.java    | 37 ++++++++++++++++++--
 .../framework/imps/OperationAndData.java        |  5 +++
 .../framework/imps/TestFrameworkEdges.java      | 31 ++++++++++++++++
 3 files changed, 70 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/bfdb790b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 7488793..4a5fad3 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -54,13 +54,16 @@ import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -76,6 +79,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final ThreadFactory threadFactory;
     private final int maxCloseWaitMs;
     private final BlockingQueue<OperationAndData<?>> backgroundOperations;
+    private final BlockingQueue<OperationAndData<?>> forcedSleepOperations;
     private final NamespaceImpl namespace;
     private final ConnectionStateManager connectionStateManager;
     private final List<AuthInfo> authInfos;
@@ -136,6 +140,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         listeners = new ListenerContainer<CuratorListener>();
         unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
         backgroundOperations = new DelayQueue<OperationAndData<?>>();
+        forcedSleepOperations = new LinkedBlockingQueue<>();
         namespace = new NamespaceImpl(this, builder.getNamespace());
         threadFactory = getThreadFactory(builder);
         maxCloseWaitMs = builder.getMaxCloseWaitMs();
@@ -217,6 +222,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         threadFactory = parent.threadFactory;
         maxCloseWaitMs = parent.maxCloseWaitMs;
         backgroundOperations = parent.backgroundOperations;
+        forcedSleepOperations = parent.forcedSleepOperations;
         connectionStateManager = parent.connectionStateManager;
         defaultData = parent.defaultData;
         failedDeleteManager = parent.failedDeleteManager;
@@ -640,12 +646,14 @@ public class CuratorFrameworkImpl implements CuratorFramework
         }
     }
 
-    <DATA_TYPE> void queueOperation(OperationAndData<DATA_TYPE> operationAndData)
+    <DATA_TYPE> boolean queueOperation(OperationAndData<DATA_TYPE> operationAndData)
     {
         if ( getState() == CuratorFrameworkState.STARTED )
         {
             backgroundOperations.offer(operationAndData);
+            return true;
         }
+        return false;
     }
 
     void logError(String reason, final Throwable e)
@@ -730,6 +738,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         {
             internalConnectionHandler.checkNewConnection(this);
             connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
+            unSleepBackgroundOperations();
         }
         else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
         {
@@ -940,8 +949,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
                 {
                     throw new CuratorConnectionLossException();
                 }
-                operationAndData.sleepFor(1, TimeUnit.SECONDS);
-                queueOperation(operationAndData);
+                sleepAndQueueOperation(operationAndData);
             }
         }
         catch ( Throwable e )
@@ -973,6 +981,29 @@ public class CuratorFrameworkImpl implements CuratorFramework
         }
     }
 
+    @VisibleForTesting
+    volatile long sleepAndQueueOperationSeconds = 1;
+
+    private void sleepAndQueueOperation(OperationAndData<?> operationAndData) throws InterruptedException
+    {
+        operationAndData.sleepFor(sleepAndQueueOperationSeconds, TimeUnit.SECONDS);
+        if ( queueOperation(operationAndData) )
+        {
+            forcedSleepOperations.add(operationAndData);
+        }
+    }
+
+    private void unSleepBackgroundOperations()
+    {
+        Collection<OperationAndData<?>> drain = new ArrayList<>(forcedSleepOperations.size());
+        forcedSleepOperations.drainTo(drain);
+        log.debug("Clearing sleep for {} operations", drain.size());
+        for ( OperationAndData<?> operation : drain )
+        {
+            operation.clearSleep();
+        }
+    }
+
     private void processEvent(final CuratorEvent curatorEvent)
     {
         if ( curatorEvent.getType() == CuratorEventType.WATCHED )

http://git-wip-us.apache.org/repos/asf/curator/blob/bfdb790b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
index 3d69e5d..8370415 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
@@ -115,6 +115,11 @@ class OperationAndData<T> implements Delayed, RetrySleeper
         return operation;
     }
 
+    void clearSleep()
+    {
+        sleepUntilTimeMs.set(0);
+    }
+
     @Override
     public void sleepFor(long time, TimeUnit unit) throws InterruptedException
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/bfdb790b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
index 9c4afe3..35e9fb1 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
@@ -61,6 +61,37 @@ public class TestFrameworkEdges extends BaseClassForTests
     private final Timing2 timing = new Timing2();
 
     @Test
+    public void testBackgroundLatencyUnSleep() throws Exception
+    {
+        server.stop();
+
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            ((CuratorFrameworkImpl)client).sleepAndQueueOperationSeconds = Integer.MAX_VALUE;
+
+            final CountDownLatch latch = new CountDownLatch(1);
+            BackgroundCallback callback = new BackgroundCallback()
+            {
+                @Override
+                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+                {
+                    latch.countDown();
+                }
+            };
+            client.create().inBackground(callback).forPath("/test");
+            server.restart();
+
+            Assert.assertTrue(timing.awaitLatch(latch));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
     public void testCreateContainersForBadConnect() throws Exception
     {
         final int serverPort = server.getPort();


[4/7] curator git commit: added comment

Posted by ra...@apache.org.
added comment


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/3f7b610a
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/3f7b610a
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/3f7b610a

Branch: refs/heads/master
Commit: 3f7b610ad5c5a0c6a7b0331f02294bd433a54554
Parents: 0d01ea5
Author: randgalt <ra...@apache.org>
Authored: Tue Dec 5 12:47:35 2017 -0800
Committer: randgalt <ra...@apache.org>
Committed: Tue Dec 5 12:47:35 2017 -0800

----------------------------------------------------------------------
 .../org/apache/curator/framework/imps/CuratorFrameworkImpl.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/3f7b610a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 1d3c1b6..2bd5c7c 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -1005,7 +1005,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         for ( OperationAndData<?> operation : drain )
         {
             operation.clearSleep();
-            if ( backgroundOperations.remove(operation) )
+            if ( backgroundOperations.remove(operation) )   // due to the internals of DelayQueue, operation must be removed/re-added so that re-sorting occurs
             {
                 backgroundOperations.offer(operation);
             }


[6/7] curator git commit: only countdown on success

Posted by ra...@apache.org.
only countdown on success


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ab2e7563
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ab2e7563
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ab2e7563

Branch: refs/heads/master
Commit: ab2e756323b5c5398b01cb1493fbc984f572b946
Parents: 166b5c3
Author: randgalt <ra...@apache.org>
Authored: Tue Dec 5 13:00:02 2017 -0800
Committer: randgalt <ra...@apache.org>
Committed: Tue Dec 5 13:00:02 2017 -0800

----------------------------------------------------------------------
 .../org/apache/curator/framework/imps/TestFrameworkEdges.java   | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/ab2e7563/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
index 75d97b1..86133b8 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
@@ -77,7 +77,10 @@ public class TestFrameworkEdges extends BaseClassForTests
                 @Override
                 public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
                 {
-                    latch.countDown();
+                    if ( (event.getType() == CuratorEventType.CREATE) && (event.getResultCode() == KeeperException.Code.OK.intValue()) )
+                    {
+                        latch.countDown();
+                    }
                 }
             };
             // queue multiple operations for a more complete test


[2/7] curator git commit: I believe the operation needs to be removed/added back to the delay queue to get re-sorting to occur. I'm concerned, however, about altering operation ordering

Posted by ra...@apache.org.
I believe the operation needs to be removed/added back to the delay queue to get re-sorting to occur. I'm concerned, however, about altering operation ordering


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c49f037c
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c49f037c
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c49f037c

Branch: refs/heads/master
Commit: c49f037ce87ce527bc244a94b4d2d1160c9ef012
Parents: bfdb790
Author: randgalt <ra...@apache.org>
Authored: Tue Nov 21 18:34:45 2017 -0800
Committer: randgalt <ra...@apache.org>
Committed: Tue Nov 21 18:34:45 2017 -0800

----------------------------------------------------------------------
 .../org/apache/curator/framework/imps/CuratorFrameworkImpl.java  | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/c49f037c/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 4a5fad3..c91758a 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -1001,6 +1001,10 @@ public class CuratorFrameworkImpl implements CuratorFramework
         for ( OperationAndData<?> operation : drain )
         {
             operation.clearSleep();
+            if ( backgroundOperations.remove(operation) )
+            {
+                backgroundOperations.offer(operation);
+            }
         }
     }
 


[3/7] curator git commit: added comment

Posted by ra...@apache.org.
added comment


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/0d01ea55
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/0d01ea55
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/0d01ea55

Branch: refs/heads/master
Commit: 0d01ea5587024fa517b79880c9cf5193f3a6522e
Parents: c49f037
Author: randgalt <ra...@apache.org>
Authored: Tue Dec 5 12:45:56 2017 -0800
Committer: randgalt <ra...@apache.org>
Committed: Tue Dec 5 12:45:56 2017 -0800

----------------------------------------------------------------------
 .../org/apache/curator/framework/imps/CuratorFrameworkImpl.java  | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/0d01ea55/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index c91758a..1d3c1b6 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -646,6 +646,10 @@ public class CuratorFrameworkImpl implements CuratorFramework
         }
     }
 
+    /**
+     * @param operationAndData operation entry
+     * @return true if the operation was actually queued, false if not
+     */
     <DATA_TYPE> boolean queueOperation(OperationAndData<DATA_TYPE> operationAndData)
     {
         if ( getState() == CuratorFrameworkState.STARTED )


[5/7] curator git commit: queue multiple operations for a more complete test

Posted by ra...@apache.org.
queue multiple operations for a more complete test


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/166b5c3d
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/166b5c3d
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/166b5c3d

Branch: refs/heads/master
Commit: 166b5c3d5091bd254747f6372534519d37883bfa
Parents: 3f7b610
Author: randgalt <ra...@apache.org>
Authored: Tue Dec 5 12:57:41 2017 -0800
Committer: randgalt <ra...@apache.org>
Committed: Tue Dec 5 12:57:41 2017 -0800

----------------------------------------------------------------------
 .../org/apache/curator/framework/imps/TestFrameworkEdges.java   | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/166b5c3d/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
index 35e9fb1..75d97b1 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
@@ -71,7 +71,7 @@ public class TestFrameworkEdges extends BaseClassForTests
             client.start();
             ((CuratorFrameworkImpl)client).sleepAndQueueOperationSeconds = Integer.MAX_VALUE;
 
-            final CountDownLatch latch = new CountDownLatch(1);
+            final CountDownLatch latch = new CountDownLatch(3);
             BackgroundCallback callback = new BackgroundCallback()
             {
                 @Override
@@ -80,7 +80,10 @@ public class TestFrameworkEdges extends BaseClassForTests
                     latch.countDown();
                 }
             };
+            // queue multiple operations for a more complete test
             client.create().inBackground(callback).forPath("/test");
+            client.create().inBackground(callback).forPath("/test/one");
+            client.create().inBackground(callback).forPath("/test/two");
             server.restart();
 
             Assert.assertTrue(timing.awaitLatch(latch));


[7/7] curator git commit: Merge branch 'CURATOR-443'

Posted by ra...@apache.org.
Merge branch 'CURATOR-443'


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1fade17a
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1fade17a
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1fade17a

Branch: refs/heads/master
Commit: 1fade17a26b0b1e7d630f5b5ffbab298ecf5f1d6
Parents: cd365c4 ab2e756
Author: randgalt <ra...@apache.org>
Authored: Tue Jan 2 11:03:42 2018 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jan 2 11:03:42 2018 -0500

----------------------------------------------------------------------
 .../framework/imps/CuratorFrameworkImpl.java    | 45 ++++++++++++++++++--
 .../framework/imps/OperationAndData.java        |  5 +++
 .../framework/imps/TestFrameworkEdges.java      | 37 ++++++++++++++++
 3 files changed, 84 insertions(+), 3 deletions(-)
----------------------------------------------------------------------