You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ca...@apache.org on 2016/06/01 00:06:39 UTC
[1/5] curator git commit: Reworked WatcherRemovalManager. It now
stores watchers only after an operation succeeds, which is closer to how
ZooKeeper itself tracks watches. Also, an exists watcher must still be stored when the result is NoNode.
Repository: curator
Updated Branches:
refs/heads/CURATOR-3.0 eefdf8ee9 -> e2200daad
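Reworked WatcherRemovalManager. It now stores watchers only after an operation succeeds, which is closer to how ZooKeeper itself tracks watches.
Also, an exists watcher must still be stored when the result is NoNode, because ZooKeeper registers an exists watch even when the node is missing.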
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f59f23c7
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f59f23c7
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f59f23c7
Branch: refs/heads/CURATOR-3.0
Commit: f59f23c703815317d4ef1d39e2b815e402d1559b
Parents: eefdf8e
Author: randgalt <ra...@apache.org>
Authored: Thu May 26 16:59:08 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu May 26 16:59:08 2016 -0500
----------------------------------------------------------------------
curator-framework/pom.xml | 14 ++
.../curator/framework/imps/Backgrounding.java | 5 -
.../framework/imps/CuratorFrameworkImpl.java | 2 -
.../curator/framework/imps/EnsembleTracker.java | 22 +-
.../framework/imps/ExistsBuilderImpl.java | 7 +-
.../framework/imps/GetChildrenBuilderImpl.java | 8 +-
.../framework/imps/GetConfigBuilderImpl.java | 11 +-
.../framework/imps/GetDataBuilderImpl.java | 7 +-
.../framework/imps/OperationAndData.java | 14 +-
.../imps/RemoveWatchesBuilderImpl.java | 2 +-
.../apache/curator/framework/imps/Watching.java | 41 +---
.../curator/framework/imps/TestCleanState.java | 103 +++++++++
.../imps/TestWatcherRemovalManager.java | 208 +++++++++++++++----
curator-recipes/pom.xml | 7 +
.../curator/framework/imps/TestCleanState.java | 77 -------
pom.xml | 7 +
16 files changed, 357 insertions(+), 178 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/pom.xml
----------------------------------------------------------------------
diff --git a/curator-framework/pom.xml b/curator-framework/pom.xml
index d6575cc..1a65898 100644
--- a/curator-framework/pom.xml
+++ b/curator-framework/pom.xml
@@ -88,4 +88,18 @@
</dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java
index 0b823c4..4ac2edc 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java
@@ -116,11 +116,6 @@ class Backgrounding
{
if ( e != null )
{
- if ( watching != null )
- {
- watching.resetCurrentWatcher();
- }
-
if ( errorListener != null )
{
errorListener.unhandledError("n/a", e);
http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 51485f2..aba14c6 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -910,7 +910,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
void performBackgroundOperation(OperationAndData<?> operationAndData)
{
- operationAndData.resetCurrentWatcher();
try
{
if ( !operationAndData.isConnectionRequired() || client.isConnected() )
@@ -930,7 +929,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
catch ( Throwable e )
{
- operationAndData.resetCurrentWatcher();
ThreadUtils.checkInterrupted(e);
/**
http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
index bc59512..0b93cab 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
@@ -43,6 +43,7 @@ import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.util.Arrays;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@VisibleForTesting
@@ -52,6 +53,7 @@ public class EnsembleTracker implements Closeable, CuratorWatcher
private final WatcherRemoveCuratorFramework client;
private final EnsembleProvider ensembleProvider;
private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+ private final AtomicInteger outstanding = new AtomicInteger(0);
private final AtomicReference<QuorumMaj> currentConfig = new AtomicReference<>(new QuorumMaj(Maps.<Long, QuorumPeer.QuorumServer>newHashMap()));
private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
@@ -121,22 +123,38 @@ public class EnsembleTracker implements Closeable, CuratorWatcher
return currentConfig.get();
}
+ @VisibleForTesting
+ public boolean hasOutstanding()
+ {
+ return outstanding.get() > 0;
+ }
+
private void reset() throws Exception
{
- if ( client.getState() == CuratorFrameworkState.STARTED )
+ if ( (client.getState() == CuratorFrameworkState.STARTED) && (state.get() == State.STARTED) )
{
BackgroundCallback backgroundCallback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
+ outstanding.decrementAndGet();
if ( (event.getType() == CuratorEventType.GET_CONFIG) && (event.getResultCode() == KeeperException.Code.OK.intValue()) )
{
processConfigData(event.getData());
}
}
};
- client.getConfig().usingWatcher(this).inBackground(backgroundCallback).forEnsemble();
+ outstanding.incrementAndGet();
+ try
+ {
+ client.getConfig().usingWatcher(this).inBackground(backgroundCallback).forEnsemble();
+ }
+ catch ( Exception e )
+ {
+ outstanding.decrementAndGet();
+ throw e;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
index 964706f..960b577 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
@@ -132,7 +132,7 @@ class ExistsBuilderImpl implements ExistsBuilder, BackgroundOperation<String>, E
@Override
public void processResult(int rc, String path, Object ctx, Stat stat)
{
- watching.checkBackroundRc(rc);
+ watching.commitWatcher(rc, true);
trace.commit();
CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.EXISTS, rc, path, null, ctx, stat, null, null, null, null, null);
client.processBackgroundOperation(operationAndData, event);
@@ -222,8 +222,9 @@ class ExistsBuilderImpl implements ExistsBuilder, BackgroundOperation<String>, E
private Stat pathInForegroundStandard(final String path) throws Exception
{
TimeTrace trace = client.getZookeeperClient().startTracer("ExistsBuilderImpl-Foreground");
- Stat returnStat = watching.callWithRetry
+ Stat returnStat = RetryLoop.callWithRetry
(
+ client.getZookeeperClient(),
new Callable<Stat>()
{
@Override
@@ -237,6 +238,8 @@ class ExistsBuilderImpl implements ExistsBuilder, BackgroundOperation<String>, E
else
{
returnStat = client.getZooKeeper().exists(path, watching.getWatcher(path));
+ int rc = (returnStat != null) ? KeeperException.NoNodeException.Code.OK.intValue() : KeeperException.NoNodeException.Code.NONODE.intValue();
+ watching.commitWatcher(rc, true);
}
return returnStat;
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
index 0b1bb07..000c911 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
@@ -19,6 +19,7 @@
package org.apache.curator.framework.imps;
import com.google.common.collect.Lists;
+import org.apache.curator.RetryLoop;
import org.apache.curator.TimeTrace;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.BackgroundPathable;
@@ -30,6 +31,7 @@ import org.apache.curator.framework.api.Pathable;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.api.WatchPathable;
import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import java.util.List;
@@ -167,7 +169,7 @@ class GetChildrenBuilderImpl implements GetChildrenBuilder, BackgroundOperation<
@Override
public void processResult(int rc, String path, Object o, List<String> strings, Stat stat)
{
- watching.checkBackroundRc(rc);
+ watching.commitWatcher(rc, false);
trace.commit();
if ( strings == null )
{
@@ -214,8 +216,9 @@ class GetChildrenBuilderImpl implements GetChildrenBuilder, BackgroundOperation<
private List<String> pathInForeground(final String path) throws Exception
{
TimeTrace trace = client.getZookeeperClient().startTracer("GetChildrenBuilderImpl-Foreground");
- List<String> children = watching.callWithRetry
+ List<String> children = RetryLoop.callWithRetry
(
+ client.getZookeeperClient(),
new Callable<List<String>>()
{
@Override
@@ -229,6 +232,7 @@ class GetChildrenBuilderImpl implements GetChildrenBuilder, BackgroundOperation<
else
{
children = client.getZooKeeper().getChildren(path, watching.getWatcher(path), responseStat);
+ watching.commitWatcher(KeeperException.NoNodeException.Code.OK.intValue(), false);
}
return children;
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java
index 3a210b8..1ab9043 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java
@@ -19,9 +19,11 @@
package org.apache.curator.framework.imps;
+import org.apache.curator.RetryLoop;
import org.apache.curator.TimeTrace;
import org.apache.curator.framework.api.*;
import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
@@ -206,7 +208,7 @@ public class GetConfigBuilderImpl implements GetConfigBuilder, BackgroundOperati
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat)
{
- watching.checkBackroundRc(rc);
+ watching.commitWatcher(rc, false);
trace.commit();
CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.GET_CONFIG, rc, path, null, ctx, stat, data, null, null, null, null);
client.processBackgroundOperation(operationAndData, event);
@@ -232,8 +234,9 @@ public class GetConfigBuilderImpl implements GetConfigBuilder, BackgroundOperati
TimeTrace trace = client.getZookeeperClient().startTracer("GetConfigBuilderImpl-Foreground");
try
{
- return watching.callWithRetry
+ return RetryLoop.callWithRetry
(
+ client.getZookeeperClient(),
new Callable<byte[]>()
{
@Override
@@ -243,7 +246,9 @@ public class GetConfigBuilderImpl implements GetConfigBuilder, BackgroundOperati
{
return client.getZooKeeper().getConfig(true, stat);
}
- return client.getZooKeeper().getConfig(watching.getWatcher(ZooDefs.CONFIG_NODE), stat);
+ byte[] config = client.getZooKeeper().getConfig(watching.getWatcher(ZooDefs.CONFIG_NODE), stat);
+ watching.commitWatcher(KeeperException.NoNodeException.Code.OK.intValue(), false);
+ return config;
}
}
);
http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
index 5528138..bae126c 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.curator.framework.imps;
+import org.apache.curator.RetryLoop;
import org.apache.curator.TimeTrace;
import org.apache.curator.framework.api.*;
import org.apache.curator.utils.ThreadUtils;
@@ -238,7 +239,7 @@ class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String>,
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat)
{
- watching.checkBackroundRc(rc);
+ watching.commitWatcher(rc, false);
trace.commit();
if ( decompress && (data != null) )
{
@@ -294,8 +295,9 @@ class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String>,
private byte[] pathInForeground(final String path) throws Exception
{
TimeTrace trace = client.getZookeeperClient().startTracer("GetDataBuilderImpl-Foreground");
- byte[] responseData = watching.callWithRetry
+ byte[] responseData = RetryLoop.callWithRetry
(
+ client.getZookeeperClient(),
new Callable<byte[]>()
{
@Override
@@ -309,6 +311,7 @@ class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String>,
else
{
responseData = client.getZooKeeper().getData(path, watching.getWatcher(path), responseStat);
+ watching.commitWatcher(KeeperException.NoNodeException.Code.OK.intValue(), false);
}
return responseData;
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
index 73ea38e..3d69e5d 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
@@ -41,14 +41,13 @@ class OperationAndData<T> implements Delayed, RetrySleeper
private final AtomicLong ordinal = new AtomicLong();
private final Object context;
private final boolean connectionRequired;
- private final Watching watching;
interface ErrorCallback<T>
{
void retriesExhausted(OperationAndData<T> operationAndData);
}
- OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context, boolean connectionRequired, Watching watching)
+ OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context, boolean connectionRequired)
{
this.operation = operation;
this.data = data;
@@ -56,7 +55,6 @@ class OperationAndData<T> implements Delayed, RetrySleeper
this.errorCallback = errorCallback;
this.context = context;
this.connectionRequired = connectionRequired;
- this.watching = watching;
reset();
}
@@ -68,7 +66,7 @@ class OperationAndData<T> implements Delayed, RetrySleeper
OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context, Watching watching)
{
- this(operation, data, callback, errorCallback, context, true, watching);
+ this(operation, data, callback, errorCallback, context, true);
}
Object getContext()
@@ -117,14 +115,6 @@ class OperationAndData<T> implements Delayed, RetrySleeper
return operation;
}
- void resetCurrentWatcher()
- {
- if ( watching != null )
- {
- watching.resetCurrentWatcher();
- }
- }
-
@Override
public void sleepFor(long time, TimeUnit unit) throws InterruptedException
{
http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
index c2d4d8e..27a3c0f 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
@@ -209,7 +209,7 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat
}
client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(),
- errorCallback, backgrounding.getContext(), !local, null), null);
+ errorCallback, backgrounding.getContext(), !local), null);
}
private void pathInForeground(final String path) throws Exception
http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
index 568f308..daa5dd3 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
@@ -19,11 +19,9 @@
package org.apache.curator.framework.imps;
-import org.apache.curator.RetryLoop;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
-import java.util.concurrent.Callable;
class Watching
{
@@ -77,14 +75,6 @@ class Watching
namespaceWatcher = new NamespaceWatcher(client, curatorWatcher, unfixedPath);
}
- if ( namespaceWatcher != null )
- {
- if ( client.getWatcherRemovalManager() != null )
- {
- client.getWatcherRemovalManager().add(namespaceWatcher);
- }
- }
-
return namespaceWatcher;
}
@@ -98,33 +88,24 @@ class Watching
return watched;
}
- <T> T callWithRetry(Callable<T> proc) throws Exception
+ void commitWatcher(int rc, boolean isExists)
{
- resetCurrentWatcher();
- try
- {
- return RetryLoop.callWithRetry(client.getZookeeperClient(), proc);
- }
- catch ( Exception e )
+ boolean doCommit = false;
+ if ( isExists )
{
- resetCurrentWatcher();
- throw e;
+ doCommit = ((rc == KeeperException.Code.OK.intValue()) || (rc == KeeperException.Code.NONODE.intValue()));
}
- }
-
- void resetCurrentWatcher()
- {
- if ( (namespaceWatcher != null) && (client.getWatcherRemovalManager() != null) )
+ else
{
- client.getWatcherRemovalManager().noteTriggeredWatcher(namespaceWatcher);
+ doCommit = (rc == KeeperException.Code.OK.intValue());
}
- }
- void checkBackroundRc(int rc)
- {
- if ( rc != KeeperException.Code.OK.intValue() )
+ if ( doCommit && (namespaceWatcher != null) )
{
- resetCurrentWatcher();
+ if ( client.getWatcherRemovalManager() != null )
+ {
+ client.getWatcherRemovalManager().add(namespaceWatcher);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
new file mode 100644
index 0000000..aa759ee
--- /dev/null
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.WatchersDebug;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.ZooKeeper;
+import java.util.concurrent.Callable;
+
+public class TestCleanState
+{
+ public static void closeAndTestClean(CuratorFramework client)
+ {
+ if ( client == null )
+ {
+ return;
+ }
+
+ try
+ {
+ CuratorFrameworkImpl internalClient = (CuratorFrameworkImpl)client;
+ EnsembleTracker ensembleTracker = internalClient.getEnsembleTracker();
+ if ( ensembleTracker != null )
+ {
+ while ( ensembleTracker.hasOutstanding() )
+ {
+ Thread.sleep(100);
+ }
+ ensembleTracker.close();
+ }
+ ZooKeeper zooKeeper = internalClient.getZooKeeper();
+ if ( zooKeeper != null )
+ {
+ if ( WatchersDebug.getChildWatches(zooKeeper).size() != 0 )
+ {
+ throw new AssertionError("One or more child watchers are still registered: " + WatchersDebug.getChildWatches(zooKeeper));
+ }
+ if ( WatchersDebug.getExistWatches(zooKeeper).size() != 0 )
+ {
+ throw new AssertionError("One or more exists watchers are still registered: " + WatchersDebug.getExistWatches(zooKeeper));
+ }
+ if ( WatchersDebug.getDataWatches(zooKeeper).size() != 0 )
+ {
+ throw new AssertionError("One or more data watchers are still registered: " + WatchersDebug.getDataWatches(zooKeeper));
+ }
+ }
+ }
+ catch ( IllegalStateException ignore )
+ {
+ // client already closed
+ }
+ catch ( Exception e )
+ {
+ e.printStackTrace(); // not sure what to do here
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ public static void test(CuratorFramework client, Callable<Void> proc) throws Exception
+ {
+ boolean succeeded = false;
+ try
+ {
+ proc.call();
+ succeeded = true;
+ }
+ finally
+ {
+ if ( succeeded )
+ {
+ closeAndTestClean(client);
+ }
+ else
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+ }
+
+ private TestCleanState()
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
index cdb625d..9c405a2 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
@@ -27,18 +27,141 @@ import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
import org.apache.curator.test.WatchersDebug;
-import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
public class TestWatcherRemovalManager extends BaseClassForTests
{
@Test
+ public void testSameWatcherDifferentPaths1Triggered() throws Exception
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+ WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework();
+ final CountDownLatch latch = new CountDownLatch(1);
+ Watcher watcher = new Watcher()
+ {
+ @Override
+ public void process(WatchedEvent event)
+ {
+ latch.countDown();
+ }
+ };
+ removerClient.checkExists().usingWatcher(watcher).forPath("/a/b/c");
+ removerClient.checkExists().usingWatcher(watcher).forPath("/d/e/f");
+ removerClient.create().creatingParentsIfNeeded().forPath("/d/e/f");
+
+ Timing timing = new Timing();
+ Assert.assertTrue(timing.awaitLatch(latch));
+ timing.sleepABit();
+
+ removerClient.removeWatchers();
+ }
+ finally
+ {
+ TestCleanState.closeAndTestClean(client);
+ }
+ }
+
+ @Test
+ public void testSameWatcherDifferentPaths() throws Exception
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+ WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework();
+ Watcher watcher = new Watcher()
+ {
+ @Override
+ public void process(WatchedEvent event)
+ {
+ // NOP
+ }
+ };
+ removerClient.checkExists().usingWatcher(watcher).forPath("/a/b/c");
+ removerClient.checkExists().usingWatcher(watcher).forPath("/d/e/f");
+ Assert.assertEquals(removerClient.getWatcherRemovalManager().getEntries().size(), 2);
+ removerClient.removeWatchers();
+ }
+ finally
+ {
+ TestCleanState.closeAndTestClean(client);
+ }
+ }
+
+ @Test
+ public void testSameWatcherDifferentKinds1Triggered() throws Exception
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+ WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework();
+ final CountDownLatch latch = new CountDownLatch(1);
+ Watcher watcher = new Watcher()
+ {
+ @Override
+ public void process(WatchedEvent event)
+ {
+ latch.countDown();
+ }
+ };
+
+ removerClient.create().creatingParentsIfNeeded().forPath("/a/b/c");
+ removerClient.checkExists().usingWatcher(watcher).forPath("/a/b/c");
+ removerClient.getData().usingWatcher(watcher).forPath("/a/b/c");
+ removerClient.setData().forPath("/a/b/c", "new".getBytes());
+
+ Timing timing = new Timing();
+ Assert.assertTrue(timing.awaitLatch(latch));
+ timing.sleepABit();
+
+ removerClient.removeWatchers();
+ }
+ finally
+ {
+ TestCleanState.closeAndTestClean(client);
+ }
+ }
+
+ @Test
+ public void testSameWatcherDifferentKinds() throws Exception
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+ WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework();
+ Watcher watcher = new Watcher()
+ {
+ @Override
+ public void process(WatchedEvent event)
+ {
+ // NOP
+ }
+ };
+
+ removerClient.create().creatingParentsIfNeeded().forPath("/a/b/c");
+ removerClient.checkExists().usingWatcher(watcher).forPath("/a/b/c");
+ removerClient.getData().usingWatcher(watcher).forPath("/a/b/c");
+ removerClient.removeWatchers();
+ }
+ finally
+ {
+ TestCleanState.closeAndTestClean(client);
+ }
+ }
+
+ @Test
public void testWithRetry() throws Exception
{
server.stop();
@@ -68,7 +191,7 @@ public class TestWatcherRemovalManager extends BaseClassForTests
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -105,7 +228,7 @@ public class TestWatcherRemovalManager extends BaseClassForTests
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -134,47 +257,50 @@ public class TestWatcherRemovalManager extends BaseClassForTests
{
// expected
}
- Assert.assertEquals(removerClient.getWatcherRemovalManager().getEntries().size(), 0);
+ removerClient.removeWatchers();
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@Test
public void testMissingNodeInBackground() throws Exception
{
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- try
+ final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ Callable<Void> proc = new Callable<Void>()
{
- client.start();
- WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework();
- Watcher w = new Watcher()
+ @Override
+ public Void call() throws Exception
{
- @Override
- public void process(WatchedEvent event)
+ client.start();
+ WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework();
+ Watcher w = new Watcher()
{
- // NOP
- }
- };
- final CountDownLatch latch = new CountDownLatch(1);
- BackgroundCallback callback = new BackgroundCallback()
- {
- @Override
- public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ @Override
+ public void process(WatchedEvent event)
+ {
+ // NOP
+ }
+ };
+ final CountDownLatch latch = new CountDownLatch(1);
+ BackgroundCallback callback = new BackgroundCallback()
{
- latch.countDown();
- }
- };
- removerClient.getData().usingWatcher(w).inBackground(callback).forPath("/one/two/three");
- Assert.assertTrue(new Timing().awaitLatch(latch));
- Assert.assertEquals(removerClient.getWatcherRemovalManager().getEntries().size(), 0);
- }
- finally
- {
- CloseableUtils.closeQuietly(client);
- }
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ latch.countDown();
+ }
+ };
+ removerClient.getData().usingWatcher(w).inBackground(callback).forPath("/one/two/three");
+ Assert.assertTrue(new Timing().awaitLatch(latch));
+ Assert.assertEquals(removerClient.getWatcherRemovalManager().getEntries().size(), 0);
+ removerClient.removeWatchers();
+ return null;
+ }
+ };
+ TestCleanState.test(client, proc);
}
@Test
@@ -188,7 +314,7 @@ public class TestWatcherRemovalManager extends BaseClassForTests
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -203,7 +329,7 @@ public class TestWatcherRemovalManager extends BaseClassForTests
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -222,7 +348,7 @@ public class TestWatcherRemovalManager extends BaseClassForTests
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -241,7 +367,7 @@ public class TestWatcherRemovalManager extends BaseClassForTests
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -252,6 +378,7 @@ public class TestWatcherRemovalManager extends BaseClassForTests
try
{
client.start();
+ client.create().forPath("/test");
WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework();
@@ -264,14 +391,15 @@ public class TestWatcherRemovalManager extends BaseClassForTests
}
};
- removerClient.getData().usingWatcher(watcher).forPath("/");
+ removerClient.getData().usingWatcher(watcher).forPath("/test");
Assert.assertEquals(removerClient.getRemovalManager().getEntries().size(), 1);
- removerClient.getData().usingWatcher(watcher).forPath("/");
+ removerClient.getData().usingWatcher(watcher).forPath("/test");
Assert.assertEquals(removerClient.getRemovalManager().getEntries().size(), 1);
+ removerClient.removeWatchers();
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -308,7 +436,7 @@ public class TestWatcherRemovalManager extends BaseClassForTests
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -364,7 +492,7 @@ public class TestWatcherRemovalManager extends BaseClassForTests
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-recipes/pom.xml
----------------------------------------------------------------------
diff --git a/curator-recipes/pom.xml b/curator-recipes/pom.xml
index 17414c2..0443adc 100644
--- a/curator-recipes/pom.xml
+++ b/curator-recipes/pom.xml
@@ -52,6 +52,13 @@
<dependency>
<groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java b/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
deleted file mode 100644
index f90f463..0000000
--- a/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.imps;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.test.WatchersDebug;
-import org.apache.curator.utils.CloseableUtils;
-import org.apache.zookeeper.ZooKeeper;
-
-public class TestCleanState
-{
- public static void closeAndTestClean(CuratorFramework client)
- {
- if ( client == null )
- {
- return;
- }
-
- try
- {
- CuratorFrameworkImpl internalClient = (CuratorFrameworkImpl)client;
- EnsembleTracker ensembleTracker = internalClient.getEnsembleTracker();
- if ( ensembleTracker != null )
- {
- ensembleTracker.close();
- }
- ZooKeeper zooKeeper = internalClient.getZooKeeper();
- if ( zooKeeper != null )
- {
- if ( WatchersDebug.getChildWatches(zooKeeper).size() != 0 )
- {
- throw new AssertionError("One or more child watchers are still registered: " + WatchersDebug.getChildWatches(zooKeeper));
- }
- if ( WatchersDebug.getExistWatches(zooKeeper).size() != 0 )
- {
- throw new AssertionError("One or more exists watchers are still registered: " + WatchersDebug.getExistWatches(zooKeeper));
- }
- if ( WatchersDebug.getDataWatches(zooKeeper).size() != 0 )
- {
- throw new AssertionError("One or more data watchers are still registered: " + WatchersDebug.getDataWatches(zooKeeper));
- }
- }
- }
- catch ( IllegalStateException ignore )
- {
- // client already closed
- }
- catch ( Exception e )
- {
- e.printStackTrace(); // not sure what to do here
- }
- finally
- {
- CloseableUtils.closeQuietly(client);
- }
- }
-
- private TestCleanState()
- {
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c89092d..384b310 100644
--- a/pom.xml
+++ b/pom.xml
@@ -322,6 +322,13 @@
<dependency>
<groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <type>test-jar</type>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${project.version}</version>
</dependency>
[5/5] curator git commit: it takes time for the watcher removals to
settle due to async/watchers, etc. So, if there are remaining watchers,
sleep a bit
Posted by ca...@apache.org.
It takes time for the watcher removals to settle due to async operations, watchers, etc. So, if there are remaining watchers, sleep a bit and check again.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/e2200daa
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/e2200daa
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/e2200daa
Branch: refs/heads/CURATOR-3.0
Commit: e2200daad2b04d721a99374686b52758bc012e12
Parents: 44fd666
Author: randgalt <ra...@apache.org>
Authored: Tue May 31 10:35:10 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue May 31 10:35:10 2016 -0500
----------------------------------------------------------------------
.../curator/framework/imps/TestCleanState.java | 42 +++++++++++++++-----
1 file changed, 32 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/e2200daa/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
index aa759ee..9d90616 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
@@ -48,17 +48,39 @@ public class TestCleanState
ZooKeeper zooKeeper = internalClient.getZooKeeper();
if ( zooKeeper != null )
{
- if ( WatchersDebug.getChildWatches(zooKeeper).size() != 0 )
+ final int maxLoops = 3;
+ for ( int i = 0; i < maxLoops; ++i ) // it takes time for the watcher removals to settle due to async/watchers, etc. So, if there are remaining watchers, sleep a bit
{
- throw new AssertionError("One or more child watchers are still registered: " + WatchersDebug.getChildWatches(zooKeeper));
- }
- if ( WatchersDebug.getExistWatches(zooKeeper).size() != 0 )
- {
- throw new AssertionError("One or more exists watchers are still registered: " + WatchersDebug.getExistWatches(zooKeeper));
- }
- if ( WatchersDebug.getDataWatches(zooKeeper).size() != 0 )
- {
- throw new AssertionError("One or more data watchers are still registered: " + WatchersDebug.getDataWatches(zooKeeper));
+ if ( i > 0 )
+ {
+ Thread.sleep(500);
+ }
+ boolean isLast = (i + 1) == maxLoops;
+ if ( WatchersDebug.getChildWatches(zooKeeper).size() != 0 )
+ {
+ if ( isLast )
+ {
+ throw new AssertionError("One or more child watchers are still registered: " + WatchersDebug.getChildWatches(zooKeeper));
+ }
+ continue;
+ }
+ if ( WatchersDebug.getExistWatches(zooKeeper).size() != 0 )
+ {
+ if ( isLast )
+ {
+ throw new AssertionError("One or more exists watchers are still registered: " + WatchersDebug.getExistWatches(zooKeeper));
+ }
+ continue;
+ }
+ if ( WatchersDebug.getDataWatches(zooKeeper).size() != 0 )
+ {
+ if ( isLast )
+ {
+ throw new AssertionError("One or more data watchers are still registered: " + WatchersDebug.getDataWatches(zooKeeper));
+ }
+ continue;
+ }
+ break;
}
}
}
[3/5] curator git commit: if the background call happens after
closure, remove any watchers
Posted by ca...@apache.org.
If the background call happens after closure, remove any watchers.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/4fb08957
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/4fb08957
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/4fb08957
Branch: refs/heads/CURATOR-3.0
Commit: 4fb089575480befa51174d611d141f4d4fb03fff
Parents: c7f3b00
Author: randgalt <ra...@apache.org>
Authored: Mon May 30 21:13:16 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon May 30 21:13:16 2016 -0500
----------------------------------------------------------------------
.../apache/curator/framework/recipes/cache/PathChildrenCache.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/4fb08957/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index eb98e6a..91a3a98 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -493,6 +493,7 @@ public class PathChildrenCache implements Closeable
{
if (PathChildrenCache.this.state.get().equals(State.CLOSED)) {
// This ship is closed, don't handle the callback
+ PathChildrenCache.this.client.removeWatchers();
return;
}
if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
[4/5] curator git commit: Don't exit test until watchers have hit so
that the watcher removal code is stable
Posted by ca...@apache.org.
Don't exit the test until the watchers have fired, so that the watcher removal code is stable.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/44fd666a
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/44fd666a
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/44fd666a
Branch: refs/heads/CURATOR-3.0
Commit: 44fd666a366999ae9503dff6866bb8aa37b2d604
Parents: 4fb0895
Author: randgalt <ra...@apache.org>
Authored: Mon May 30 21:31:21 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon May 30 21:31:21 2016 -0500
----------------------------------------------------------------------
.../recipes/shared/TestSharedCount.java | 37 ++++++++++++++++++++
1 file changed, 37 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/44fd666a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index 16134ed..28df3f9 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@ -163,10 +163,29 @@ public class TestSharedCount extends BaseClassForTests
client.start();
count.start();
+ final CountDownLatch setLatch = new CountDownLatch(3);
+ SharedCountListener listener = new SharedCountListener()
+ {
+ @Override
+ public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception
+ {
+ setLatch.countDown();
+ }
+
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ // nop
+ }
+ };
+ count.addListener(listener);
+
Assert.assertTrue(count.trySetCount(1));
Assert.assertTrue(count.trySetCount(2));
Assert.assertTrue(count.trySetCount(10));
Assert.assertEquals(count.getCount(), 10);
+
+ Assert.assertTrue(new Timing().awaitLatch(setLatch));
}
finally
{
@@ -242,12 +261,30 @@ public class TestSharedCount extends BaseClassForTests
Assert.assertTrue(count2.trySetCount(versionedValue, 20));
timing.sleepABit();
+ final CountDownLatch setLatch = new CountDownLatch(2);
+ SharedCountListener listener = new SharedCountListener()
+ {
+ @Override
+ public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception
+ {
+ setLatch.countDown();
+ }
+
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ // nop
+ }
+ };
+ count1.addListener(listener);
VersionedValue<Integer> versionedValue1 = count1.getVersionedValue();
VersionedValue<Integer> versionedValue2 = count2.getVersionedValue();
Assert.assertTrue(count2.trySetCount(versionedValue2, 30));
Assert.assertFalse(count1.trySetCount(versionedValue1, 40));
+
versionedValue1 = count1.getVersionedValue();
Assert.assertTrue(count1.trySetCount(versionedValue1, 40));
+ Assert.assertTrue(timing.awaitLatch(setLatch));
}
finally
{
[2/5] curator git commit: safer way to set outstanding
Posted by ca...@apache.org.
Safer way to update the `outstanding` counter.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c7f3b006
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c7f3b006
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c7f3b006
Branch: refs/heads/CURATOR-3.0
Commit: c7f3b006fc3fd0fde76dca4cf55d468245ecf335
Parents: f59f23c
Author: randgalt <ra...@apache.org>
Authored: Thu May 26 17:18:17 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu May 26 17:18:17 2016 -0500
----------------------------------------------------------------------
.../java/org/apache/curator/framework/imps/EnsembleTracker.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/c7f3b006/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
index 0b93cab..2e020d4 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
@@ -149,11 +149,11 @@ public class EnsembleTracker implements Closeable, CuratorWatcher
try
{
client.getConfig().usingWatcher(this).inBackground(backgroundCallback).forEnsemble();
+ outstanding.incrementAndGet(); // finally block will decrement
}
- catch ( Exception e )
+ finally
{
outstanding.decrementAndGet();
- throw e;
}
}
}