You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/11/12 14:52:39 UTC
[curator] 02/04: CURATOR-549
This is an automated email from the ASF dual-hosted git repository.
randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-recipes
in repository https://gitbox.apache.org/repos/asf/curator.git
commit e517ea4139cd4caef8e0dc13d99bc8204f853c73
Author: randgalt <ra...@apache.org>
AuthorDate: Sat Nov 2 11:40:44 2019 -0500
CURATOR-549
Bring Curator up to ZooKeeper 3.5.6 in preparation for supporting persistent recursive watchers while maintaining backward compatibility with previous versions of ZK. Added a new module to make sure we maintain compatibility with ZK 3.5.x. ZooKeeper 3.6.0 has some significant changes from previous versions. The reconfig APIs have moved into a new class, ZooKeeperAdmin. This class existed in 3.5.x but wasn't required. Now it is. A bunch of little things changed in the ZK server code [...]
There is a new module, curator-test-zk35. It forces ZooKeeper 3.5.6 and performs selected tests from the other modules to ensure compatibility. Tests annotated with TestNG groups zk35 and zk35Compatibility are tested. Group zk36 is excluded. Note: these tests will only run from Maven. I don't think IntelliJ/Eclipse support the Maven syntax I used.
Support persistent watchers in ZK 3.6+ while maintaining backward compatibility with previous versions of ZK. Added a new module to make sure we maintain compatibility with ZK 3.5.x.
---
.../curator/framework/imps/TestWatchesBuilder.java | 431 +++++++++++----------
1 file changed, 222 insertions(+), 209 deletions(-)
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java
index 26c41f1..b65e85c 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.framework.imps;
import org.apache.curator.framework.CuratorFramework;
@@ -55,7 +56,7 @@ public class TestWatchesBuilder extends CuratorTestBase
final AtomicReference<ConnectionState> state = new AtomicReference<ConnectionState>();
client.getConnectionStateListenable().addListener(new ConnectionStateListener()
{
-
+
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
@@ -66,13 +67,13 @@ public class TestWatchesBuilder extends CuratorTestBase
}
}
});
-
+
return state;
}
-
+
private boolean blockUntilDesiredConnectionState(AtomicReference<ConnectionState> stateRef, Timing timing, final ConnectionState desiredState)
{
- if(stateRef.get() == desiredState)
+ if ( stateRef.get() == desiredState )
{
return true;
}
@@ -80,55 +81,55 @@ public class TestWatchesBuilder extends CuratorTestBase
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized(stateRef)
{
- if(stateRef.get() == desiredState)
+ if ( stateRef.get() == desiredState )
{
return true;
}
-
+
try
{
stateRef.wait(timing.milliseconds());
return stateRef.get() == desiredState;
}
- catch(InterruptedException e)
+ catch ( InterruptedException e )
{
Thread.currentThread().interrupt();
return false;
}
}
}
-
+
@Test
public void testRemoveCuratorDefaultWatcher() throws Exception
{
Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.builder().
- connectString(server.getConnectString()).
- retryPolicy(new RetryOneTime(1)).
- build();
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
try
{
client.start();
-
+
final CountDownLatch removedLatch = new CountDownLatch(1);
-
- final String path = "/";
+
+ final String path = "/";
client.getCuratorListenable().addListener(new CuratorListener()
- {
+ {
@Override
- public void eventReceived(CuratorFramework client, CuratorEvent event)
- throws Exception
+ public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception
{
- if(event.getType() == CuratorEventType.WATCHED && event.getWatchedEvent().getType() == EventType.DataWatchRemoved) {
+ if ( event.getType() == CuratorEventType.WATCHED && event.getWatchedEvent().getType() == EventType.DataWatchRemoved )
+ {
removedLatch.countDown();
- }
+ }
}
});
-
+
client.checkExists().watched().forPath(path);
-
+
client.watches().removeAll().forPath(path);
-
+
Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
}
finally
@@ -136,34 +137,35 @@ public class TestWatchesBuilder extends CuratorTestBase
CloseableUtils.closeQuietly(client);
}
}
-
+
@Test
public void testRemoveCuratorWatch() throws Exception
- {
+ {
Timing timing = new Timing();
CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder().
- connectString(server.getConnectString()).
- retryPolicy(new RetryOneTime(1)).
- build();
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
try
{
client.start();
-
+
final CountDownLatch removedLatch = new CountDownLatch(1);
-
- final String path = "/";
+
+ final String path = "/";
CuratorWatcher watcher = new CuratorWatcher()
{
-
+
@Override
public void process(WatchedEvent event) throws Exception
{
- if(event.getPath().equals(path) && event.getType() == EventType.DataWatchRemoved) {
+ if ( event.getPath().equals(path) && event.getType() == EventType.DataWatchRemoved )
+ {
removedLatch.countDown();
}
}
};
-
+
client.checkExists().usingWatcher(watcher).forPath(path);
client.watches().remove(watcher).forPath(path);
@@ -174,25 +176,25 @@ public class TestWatchesBuilder extends CuratorTestBase
{
CloseableUtils.closeQuietly(client);
}
- }
-
+ }
+
@Test
public void testRemoveWatch() throws Exception
- {
+ {
Timing timing = new Timing();
CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder().
- connectString(server.getConnectString()).
- retryPolicy(new RetryOneTime(1)).
- build();
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
try
{
client.start();
-
+
final CountDownLatch removedLatch = new CountDownLatch(1);
-
- final String path = "/";
+
+ final String path = "/";
Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);
-
+
client.checkExists().usingWatcher(watcher).forPath(path);
client.watches().remove(watcher).forPath(path);
@@ -204,97 +206,97 @@ public class TestWatchesBuilder extends CuratorTestBase
CloseableUtils.closeQuietly(client);
}
}
-
+
@Test
public void testRemoveWatchInBackgroundWithCallback() throws Exception
- {
+ {
Timing timing = new Timing();
CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder().
- connectString(server.getConnectString()).
- retryPolicy(new RetryOneTime(1)).
- build();
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
try
- {
+ {
client.start();
-
+
//Make sure that the event fires on both the watcher and the callback.
final CountDownLatch removedLatch = new CountDownLatch(2);
final String path = "/";
Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);
-
+
BackgroundCallback callback = new BackgroundCallback()
{
-
+
@Override
- public void processResult(CuratorFramework client, CuratorEvent event)
- throws Exception
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
- if(event.getType() == CuratorEventType.REMOVE_WATCHES && event.getPath().equals(path)) {
+ if ( event.getType() == CuratorEventType.REMOVE_WATCHES && event.getPath().equals(path) )
+ {
removedLatch.countDown();
}
}
};
-
+
client.checkExists().usingWatcher(watcher).forPath(path);
client.watches().remove(watcher).ofType(WatcherType.Any).inBackground(callback).forPath(path);
Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
-
+
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
-
+
@Test
public void testRemoveWatchInBackgroundWithNoCallback() throws Exception
- {
+ {
Timing timing = new Timing();
CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder().
- connectString(server.getConnectString()).
- retryPolicy(new RetryOneTime(1)).
- build();
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
try
{
client.start();
-
+
final String path = "/";
final CountDownLatch removedLatch = new CountDownLatch(1);
Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);
-
+
client.checkExists().usingWatcher(watcher).forPath(path);
client.watches().remove(watcher).inBackground().forPath(path);
Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
-
+
}
finally
{
CloseableUtils.closeQuietly(client);
}
- }
-
+ }
+
@Test
public void testRemoveAllWatches() throws Exception
- {
+ {
Timing timing = new Timing();
CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder().
- connectString(server.getConnectString()).
- retryPolicy(new RetryOneTime(1)).
- build();
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
try
{
client.start();
-
+
final String path = "/";
final CountDownLatch removedLatch = new CountDownLatch(2);
-
- Watcher watcher1 = new CountDownWatcher(path, removedLatch, EventType.ChildWatchRemoved);
- Watcher watcher2 = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);
-
+
+ Watcher watcher1 = new CountDownWatcher(path, removedLatch, EventType.ChildWatchRemoved);
+ Watcher watcher2 = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);
+
client.getChildren().usingWatcher(watcher1).forPath(path);
client.checkExists().usingWatcher(watcher2).forPath(path);
@@ -306,32 +308,32 @@ public class TestWatchesBuilder extends CuratorTestBase
{
CloseableUtils.closeQuietly(client);
}
- }
-
+ }
+
@Test
public void testRemoveAllDataWatches() throws Exception
- {
+ {
Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.builder().
- connectString(server.getConnectString()).
- retryPolicy(new RetryOneTime(1)).
- build();
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
try
{
client.start();
-
+
final String path = "/";
final AtomicBoolean removedFlag = new AtomicBoolean(false);
final CountDownLatch removedLatch = new CountDownLatch(1);
-
- Watcher watcher1 = new BooleanWatcher(path, removedFlag, EventType.ChildWatchRemoved);
- Watcher watcher2 = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);
-
+
+ Watcher watcher1 = new BooleanWatcher(path, removedFlag, EventType.ChildWatchRemoved);
+ Watcher watcher2 = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);
+
client.getChildren().usingWatcher(watcher1).forPath(path);
client.checkExists().usingWatcher(watcher2).forPath(path);
-
+
client.watches().removeAll().ofType(WatcherType.Data).forPath(path);
-
+
Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
Assert.assertEquals(removedFlag.get(), false);
}
@@ -340,31 +342,31 @@ public class TestWatchesBuilder extends CuratorTestBase
CloseableUtils.closeQuietly(client);
}
}
-
+
@Test
public void testRemoveAllChildWatches() throws Exception
- {
+ {
Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.builder().
- connectString(server.getConnectString()).
- retryPolicy(new RetryOneTime(1)).
- build();
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
try
{
client.start();
-
+
final String path = "/";
final AtomicBoolean removedFlag = new AtomicBoolean(false);
final CountDownLatch removedLatch = new CountDownLatch(1);
-
- Watcher watcher1 = new BooleanWatcher(path, removedFlag, EventType.DataWatchRemoved);
- Watcher watcher2 = new CountDownWatcher(path, removedLatch, EventType.ChildWatchRemoved);
-
+
+ Watcher watcher1 = new BooleanWatcher(path, removedFlag, EventType.DataWatchRemoved);
+ Watcher watcher2 = new CountDownWatcher(path, removedLatch, EventType.ChildWatchRemoved);
+
client.checkExists().usingWatcher(watcher1).forPath(path);
client.getChildren().usingWatcher(watcher2).forPath(path);
-
+
client.watches().removeAll().ofType(WatcherType.Children).forPath(path);
-
+
Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
Assert.assertEquals(removedFlag.get(), false);
}
@@ -372,34 +374,35 @@ public class TestWatchesBuilder extends CuratorTestBase
{
CloseableUtils.closeQuietly(client);
}
- }
-
+ }
+
@Test
- public void testRemoveLocalWatch() throws Exception {
+ public void testRemoveLocalWatch() throws Exception
+ {
Timing timing = new Timing();
CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder().
- connectString(server.getConnectString()).
- retryPolicy(new RetryOneTime(1)).
- build();
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
try
{
client.start();
-
+
AtomicReference<ConnectionState> stateRef = registerConnectionStateListener(client);
-
+
final String path = "/";
-
+
final CountDownLatch removedLatch = new CountDownLatch(1);
-
- Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);
-
+
+ Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);
+
client.checkExists().usingWatcher(watcher).forPath(path);
//Stop the server so we can check if we can remove watches locally when offline
server.stop();
-
+
Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED));
-
+
client.watches().removeAll().locally().forPath(path);
Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
@@ -409,33 +412,34 @@ public class TestWatchesBuilder extends CuratorTestBase
CloseableUtils.closeQuietly(client);
}
}
-
+
@Test
- public void testRemoveLocalWatchInBackground() throws Exception {
+ public void testRemoveLocalWatchInBackground() throws Exception
+ {
Timing timing = new Timing();
CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder().
- connectString(server.getConnectString()).
- retryPolicy(new RetryOneTime(1)).
- build();
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
try
{
client.start();
-
+
AtomicReference<ConnectionState> stateRef = registerConnectionStateListener(client);
-
+
final String path = "/";
-
+
final CountDownLatch removedLatch = new CountDownLatch(1);
-
- Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);
-
+
+ Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);
+
client.checkExists().usingWatcher(watcher).forPath(path);
//Stop the server so we can check if we can remove watches locally when offline
server.stop();
-
+
Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED));
-
+
client.watches().removeAll().locally().inBackground().forPath(path);
Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
@@ -444,8 +448,8 @@ public class TestWatchesBuilder extends CuratorTestBase
{
CloseableUtils.closeQuietly(client);
}
- }
-
+ }
+
/**
* Test the case where we try and remove an unregistered watcher. In this case we expect a NoWatcherException to
* be thrown.
@@ -455,21 +459,22 @@ public class TestWatchesBuilder extends CuratorTestBase
public void testRemoveUnregisteredWatcher() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.builder().
- connectString(server.getConnectString()).
- retryPolicy(new RetryOneTime(1)).
- build();
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
try
{
client.start();
-
- final String path = "/";
- Watcher watcher = new Watcher() {
+
+ final String path = "/";
+ Watcher watcher = new Watcher()
+ {
@Override
public void process(WatchedEvent event)
{
- }
+ }
};
-
+
try
{
client.watches().remove(watcher).forPath(path);
@@ -485,7 +490,7 @@ public class TestWatchesBuilder extends CuratorTestBase
CloseableUtils.closeQuietly(client);
}
}
-
+
/**
* Test the case where we try and remove an unregistered watcher but have the quietly flag set. In this case we expect success.
* @throws Exception
@@ -495,22 +500,22 @@ public class TestWatchesBuilder extends CuratorTestBase
{
Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.builder().
- connectString(server.getConnectString()).
- retryPolicy(new RetryOneTime(1)).
- build();
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
try
{
client.start();
-
+
final AtomicBoolean watcherRemoved = new AtomicBoolean(false);
-
- final String path = "/";
+
+ final String path = "/";
Watcher watcher = new BooleanWatcher(path, watcherRemoved, EventType.DataWatchRemoved);
-
+
client.watches().remove(watcher).quietly().forPath(path);
-
+
timing.sleepABit();
-
+
//There should be no watcher removed as none were registered.
Assert.assertEquals(watcherRemoved.get(), false);
}
@@ -519,94 +524,94 @@ public class TestWatchesBuilder extends CuratorTestBase
CloseableUtils.closeQuietly(client);
}
}
-
+
@Test
- public void testGuaranteedRemoveWatch() throws Exception {
+ public void testGuaranteedRemoveWatch() throws Exception
+ {
Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.builder().
- connectString(server.getConnectString()).
- retryPolicy(new RetryOneTime(1)).
- build();
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
try
{
client.start();
-
+
AtomicReference<ConnectionState> stateRef = registerConnectionStateListener(client);
-
+
String path = "/";
-
+
CountDownLatch removeLatch = new CountDownLatch(1);
-
- Watcher watcher = new CountDownWatcher(path, removeLatch, EventType.DataWatchRemoved);
+
+ Watcher watcher = new CountDownWatcher(path, removeLatch, EventType.DataWatchRemoved);
client.checkExists().usingWatcher(watcher).forPath(path);
-
- server.stop();
-
+
+ server.stop();
+
Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED));
-
+
//Remove the watch while we're not connected
- try
+ try
{
client.watches().remove(watcher).guaranteed().forPath(path);
Assert.fail();
}
- catch(KeeperException.ConnectionLossException e)
+ catch ( KeeperException.ConnectionLossException e )
{
//Expected
}
-
+
server.restart();
-
- timing.awaitLatch(removeLatch);
+
+ timing.awaitLatch(removeLatch);
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
-
+
@Test
- public void testGuaranteedRemoveWatchInBackground() throws Exception {
+ public void testGuaranteedRemoveWatchInBackground() throws Exception
+ {
Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(),
- new ExponentialBackoffRetry(100, 3));
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new ExponentialBackoffRetry(100, 3));
try
{
client.start();
-
+
AtomicReference<ConnectionState> stateRef = registerConnectionStateListener(client);
-
+
final CountDownLatch guaranteeAddedLatch = new CountDownLatch(1);
-
+
((CuratorFrameworkImpl)client).getFailedRemoveWatcherManager().debugListener = new FailedOperationManager.FailedOperationManagerListener<FailedRemoveWatchManager.FailedRemoveWatchDetails>()
{
@Override
- public void pathAddedForGuaranteedOperation(
- FailedRemoveWatchDetails detail)
+ public void pathAddedForGuaranteedOperation(FailedRemoveWatchDetails detail)
{
guaranteeAddedLatch.countDown();
}
};
-
+
String path = "/";
-
+
CountDownLatch removeLatch = new CountDownLatch(1);
-
- Watcher watcher = new CountDownWatcher(path, removeLatch, EventType.DataWatchRemoved);
+
+ Watcher watcher = new CountDownWatcher(path, removeLatch, EventType.DataWatchRemoved);
client.checkExists().usingWatcher(watcher).forPath(path);
-
- server.stop();
+
+ server.stop();
Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED));
-
+
//Remove the watch while we're not connected
client.watches().remove(watcher).guaranteed().inBackground().forPath(path);
-
+
timing.awaitLatch(guaranteeAddedLatch);
-
+
server.restart();
-
- timing.awaitLatch(removeLatch);
+
+ timing.awaitLatch(removeLatch);
}
finally
{
@@ -617,7 +622,7 @@ public class TestWatchesBuilder extends CuratorTestBase
@Test(groups = CuratorTestBase.zk36Group)
public void testPersistentRecursiveWatch() throws Exception
{
- try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+ try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)))
{
client.start();
client.blockUntilConnected();
@@ -647,7 +652,7 @@ public class TestWatchesBuilder extends CuratorTestBase
};
return new ZooKeeper(connectString, sessionTimeout, actualWatcher);
};
- try (CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).zookeeperFactory(zookeeperFactory).build() )
+ try (CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).zookeeperFactory(zookeeperFactory).build())
{
client.start();
client.blockUntilConnected();
@@ -664,51 +669,59 @@ public class TestWatchesBuilder extends CuratorTestBase
}
}
- private static class CountDownWatcher implements Watcher {
+ private static class CountDownWatcher implements Watcher
+ {
private String path;
private EventType eventType;
private CountDownLatch removeLatch;
-
- public CountDownWatcher(String path, CountDownLatch removeLatch, EventType eventType) {
+
+ public CountDownWatcher(String path, CountDownLatch removeLatch, EventType eventType)
+ {
this.path = path;
this.eventType = eventType;
- this.removeLatch = removeLatch;
+ this.removeLatch = removeLatch;
}
-
+
@Override
public void process(WatchedEvent event)
{
- if(event.getPath() == null || event.getType() == null) {
+ if ( event.getPath() == null || event.getType() == null )
+ {
return;
}
-
- if(event.getPath().equals(path) && event.getType() == eventType) {
+
+ if ( event.getPath().equals(path) && event.getType() == eventType )
+ {
removeLatch.countDown();
}
- }
+ }
}
-
- private static class BooleanWatcher implements Watcher {
+
+ private static class BooleanWatcher implements Watcher
+ {
private String path;
private EventType eventType;
private AtomicBoolean removedFlag;
-
- public BooleanWatcher(String path, AtomicBoolean removedFlag, EventType eventType) {
+
+ public BooleanWatcher(String path, AtomicBoolean removedFlag, EventType eventType)
+ {
this.path = path;
this.eventType = eventType;
- this.removedFlag = removedFlag;
+ this.removedFlag = removedFlag;
}
-
+
@Override
public void process(WatchedEvent event)
{
- if(event.getPath() == null || event.getType() == null) {
+ if ( event.getPath() == null || event.getType() == null )
+ {
return;
}
-
- if(event.getPath().equals(path) && event.getType() == eventType) {
+
+ if ( event.getPath().equals(path) && event.getType() == eventType )
+ {
removedFlag.set(true);
}
- }
- }
+ }
+ }
}