You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2018/01/02 16:03:53 UTC
[1/7] curator git commit: To avoid massive spinning,
background operations are paused for 1 second when there is no
connection. However,
this can hurt performance terribly if background operations are queued,
for example, prior to initial connection. This changes the behavior so that the sleeps are cleared when the connection is re-established.
Repository: curator
Updated Branches:
refs/heads/master cd365c414 -> 1fade17a2
To avoid massive spinning, background operations are paused for 1 second when there is no connection. However, this can hurt performance terribly if background operations are queued, for example, prior to initial connection. This changes the behavior so that the sleeps are cleared when the connection is re-established. A separate queue of "forced sleep" operations is kept while the connection is down. The operations in this queue then have their sleeps cleared when the connection is re-established.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/bfdb790b
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/bfdb790b
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/bfdb790b
Branch: refs/heads/master
Commit: bfdb790bc69022b0eef74558e9511e6ed2b665e9
Parents: d502dde
Author: randgalt <ra...@apache.org>
Authored: Tue Nov 21 18:11:01 2017 -0800
Committer: randgalt <ra...@apache.org>
Committed: Tue Nov 21 18:11:01 2017 -0800
----------------------------------------------------------------------
.../framework/imps/CuratorFrameworkImpl.java | 37 ++++++++++++++++++--
.../framework/imps/OperationAndData.java | 5 +++
.../framework/imps/TestFrameworkEdges.java | 31 ++++++++++++++++
3 files changed, 70 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/bfdb790b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 7488793..4a5fad3 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -54,13 +54,16 @@ import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -76,6 +79,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
private final ThreadFactory threadFactory;
private final int maxCloseWaitMs;
private final BlockingQueue<OperationAndData<?>> backgroundOperations;
+ private final BlockingQueue<OperationAndData<?>> forcedSleepOperations;
private final NamespaceImpl namespace;
private final ConnectionStateManager connectionStateManager;
private final List<AuthInfo> authInfos;
@@ -136,6 +140,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
listeners = new ListenerContainer<CuratorListener>();
unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
backgroundOperations = new DelayQueue<OperationAndData<?>>();
+ forcedSleepOperations = new LinkedBlockingQueue<>();
namespace = new NamespaceImpl(this, builder.getNamespace());
threadFactory = getThreadFactory(builder);
maxCloseWaitMs = builder.getMaxCloseWaitMs();
@@ -217,6 +222,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
threadFactory = parent.threadFactory;
maxCloseWaitMs = parent.maxCloseWaitMs;
backgroundOperations = parent.backgroundOperations;
+ forcedSleepOperations = parent.forcedSleepOperations;
connectionStateManager = parent.connectionStateManager;
defaultData = parent.defaultData;
failedDeleteManager = parent.failedDeleteManager;
@@ -640,12 +646,14 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
}
- <DATA_TYPE> void queueOperation(OperationAndData<DATA_TYPE> operationAndData)
+ <DATA_TYPE> boolean queueOperation(OperationAndData<DATA_TYPE> operationAndData)
{
if ( getState() == CuratorFrameworkState.STARTED )
{
backgroundOperations.offer(operationAndData);
+ return true;
}
+ return false;
}
void logError(String reason, final Throwable e)
@@ -730,6 +738,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
{
internalConnectionHandler.checkNewConnection(this);
connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
+ unSleepBackgroundOperations();
}
else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
{
@@ -940,8 +949,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
{
throw new CuratorConnectionLossException();
}
- operationAndData.sleepFor(1, TimeUnit.SECONDS);
- queueOperation(operationAndData);
+ sleepAndQueueOperation(operationAndData);
}
}
catch ( Throwable e )
@@ -973,6 +981,29 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
}
+ @VisibleForTesting
+ volatile long sleepAndQueueOperationSeconds = 1;
+
+ private void sleepAndQueueOperation(OperationAndData<?> operationAndData) throws InterruptedException
+ {
+ operationAndData.sleepFor(sleepAndQueueOperationSeconds, TimeUnit.SECONDS);
+ if ( queueOperation(operationAndData) )
+ {
+ forcedSleepOperations.add(operationAndData);
+ }
+ }
+
+ private void unSleepBackgroundOperations()
+ {
+ Collection<OperationAndData<?>> drain = new ArrayList<>(forcedSleepOperations.size());
+ forcedSleepOperations.drainTo(drain);
+ log.debug("Clearing sleep for {} operations", drain.size());
+ for ( OperationAndData<?> operation : drain )
+ {
+ operation.clearSleep();
+ }
+ }
+
private void processEvent(final CuratorEvent curatorEvent)
{
if ( curatorEvent.getType() == CuratorEventType.WATCHED )
http://git-wip-us.apache.org/repos/asf/curator/blob/bfdb790b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
index 3d69e5d..8370415 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
@@ -115,6 +115,11 @@ class OperationAndData<T> implements Delayed, RetrySleeper
return operation;
}
+ void clearSleep()
+ {
+ sleepUntilTimeMs.set(0);
+ }
+
@Override
public void sleepFor(long time, TimeUnit unit) throws InterruptedException
{
http://git-wip-us.apache.org/repos/asf/curator/blob/bfdb790b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
index 9c4afe3..35e9fb1 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
@@ -61,6 +61,37 @@ public class TestFrameworkEdges extends BaseClassForTests
private final Timing2 timing = new Timing2();
@Test
+ public void testBackgroundLatencyUnSleep() throws Exception
+ {
+ server.stop();
+
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+ ((CuratorFrameworkImpl)client).sleepAndQueueOperationSeconds = Integer.MAX_VALUE;
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ BackgroundCallback callback = new BackgroundCallback()
+ {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ latch.countDown();
+ }
+ };
+ client.create().inBackground(callback).forPath("/test");
+ server.restart();
+
+ Assert.assertTrue(timing.awaitLatch(latch));
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
public void testCreateContainersForBadConnect() throws Exception
{
final int serverPort = server.getPort();
[4/7] curator git commit: added comment
Posted by ra...@apache.org.
added comment
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/3f7b610a
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/3f7b610a
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/3f7b610a
Branch: refs/heads/master
Commit: 3f7b610ad5c5a0c6a7b0331f02294bd433a54554
Parents: 0d01ea5
Author: randgalt <ra...@apache.org>
Authored: Tue Dec 5 12:47:35 2017 -0800
Committer: randgalt <ra...@apache.org>
Committed: Tue Dec 5 12:47:35 2017 -0800
----------------------------------------------------------------------
.../org/apache/curator/framework/imps/CuratorFrameworkImpl.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/3f7b610a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 1d3c1b6..2bd5c7c 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -1005,7 +1005,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
for ( OperationAndData<?> operation : drain )
{
operation.clearSleep();
- if ( backgroundOperations.remove(operation) )
+ if ( backgroundOperations.remove(operation) ) // due to the internals of DelayQueue, operation must be removed/re-added so that re-sorting occurs
{
backgroundOperations.offer(operation);
}
[6/7] curator git commit: only countdown on success
Posted by ra...@apache.org.
only countdown on success
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ab2e7563
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ab2e7563
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ab2e7563
Branch: refs/heads/master
Commit: ab2e756323b5c5398b01cb1493fbc984f572b946
Parents: 166b5c3
Author: randgalt <ra...@apache.org>
Authored: Tue Dec 5 13:00:02 2017 -0800
Committer: randgalt <ra...@apache.org>
Committed: Tue Dec 5 13:00:02 2017 -0800
----------------------------------------------------------------------
.../org/apache/curator/framework/imps/TestFrameworkEdges.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/ab2e7563/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
index 75d97b1..86133b8 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
@@ -77,7 +77,10 @@ public class TestFrameworkEdges extends BaseClassForTests
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
- latch.countDown();
+ if ( (event.getType() == CuratorEventType.CREATE) && (event.getResultCode() == KeeperException.Code.OK.intValue()) )
+ {
+ latch.countDown();
+ }
}
};
// queue multiple operations for a more complete test
[2/7] curator git commit: I believe the operation needs to be
removed/added back to the delay queue to get re-sorting to occur. I'm
concerned, however, about altering operation ordering
Posted by ra...@apache.org.
I believe the operation needs to be removed/added back to the delay queue to get re-sorting to occur. I'm concerned, however, about altering operation ordering.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c49f037c
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c49f037c
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c49f037c
Branch: refs/heads/master
Commit: c49f037ce87ce527bc244a94b4d2d1160c9ef012
Parents: bfdb790
Author: randgalt <ra...@apache.org>
Authored: Tue Nov 21 18:34:45 2017 -0800
Committer: randgalt <ra...@apache.org>
Committed: Tue Nov 21 18:34:45 2017 -0800
----------------------------------------------------------------------
.../org/apache/curator/framework/imps/CuratorFrameworkImpl.java | 4 ++++
1 file changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/c49f037c/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 4a5fad3..c91758a 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -1001,6 +1001,10 @@ public class CuratorFrameworkImpl implements CuratorFramework
for ( OperationAndData<?> operation : drain )
{
operation.clearSleep();
+ if ( backgroundOperations.remove(operation) )
+ {
+ backgroundOperations.offer(operation);
+ }
}
}
[3/7] curator git commit: added comment
Posted by ra...@apache.org.
added comment
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/0d01ea55
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/0d01ea55
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/0d01ea55
Branch: refs/heads/master
Commit: 0d01ea5587024fa517b79880c9cf5193f3a6522e
Parents: c49f037
Author: randgalt <ra...@apache.org>
Authored: Tue Dec 5 12:45:56 2017 -0800
Committer: randgalt <ra...@apache.org>
Committed: Tue Dec 5 12:45:56 2017 -0800
----------------------------------------------------------------------
.../org/apache/curator/framework/imps/CuratorFrameworkImpl.java | 4 ++++
1 file changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/0d01ea55/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index c91758a..1d3c1b6 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -646,6 +646,10 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
}
+ /**
+ * @param operationAndData operation entry
+ * @return true if the operation was actually queued, false if not
+ */
<DATA_TYPE> boolean queueOperation(OperationAndData<DATA_TYPE> operationAndData)
{
if ( getState() == CuratorFrameworkState.STARTED )
[5/7] curator git commit: queue multiple operations for a more
complete test
Posted by ra...@apache.org.
queue multiple operations for a more complete test
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/166b5c3d
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/166b5c3d
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/166b5c3d
Branch: refs/heads/master
Commit: 166b5c3d5091bd254747f6372534519d37883bfa
Parents: 3f7b610
Author: randgalt <ra...@apache.org>
Authored: Tue Dec 5 12:57:41 2017 -0800
Committer: randgalt <ra...@apache.org>
Committed: Tue Dec 5 12:57:41 2017 -0800
----------------------------------------------------------------------
.../org/apache/curator/framework/imps/TestFrameworkEdges.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/166b5c3d/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
index 35e9fb1..75d97b1 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
@@ -71,7 +71,7 @@ public class TestFrameworkEdges extends BaseClassForTests
client.start();
((CuratorFrameworkImpl)client).sleepAndQueueOperationSeconds = Integer.MAX_VALUE;
- final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch latch = new CountDownLatch(3);
BackgroundCallback callback = new BackgroundCallback()
{
@Override
@@ -80,7 +80,10 @@ public class TestFrameworkEdges extends BaseClassForTests
latch.countDown();
}
};
+ // queue multiple operations for a more complete test
client.create().inBackground(callback).forPath("/test");
+ client.create().inBackground(callback).forPath("/test/one");
+ client.create().inBackground(callback).forPath("/test/two");
server.restart();
Assert.assertTrue(timing.awaitLatch(latch));
[7/7] curator git commit: Merge branch 'CURATOR-443'
Posted by ra...@apache.org.
Merge branch 'CURATOR-443'
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1fade17a
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1fade17a
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1fade17a
Branch: refs/heads/master
Commit: 1fade17a26b0b1e7d630f5b5ffbab298ecf5f1d6
Parents: cd365c4 ab2e756
Author: randgalt <ra...@apache.org>
Authored: Tue Jan 2 11:03:42 2018 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jan 2 11:03:42 2018 -0500
----------------------------------------------------------------------
.../framework/imps/CuratorFrameworkImpl.java | 45 ++++++++++++++++++--
.../framework/imps/OperationAndData.java | 5 +++
.../framework/imps/TestFrameworkEdges.java | 37 ++++++++++++++++
3 files changed, 84 insertions(+), 3 deletions(-)
----------------------------------------------------------------------