You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2020/02/12 22:10:18 UTC
[curator] 01/02: CURATOR-558
This is an automated email from the ASF dual-hosted git repository.
randgalt pushed a commit to branch CURATOR-558-pt1-remove-zk40-etc
in repository https://gitbox.apache.org/repos/asf/curator.git
commit 5a04ba4c0861a60192e1cd3e1d10875c4b03ea72
Author: randgalt <ra...@apache.org>
AuthorDate: Sun Feb 2 11:34:34 2020 -0500
CURATOR-558
Part 1 of the change:
* Remove the ZK 3.4 compatibility module and code
* Remove the deprecated ListenerContainer that leaks Guava classes into our APIs
* Remove Exhibitor support
* Various minor changes/cleanups
---
.../exhibitor/DefaultExhibitorRestClient.java | 68 ---
.../exhibitor/ExhibitorEnsembleProvider.java | 335 -----------
.../ensemble/exhibitor/ExhibitorRestClient.java | 34 --
.../curator/ensemble/exhibitor/Exhibitors.java | 66 ---
.../org/apache/curator/utils/Compatibility.java | 101 ----
.../curator/utils/InjectSessionExpiration.java | 77 ---
.../test/java/org/apache/curator/BasicTests.java | 4 +-
.../apache/curator/TestSessionFailRetryLoop.java | 10 +-
.../exhibitor/TestExhibitorEnsembleProvider.java | 227 --------
.../apache/curator/framework/CuratorFramework.java | 7 -
.../curator/framework/CuratorFrameworkFactory.java | 22 -
.../apache/curator/framework/SafeIsTtlMode.java | 44 --
.../framework/imps/CompatibleCreateCallback.java | 26 -
.../curator/framework/imps/CreateBuilderImpl.java | 78 +--
.../apache/curator/framework/imps/CreateZK35.java | 47 --
.../framework/imps/CuratorFrameworkImpl.java | 83 +--
.../imps/CuratorMultiTransactionImpl.java | 18 +-
.../framework/imps/WatcherRemovalManager.java | 7 +-
.../framework/listen/ListenerContainer.java | 112 ----
.../framework/listen/MappingListenerManager.java | 2 +-
.../framework/state/ConnectionStateManager.java | 3 +-
.../curator/framework/imps/TestCleanState.java | 7 -
.../framework/imps/TestCreateReturningStat.java | 2 -
.../imps/TestEnabledSessionExpiredState.java | 3 +-
.../curator/framework/imps/TestFramework.java | 2 -
.../curator/framework/imps/TestFrameworkEdges.java | 13 +-
.../framework/imps/TestReconfiguration.java | 2 -
.../curator/framework/imps/TestRemoveWatches.java | 2 -
.../curator/framework/imps/TestTtlNodes.java | 2 -
.../framework/imps/TestWatcherRemovalManager.java | 2 -
.../curator/framework/recipes/cache/NodeCache.java | 35 +-
.../framework/recipes/cache/PathChildrenCache.java | 39 +-
.../curator/framework/recipes/cache/TreeCache.java | 49 +-
.../framework/recipes/leader/LeaderLatch.java | 25 +-
.../framework/recipes/locks/ChildReaper.java | 307 ----------
.../curator/framework/recipes/locks/Reaper.java | 380 ------------
.../framework/recipes/nodes/PersistentNode.java | 41 +-
.../recipes/queue/DistributedDelayQueue.java | 4 +-
.../recipes/queue/DistributedIdQueue.java | 4 +-
.../recipes/queue/DistributedPriorityQueue.java | 4 +-
.../framework/recipes/queue/DistributedQueue.java | 63 +-
.../curator/framework/recipes/queue/QueueBase.java | 4 +-
.../framework/recipes/shared/SharedValue.java | 52 +-
.../recipes/shared/SharedValueReader.java | 4 +-
.../framework/recipes/cache/TestNodeCache.java | 4 +-
.../recipes/cache/TestPathChildrenCache.java | 4 +-
.../framework/recipes/cache/TestTreeCache.java | 6 +-
.../recipes/leader/TestLeaderSelector.java | 4 +-
.../framework/recipes/locks/TestChildReaper.java | 351 -----------
.../recipes/locks/TestInterProcessMutex.java | 4 +-
.../recipes/locks/TestInterProcessMutexBase.java | 4 +-
.../recipes/locks/TestInterProcessSemaphore.java | 47 --
.../framework/recipes/locks/TestReaper.java | 647 ---------------------
.../recipes/nodes/TestPersistentEphemeralNode.java | 10 +-
.../recipes/nodes/TestPersistentTtlNode.java | 2 -
.../test/compatibility/CuratorTestBase.java | 1 -
.../test/compatibility/Zk35MethodInterceptor.java | 56 --
.../x/async/modeled/details/ModeledCacheImpl.java | 21 +-
.../x/discovery/details/ServiceCacheImpl.java | 79 +--
.../x/discovery/details/TestServiceDiscovery.java | 8 +-
pom.xml | 6 -
src/site/confluence/exhibitor.confluence | 24 -
...y.confluence => zk-compatibility-34.confluence} | 20 +-
src/site/site.xml | 7 +-
64 files changed, 237 insertions(+), 3485 deletions(-)
diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/DefaultExhibitorRestClient.java b/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/DefaultExhibitorRestClient.java
deleted file mode 100644
index 01b9525..0000000
--- a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/DefaultExhibitorRestClient.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.ensemble.exhibitor;
-
-import org.apache.curator.utils.CloseableUtils;
-import java.io.BufferedInputStream;
-import java.io.InputStream;
-import java.net.HttpURLConnection;
-import java.net.URI;
-
-@SuppressWarnings("UnusedDeclaration")
-public class DefaultExhibitorRestClient implements ExhibitorRestClient
-{
- private final boolean useSsl;
-
- public DefaultExhibitorRestClient()
- {
- this(false);
- }
-
- public DefaultExhibitorRestClient(boolean useSsl)
- {
- this.useSsl = useSsl;
- }
-
- @Override
- public String getRaw(String hostname, int port, String uriPath, String mimeType) throws Exception
- {
- URI uri = new URI(useSsl ? "https" : "http", null, hostname, port, uriPath, null, null);
- HttpURLConnection connection = (HttpURLConnection)uri.toURL().openConnection();
- connection.addRequestProperty("Accept", mimeType);
- StringBuilder str = new StringBuilder();
- InputStream in = new BufferedInputStream(connection.getInputStream());
- try
- {
- for(;;)
- {
- int b = in.read();
- if ( b < 0 )
- {
- break;
- }
- str.append((char)(b & 0xff));
- }
- }
- finally
- {
- CloseableUtils.closeQuietly(in);
- }
- return str.toString();
- }
-}
diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorEnsembleProvider.java b/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorEnsembleProvider.java
deleted file mode 100644
index eb4f8bd..0000000
--- a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorEnsembleProvider.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.ensemble.exhibitor;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.curator.RetryLoop;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.ensemble.EnsembleProvider;
-import org.apache.curator.utils.ThreadUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Ensemble provider that polls a cluster of Exhibitor (https://github.com/Netflix/exhibitor)
- * instances for the connection string.
- * If the set of instances should change, new ZooKeeper connections will use the new connection
- * string.
- */
-public class ExhibitorEnsembleProvider implements EnsembleProvider
-{
- private final Logger log = LoggerFactory.getLogger(getClass());
- private final AtomicReference<Exhibitors> exhibitors = new AtomicReference<Exhibitors>();
- private final AtomicReference<Exhibitors> masterExhibitors = new AtomicReference<Exhibitors>();
- private final ExhibitorRestClient restClient;
- private final String restUriPath;
- private final int pollingMs;
- private final RetryPolicy retryPolicy;
- private final ScheduledExecutorService service = ThreadUtils.newSingleThreadScheduledExecutor("ExhibitorEnsembleProvider");
- private final Random random = new Random();
- private final AtomicReference<String> connectionString = new AtomicReference<String>("");
- private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
-
- private static final String MIME_TYPE = "application/x-www-form-urlencoded";
-
- private static final String VALUE_PORT = "port";
- private static final String VALUE_COUNT = "count";
- private static final String VALUE_SERVER_PREFIX = "server";
-
- private enum State
- {
- LATENT,
- STARTED,
- CLOSED
- }
-
- /**
- * @param exhibitors the current set of exhibitor instances (can be changed later via {@link #setExhibitors(Exhibitors)})
- * @param restClient the rest client to use (use {@link DefaultExhibitorRestClient} for most cases)
- * @param restUriPath the path of the REST call used to get the server set. Usually: <code>/exhibitor/v1/cluster/list</code>
- * @param pollingMs how ofter to poll the exhibitors for the list
- * @param retryPolicy retry policy to use when connecting to the exhibitors
- */
- public ExhibitorEnsembleProvider(Exhibitors exhibitors, ExhibitorRestClient restClient, String restUriPath, int pollingMs, RetryPolicy retryPolicy)
- {
- this.exhibitors.set(exhibitors);
- this.masterExhibitors.set(exhibitors);
- this.restClient = restClient;
- this.restUriPath = restUriPath;
- this.pollingMs = pollingMs;
- this.retryPolicy = retryPolicy;
- }
-
- /**
- * Change the set of exhibitors to poll
- *
- * @param newExhibitors new set
- */
- public void setExhibitors(Exhibitors newExhibitors)
- {
- exhibitors.set(newExhibitors);
- masterExhibitors.set(newExhibitors);
- }
-
- /**
- * Can be called prior to starting the Curator instance to set the current connection string
- *
- * @throws Exception errors
- */
- public void pollForInitialEnsemble() throws Exception
- {
- Preconditions.checkState(state.get() == State.LATENT, "Cannot be called after start()");
- poll();
- }
-
- @Override
- public void start() throws Exception
- {
- Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
-
- service.scheduleWithFixedDelay
- (
- new Runnable()
- {
- @Override
- public void run()
- {
- poll();
- }
- },
- pollingMs,
- pollingMs,
- TimeUnit.MILLISECONDS
- );
- }
-
- @Override
- public void close() throws IOException
- {
- Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
-
- service.shutdownNow();
- }
-
- @Override
- public String getConnectionString()
- {
- return connectionString.get();
- }
-
- @Override
- public void setConnectionString(String connectionString)
- {
- log.info("setConnectionString received. Ignoring. " + connectionString);
- }
-
- @Override
- public boolean updateServerListEnabled()
- {
- return false;
- }
-
- @VisibleForTesting
- protected void poll()
- {
- Exhibitors localExhibitors = exhibitors.get();
- Map<String, String> values = queryExhibitors(localExhibitors);
-
- int count = getCountFromValues(values);
- if ( count == 0 )
- {
- log.warn("0 count returned from Exhibitors. Using backup connection values.");
- values = useBackup(localExhibitors);
- count = getCountFromValues(values);
- }
-
- if ( count > 0 )
- {
- int port = Integer.parseInt(values.get(VALUE_PORT));
- StringBuilder newConnectionString = new StringBuilder();
- List<String> newHostnames = Lists.newArrayList();
-
- for ( int i = 0; i < count; ++i )
- {
- if ( newConnectionString.length() > 0 )
- {
- newConnectionString.append(",");
- }
- String server = values.get(VALUE_SERVER_PREFIX + i);
- newConnectionString.append(server).append(":").append(port);
- newHostnames.add(server);
- }
-
- String newConnectionStringValue = newConnectionString.toString();
- if ( !newConnectionStringValue.equals(connectionString.get()) )
- {
- log.info(String.format("Connection string has changed. Old value (%s), new value (%s)", connectionString.get(), newConnectionStringValue));
- }
- Exhibitors newExhibitors = new Exhibitors
- (
- newHostnames,
- localExhibitors.getRestPort(),
- new Exhibitors.BackupConnectionStringProvider()
- {
- @Override
- public String getBackupConnectionString() throws Exception
- {
- return masterExhibitors.get().getBackupConnectionString(); // this may be overloaded by clients. Make sure there is always a method call to get the value.
- }
- }
- );
- connectionString.set(newConnectionStringValue);
- exhibitors.set(newExhibitors);
- }
- }
-
- private int getCountFromValues(Map<String, String> values)
- {
- try
- {
- return Integer.parseInt(values.get(VALUE_COUNT));
- }
- catch ( NumberFormatException e )
- {
- // ignore
- }
- return 0;
- }
-
- private Map<String, String> useBackup(Exhibitors localExhibitors)
- {
- Map<String, String> values = newValues();
-
- try
- {
- String backupConnectionString = localExhibitors.getBackupConnectionString();
-
- int thePort = -1;
- int count = 0;
- for ( String spec : backupConnectionString.split(",") )
- {
- spec = spec.trim();
- String[] parts = spec.split(":");
- if ( parts.length == 2 )
- {
- String hostname = parts[0];
- int port = Integer.parseInt(parts[1]);
- if ( thePort < 0 )
- {
- thePort = port;
- }
- else if ( port != thePort )
- {
- log.warn("Inconsistent port in connection component: " + spec);
- }
- values.put(VALUE_SERVER_PREFIX + count, hostname);
- ++count;
- }
- else
- {
- log.warn("Bad backup connection component: " + spec);
- }
- }
- values.put(VALUE_COUNT, Integer.toString(count));
- values.put(VALUE_PORT, Integer.toString(thePort));
- }
- catch ( Exception e )
- {
- ThreadUtils.checkInterrupted(e);
- log.error("Couldn't get backup connection string", e);
- }
- return values;
- }
-
- private Map<String, String> newValues()
- {
- Map<String, String> values = Maps.newHashMap();
- values.put(VALUE_COUNT, "0");
- return values;
- }
-
- private static Map<String, String> decodeExhibitorList(String str) throws UnsupportedEncodingException
- {
- Map<String, String> values = Maps.newHashMap();
- for ( String spec : str.split("&") )
- {
- String[] parts = spec.split("=");
- if ( parts.length == 2 )
- {
- values.put(parts[0], URLDecoder.decode(parts[1], "UTF-8"));
- }
- }
-
- return values;
- }
-
- private Map<String, String> queryExhibitors(Exhibitors localExhibitors)
- {
- Map<String, String> values = newValues();
-
- long start = System.currentTimeMillis();
- int retries = 0;
- boolean done = false;
- while ( !done )
- {
- List<String> hostnames = Lists.newArrayList(localExhibitors.getHostnames());
- if ( hostnames.size() == 0 )
- {
- done = true;
- }
- else
- {
- String hostname = hostnames.get(random.nextInt(hostnames.size()));
- try
- {
- String encoded = restClient.getRaw(hostname, localExhibitors.getRestPort(), restUriPath, MIME_TYPE);
- values.putAll(decodeExhibitorList(encoded));
- done = true;
- }
- catch ( Throwable e )
- {
- ThreadUtils.checkInterrupted(e);
- if ( retryPolicy.allowRetry(retries++, System.currentTimeMillis() - start, RetryLoop.getDefaultRetrySleeper()) )
- {
- log.warn("Couldn't get servers from Exhibitor. Retrying.", e);
- }
- else
- {
- log.error("Couldn't get servers from Exhibitor. Giving up.", e);
- done = true;
- }
- }
- }
- }
-
- return values;
- }
-}
diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorRestClient.java b/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorRestClient.java
deleted file mode 100644
index 007f652..0000000
--- a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorRestClient.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.ensemble.exhibitor;
-
-public interface ExhibitorRestClient
-{
- /**
- * Connect to the given Exhibitor and return the raw result
- *
- * @param hostname host to connect to
- * @param port connect port
- * @param uriPath path
- * @param mimeType Accept mime type
- * @return raw result
- * @throws Exception errors
- */
- public String getRaw(String hostname, int port, String uriPath, String mimeType) throws Exception;
-}
diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/Exhibitors.java b/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/Exhibitors.java
deleted file mode 100644
index 24e0c14..0000000
--- a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/Exhibitors.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.ensemble.exhibitor;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import java.util.Collection;
-
-/**
- * POJO for specifying the cluster of Exhibitor instances
- */
-public class Exhibitors
-{
- private final Collection<String> hostnames;
- private final int restPort;
- private final BackupConnectionStringProvider backupConnectionStringProvider;
-
- public interface BackupConnectionStringProvider
- {
- public String getBackupConnectionString() throws Exception;
- }
-
- /**
- * @param hostnames set of Exhibitor instance host names
- * @param restPort the REST port used to connect to Exhibitor
- * @param backupConnectionStringProvider in case an Exhibitor instance can't be contacted, returns the fixed
- * connection string to use as a backup
- */
- public Exhibitors(Collection<String> hostnames, int restPort, BackupConnectionStringProvider backupConnectionStringProvider)
- {
- this.backupConnectionStringProvider = Preconditions.checkNotNull(backupConnectionStringProvider, "backupConnectionStringProvider cannot be null");
- this.hostnames = ImmutableList.copyOf(hostnames);
- this.restPort = restPort;
- }
-
- public Collection<String> getHostnames()
- {
- return hostnames;
- }
-
- public int getRestPort()
- {
- return restPort;
- }
-
- public String getBackupConnectionString() throws Exception
- {
- return backupConnectionStringProvider.getBackupConnectionString();
- }
-}
diff --git a/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java b/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java
deleted file mode 100644
index 1ee2301..0000000
--- a/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.utils;
-
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.lang.reflect.Method;
-
-/**
- * Utils to help with ZK 3.4.x compatibility
- */
-public class Compatibility
-{
- private static final boolean hasZooKeeperAdmin;
- private static final Method queueEventMethod;
- private static final Logger logger = LoggerFactory.getLogger(Compatibility.class);
-
- static
- {
- boolean localHasZooKeeperAdmin;
- try
- {
- Class.forName("org.apache.zookeeper.admin.ZooKeeperAdmin");
- localHasZooKeeperAdmin = true;
- }
- catch ( ClassNotFoundException e )
- {
- localHasZooKeeperAdmin = false;
- logger.info("Running in ZooKeeper 3.4.x compatibility mode");
- }
- hasZooKeeperAdmin = localHasZooKeeperAdmin;
-
- Method localQueueEventMethod;
- try
- {
- Class<?> testableClass = Class.forName("org.apache.zookeeper.Testable");
- localQueueEventMethod = testableClass.getMethod("queueEvent", WatchedEvent.class);
- }
- catch ( ReflectiveOperationException ignore )
- {
- localQueueEventMethod = null;
- LoggerFactory.getLogger(Compatibility.class).info("Using emulated InjectSessionExpiration");
- }
- queueEventMethod = localQueueEventMethod;
- }
-
- /**
- * Return true if the classpath ZooKeeper library is 3.4.x
- *
- * @return true/false
- */
- public static boolean isZK34()
- {
- return !hasZooKeeperAdmin;
- }
-
- /**
- * For ZooKeeper 3.5.x, use the supported <code>zooKeeper.getTestable().injectSessionExpiration()</code>.
- * For ZooKeeper 3.4.x do the equivalent via reflection
- *
- * @param zooKeeper client
- */
- public static void injectSessionExpiration(ZooKeeper zooKeeper)
- {
- if ( isZK34() || (queueEventMethod == null) )
- {
- InjectSessionExpiration.injectSessionExpiration(zooKeeper);
- }
- else
- {
- try
- {
- WatchedEvent event = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null);
- queueEventMethod.invoke(zooKeeper.getTestable(), event);
- }
- catch ( Exception e )
- {
- logger.error("Could not call Testable.queueEvent()", e);
- }
- }
- }
-}
diff --git a/curator-client/src/main/java/org/apache/curator/utils/InjectSessionExpiration.java b/curator-client/src/main/java/org/apache/curator/utils/InjectSessionExpiration.java
deleted file mode 100644
index 8ad2b5d..0000000
--- a/curator-client/src/main/java/org/apache/curator/utils/InjectSessionExpiration.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.utils;
-
-import org.apache.zookeeper.ClientCnxn;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-
-// reflective version of zooKeeper.getTestable().injectSessionExpiration();
-@SuppressWarnings("JavaReflectionMemberAccess")
-public class InjectSessionExpiration
-{
- private static final Field cnxnField;
- private static final Field eventThreadField;
- private static final Method queueEventMethod;
- static
- {
- Field localCnxnField;
- Field localEventThreadField;
- Method localQueueEventMethod;
- try
- {
- Class<?> eventThreadClass = Class.forName("org.apache.zookeeper.ClientCnxn$EventThread");
-
- localCnxnField = ZooKeeper.class.getDeclaredField("cnxn");
- localCnxnField.setAccessible(true);
- localEventThreadField = ClientCnxn.class.getDeclaredField("eventThread");
- localEventThreadField.setAccessible(true);
- localQueueEventMethod = eventThreadClass.getDeclaredMethod("queueEvent", WatchedEvent.class);
- localQueueEventMethod.setAccessible(true);
- }
- catch ( ReflectiveOperationException e )
- {
- throw new RuntimeException("Could not access internal ZooKeeper fields", e);
- }
- cnxnField = localCnxnField;
- eventThreadField = localEventThreadField;
- queueEventMethod = localQueueEventMethod;
- }
-
- public static void injectSessionExpiration(ZooKeeper zooKeeper)
- {
- try
- {
- WatchedEvent event = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null);
-
- ClientCnxn clientCnxn = (ClientCnxn)cnxnField.get(zooKeeper);
- Object eventThread = eventThreadField.get(clientCnxn);
- queueEventMethod.invoke(eventThread, event);
-
- // we used to set the state field to CLOSED here and a few other things but this resulted in CURATOR-498
- }
- catch ( ReflectiveOperationException e )
- {
- throw new RuntimeException("Could not inject session expiration using reflection", e);
- }
- }
-}
diff --git a/curator-client/src/test/java/org/apache/curator/BasicTests.java b/curator-client/src/test/java/org/apache/curator/BasicTests.java
index f951fb5..96b1ad0 100644
--- a/curator-client/src/test/java/org/apache/curator/BasicTests.java
+++ b/curator-client/src/test/java/org/apache/curator/BasicTests.java
@@ -21,8 +21,8 @@ package org.apache.curator;
import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Compatibility;
import org.apache.curator.test.Timing;
-import org.apache.curator.utils.Compatibility;
import org.apache.curator.utils.ZookeeperFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -100,7 +100,7 @@ public class BasicTests extends BaseClassForTests
// ignore
}
- Compatibility.injectSessionExpiration(client.getZooKeeper());
+ client.getZooKeeper().getTestable().injectSessionExpiration();
Assert.assertTrue(timing.awaitLatch(latch));
}
diff --git a/curator-client/src/test/java/org/apache/curator/TestSessionFailRetryLoop.java b/curator-client/src/test/java/org/apache/curator/TestSessionFailRetryLoop.java
index 7c9c963..e661930 100644
--- a/curator-client/src/test/java/org/apache/curator/TestSessionFailRetryLoop.java
+++ b/curator-client/src/test/java/org/apache/curator/TestSessionFailRetryLoop.java
@@ -20,9 +20,9 @@ package org.apache.curator;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Compatibility;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.test.Timing;
-import org.apache.curator.utils.Compatibility;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.concurrent.Callable;
@@ -57,7 +57,7 @@ public class TestSessionFailRetryLoop extends BaseClassForTests
if ( firstTime.compareAndSet(true, false) )
{
Assert.assertNull(client.getZooKeeper().exists("/foo/bar", false));
- Compatibility.injectSessionExpiration(client.getZooKeeper());
+ client.getZooKeeper().getTestable().injectSessionExpiration();
client.getZooKeeper();
client.blockUntilConnectedOrTimedOut();
}
@@ -131,7 +131,7 @@ public class TestSessionFailRetryLoop extends BaseClassForTests
if ( firstTime.compareAndSet(true, false) )
{
Assert.assertNull(client.getZooKeeper().exists("/foo/bar", false));
- Compatibility.injectSessionExpiration(client.getZooKeeper());
+ client.getZooKeeper().getTestable().injectSessionExpiration();
client.getZooKeeper();
client.blockUntilConnectedOrTimedOut();
}
@@ -196,7 +196,7 @@ public class TestSessionFailRetryLoop extends BaseClassForTests
public Void call() throws Exception
{
Assert.assertNull(client.getZooKeeper().exists("/foo/bar", false));
- Compatibility.injectSessionExpiration(client.getZooKeeper());
+ client.getZooKeeper().getTestable().injectSessionExpiration();
timing.sleepABit();
@@ -258,7 +258,7 @@ public class TestSessionFailRetryLoop extends BaseClassForTests
public Void call() throws Exception
{
Assert.assertNull(client.getZooKeeper().exists("/foo/bar", false));
- Compatibility.injectSessionExpiration(client.getZooKeeper());
+ client.getZooKeeper().getTestable().injectSessionExpiration();
client.getZooKeeper();
client.blockUntilConnectedOrTimedOut();
diff --git a/curator-client/src/test/java/org/apache/curator/ensemble/exhibitor/TestExhibitorEnsembleProvider.java b/curator-client/src/test/java/org/apache/curator/ensemble/exhibitor/TestExhibitorEnsembleProvider.java
deleted file mode 100644
index 60c817a..0000000
--- a/curator-client/src/test/java/org/apache/curator/ensemble/exhibitor/TestExhibitorEnsembleProvider.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.ensemble.exhibitor;
-
-import com.google.common.collect.Lists;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.CuratorZookeeperClient;
-import org.apache.curator.RetryLoop;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.test.TestingServer;
-import org.apache.curator.test.Timing;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.Stat;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class TestExhibitorEnsembleProvider extends BaseClassForTests
-{
- private static final Exhibitors.BackupConnectionStringProvider dummyConnectionStringProvider = new Exhibitors.BackupConnectionStringProvider()
- {
- @Override
- public String getBackupConnectionString() throws Exception
- {
- return null;
- }
- };
-
- @Test
- public void testExhibitorFailures() throws Exception
- {
- final AtomicReference<String> backupConnectionString = new AtomicReference<String>("backup1:1");
- final AtomicReference<String> connectionString = new AtomicReference<String>("count=1&port=2&server0=localhost");
- Exhibitors exhibitors = new Exhibitors
- (
- Lists.newArrayList("foo", "bar"),
- 1000,
- new Exhibitors.BackupConnectionStringProvider()
- {
- @Override
- public String getBackupConnectionString()
- {
- return backupConnectionString.get();
- }
- }
- );
- ExhibitorRestClient mockRestClient = new ExhibitorRestClient()
- {
- @Override
- public String getRaw(String hostname, int port, String uriPath, String mimeType) throws Exception
- {
- String localConnectionString = connectionString.get();
- if ( localConnectionString == null )
- {
- throw new IOException();
- }
- return localConnectionString;
- }
- };
-
- final Semaphore semaphore = new Semaphore(0);
- ExhibitorEnsembleProvider provider = new ExhibitorEnsembleProvider(exhibitors, mockRestClient, "/foo", 10, new RetryOneTime(1))
- {
- @Override
- protected void poll()
- {
- super.poll();
- semaphore.release();
- }
- };
- provider.pollForInitialEnsemble();
- try
- {
- provider.start();
-
- Assert.assertEquals(provider.getConnectionString(), "localhost:2");
-
- connectionString.set(null);
- semaphore.drainPermits();
- semaphore.acquire(); // wait for next poll
- Assert.assertEquals(provider.getConnectionString(), "backup1:1");
-
- backupConnectionString.set("backup2:2");
- semaphore.drainPermits();
- semaphore.acquire(); // wait for next poll
- Assert.assertEquals(provider.getConnectionString(), "backup2:2");
-
- connectionString.set("count=1&port=3&server0=localhost3");
- semaphore.drainPermits();
- semaphore.acquire(); // wait for next poll
- Assert.assertEquals(provider.getConnectionString(), "localhost3:3");
- }
- finally
- {
- CloseableUtils.closeQuietly(provider);
- }
- }
-
- @Test
- public void testChanging() throws Exception
- {
- TestingServer secondServer = new TestingServer();
- try
- {
- String mainConnectionString = "count=1&port=" + server.getPort() + "&server0=localhost";
- String secondConnectionString = "count=1&port=" + secondServer.getPort() + "&server0=localhost";
-
- final Semaphore semaphore = new Semaphore(0);
- final AtomicReference<String> connectionString = new AtomicReference<String>(mainConnectionString);
- Exhibitors exhibitors = new Exhibitors(Lists.newArrayList("foo", "bar"), 1000, dummyConnectionStringProvider);
- ExhibitorRestClient mockRestClient = new ExhibitorRestClient()
- {
- @Override
- public String getRaw(String hostname, int port, String uriPath, String mimeType) throws Exception
- {
- semaphore.release();
- return connectionString.get();
- }
- };
- ExhibitorEnsembleProvider provider = new ExhibitorEnsembleProvider(exhibitors, mockRestClient, "/foo", 10, new RetryOneTime(1));
- provider.pollForInitialEnsemble();
-
- Timing timing = new Timing().multiple(4);
- final CuratorZookeeperClient client = new CuratorZookeeperClient(provider, timing.session(), timing.connection(), null, new RetryOneTime(2));
- client.start();
- try
- {
- RetryLoop.callWithRetry
- (
- client,
- new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
- {
- client.getZooKeeper().create("/test", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- return null;
- }
- }
- );
-
- connectionString.set(secondConnectionString);
- semaphore.drainPermits();
- semaphore.acquire();
-
- server.stop(); // create situation where the current zookeeper gets a sys-disconnected
-
- Stat stat = RetryLoop.callWithRetry
- (
- client,
- new Callable<Stat>()
- {
- @Override
- public Stat call() throws Exception
- {
- return client.getZooKeeper().exists("/test", false);
- }
- }
- );
- Assert.assertNull(stat); // it's a different server so should be null
- }
- finally
- {
- client.close();
- }
- }
- finally
- {
- CloseableUtils.closeQuietly(secondServer);
- }
- }
-
- @Test
- public void testSimple() throws Exception
- {
- Exhibitors exhibitors = new Exhibitors(Lists.newArrayList("foo", "bar"), 1000, dummyConnectionStringProvider);
- ExhibitorRestClient mockRestClient = new ExhibitorRestClient()
- {
- @Override
- public String getRaw(String hostname, int port, String uriPath, String mimeType) throws Exception
- {
- return "count=1&port=" + server.getPort() + "&server0=localhost";
- }
- };
- ExhibitorEnsembleProvider provider = new ExhibitorEnsembleProvider(exhibitors, mockRestClient, "/foo", 10, new RetryOneTime(1));
- provider.pollForInitialEnsemble();
-
- Timing timing = new Timing();
- CuratorZookeeperClient client = new CuratorZookeeperClient(provider, timing.session(), timing.connection(), null, new ExponentialBackoffRetry(timing.milliseconds(), 3));
- client.start();
- try
- {
- client.blockUntilConnectedOrTimedOut();
- client.getZooKeeper().exists("/", false);
- }
- catch ( Exception e )
- {
- Assert.fail("provider.getConnectionString(): " + provider.getConnectionString() + " server.getPort(): " + server.getPort(), e);
- }
- finally
- {
- client.close();
- }
- }
-}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 3737faa..8b39ebd 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -325,13 +325,6 @@ public interface CuratorFramework extends Closeable
SchemaSet getSchemaSet();
/**
- * Return true if this instance is running in ZK 3.4.x compatibility mode
- *
- * @return true/false
- */
- boolean isZk34CompatibilityMode();
-
- /**
* Calls {@link #notifyAll()} on the given object after first synchronizing on it. This is
* done from the {@link #runSafe(Runnable)} thread.
*
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index 887a2aa..9c075bb 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -54,8 +54,6 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.curator.CuratorZookeeperClient;
-import static org.apache.curator.utils.Compatibility.isZK34;
-
/**
* Factory methods for creating framework-style clients
*/
@@ -150,7 +148,6 @@ public class CuratorFrameworkFactory
private ConnectionStateErrorPolicy connectionStateErrorPolicy = new StandardConnectionStateErrorPolicy();
private ConnectionHandlingPolicy connectionHandlingPolicy = new StandardConnectionHandlingPolicy();
private SchemaSet schemaSet = SchemaSet.getDefaultSchemaSet();
- private boolean zk34CompatibilityMode = isZK34();
private int waitForShutdownTimeoutMs = 0;
private Executor runSafeService = null;
private ConnectionStateListenerManagerFactory connectionStateListenerManagerFactory = ConnectionStateListenerManagerFactory.standard;
@@ -395,20 +392,6 @@ public class CuratorFrameworkFactory
}
/**
- * If mode is true, create a ZooKeeper 3.4.x compatible client. IMPORTANT: If the client
- * library used is ZooKeeper 3.4.x <code>zk34CompatibilityMode</code> is enabled by default.
- *
- * @since 3.5.0
- * @param mode true/false
- * @return this
- */
- public Builder zk34CompatibilityMode(boolean mode)
- {
- this.zk34CompatibilityMode = mode;
- return this;
- }
-
- /**
* Set a timeout for {@link CuratorZookeeperClient#close(int)} }.
* The default is 0, which means that this feature is disabled.
*
@@ -591,11 +574,6 @@ public class CuratorFrameworkFactory
return schemaSet;
}
- public boolean isZk34CompatibilityMode()
- {
- return zk34CompatibilityMode;
- }
-
@Deprecated
public String getAuthScheme()
{
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/SafeIsTtlMode.java b/curator-framework/src/main/java/org/apache/curator/framework/SafeIsTtlMode.java
deleted file mode 100644
index e499a7b..0000000
--- a/curator-framework/src/main/java/org/apache/curator/framework/SafeIsTtlMode.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework;
-
-import org.apache.curator.utils.Compatibility;
-import org.apache.zookeeper.CreateMode;
-
-public class SafeIsTtlMode
-{
- private static class Internal
- {
- private static final Internal instance = new Internal();
-
- public boolean isTtl(CreateMode mode)
- {
- return mode.isTTL();
- }
- }
-
- public static boolean isTtl(CreateMode mode)
- {
- return !Compatibility.isZK34() && Internal.instance.isTtl(mode);
- }
-
- private SafeIsTtlMode()
- {
- }
-}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CompatibleCreateCallback.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CompatibleCreateCallback.java
deleted file mode 100644
index 30ca391..0000000
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CompatibleCreateCallback.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.imps;
-
-import org.apache.zookeeper.data.Stat;
-
-interface CompatibleCreateCallback
-{
- void processResult(int rc, String path, Object ctx, String name, Stat stat);
-}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
index a02a6be..ee5c541 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
@@ -124,7 +124,6 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
@Override
public CreateBuilderMain withTtl(long ttl)
{
- Preconditions.checkState(!client.isZk34CompatibilityMode(), "TTLs are not support when running in ZooKeeper 3.4 compatibility mode");
this.ttl = ttl;
return this;
}
@@ -182,14 +181,7 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
}
String fixedPath = client.fixForNamespace(path);
- if ( client.isZk34CompatibilityMode() )
- {
- transaction.add(Op.create(fixedPath, data, acling.getAclList(path), createMode), OperationType.CREATE, path);
- }
- else
- {
- transaction.add(Op.create(fixedPath, data, acling.getAclList(path), createMode, ttl), OperationType.CREATE, path);
- }
+ transaction.add(Op.create(fixedPath, data, acling.getAclList(path), createMode, ttl), OperationType.CREATE, path);
return context;
}
};
@@ -636,10 +628,11 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("CreateBuilderImpl-Background");
final byte[] data = operationAndData.getData().getData();
- final CompatibleCreateCallback mainCallback = new CompatibleCreateCallback()
+ AsyncCallback.Create2Callback callback = new AsyncCallback.Create2Callback()
{
@Override
- public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
+ public void processResult(int rc, String path, Object ctx, String name, Stat stat)
+ {
trace.setReturnCode(rc).setRequestBytesLength(data).setPath(path).commit();
if ( (stat != null) && (storingStat != null) )
@@ -661,41 +654,16 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
}
}
};
-
- if ( client.isZk34CompatibilityMode() )
- {
- AsyncCallback.StringCallback stringCallback = new AsyncCallback.StringCallback()
- {
- @Override
- public void processResult(int rc, String path, Object ctx, String name)
- {
- mainCallback.processResult(rc, path, ctx, name, null);
- }
- };
- client.getZooKeeper().create
- (
- operationAndData.getData().getPath(),
- data,
- acling.getAclList(operationAndData.getData().getPath()),
- createMode,
- stringCallback,
- backgrounding.getContext()
- );
- }
- else
- {
- CreateZK35.create
- (
- client.getZooKeeper(),
- operationAndData.getData().getPath(),
- data,
- acling.getAclList(operationAndData.getData().getPath()),
- createMode,
- mainCallback,
- backgrounding.getContext(),
- ttl
- );
- }
+ client.getZooKeeper().create
+ (
+ operationAndData.getData().getPath(),
+ data,
+ acling.getAclList(operationAndData.getData().getPath()),
+ createMode,
+ callback,
+ backgrounding.getContext(),
+ ttl
+ );
}
catch ( Throwable e )
{
@@ -1171,28 +1139,14 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
{
try
{
- if ( client.isZk34CompatibilityMode() )
- {
- createdPath = client.getZooKeeper().create(path, data, aclList, createMode);
- }
- else
- {
- createdPath = client.getZooKeeper().create(path, data, aclList, createMode, storingStat, ttl);
- }
+ createdPath = client.getZooKeeper().create(path, data, aclList, createMode, storingStat, ttl);
}
catch ( KeeperException.NoNodeException e )
{
if ( createParentsIfNeeded )
{
ZKPaths.mkdirs(client.getZooKeeper(), path, false, acling.getACLProviderForParents(), createParentsAsContainers);
- if ( client.isZk34CompatibilityMode() )
- {
- createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
- }
- else
- {
- createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode, storingStat, ttl);
- }
+ createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode, storingStat, ttl);
}
else
{
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateZK35.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateZK35.java
deleted file mode 100644
index a32e614..0000000
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateZK35.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.imps;
-
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-import java.util.List;
-
-// keep reference to AsyncCallback.Create2Callback in separate class for ZK 3.4 compatibility
-class CreateZK35
-{
- static void create(ZooKeeper zooKeeper, String path, byte data[], List<ACL> acl, CreateMode createMode, final CompatibleCreateCallback compatibleCallback, Object ctx, long ttl)
- {
- AsyncCallback.Create2Callback callback = new AsyncCallback.Create2Callback()
- {
- @Override
- public void processResult(int rc, String path, Object ctx, String name, Stat stat)
- {
- compatibleCallback.processResult(rc, path, ctx, name, stat);
- }
- };
- zooKeeper.create(path, data, acl, createMode, callback, ctx, ttl);
- }
-
- private CreateZK35()
- {
- }
-}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index e003bf0..90f740d 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -20,7 +20,6 @@
package org.apache.curator.framework.imps;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.curator.CuratorConnectionLossException;
@@ -36,7 +35,7 @@ import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
import org.apache.curator.framework.api.transaction.TransactionOp;
import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.StandardListenerManager;
import org.apache.curator.framework.schema.SchemaSet;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
@@ -67,8 +66,8 @@ public class CuratorFrameworkImpl implements CuratorFramework
{
private final Logger log = LoggerFactory.getLogger(getClass());
private final CuratorZookeeperClient client;
- private final ListenerContainer<CuratorListener> listeners;
- private final ListenerContainer<UnhandledErrorListener> unhandledErrorListeners;
+ private final StandardListenerManager<CuratorListener> listeners;
+ private final StandardListenerManager<UnhandledErrorListener> unhandledErrorListeners;
private final ThreadFactory threadFactory;
private final int maxCloseWaitMs;
private final BlockingQueue<OperationAndData<?>> backgroundOperations;
@@ -88,7 +87,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
private final InternalConnectionHandler internalConnectionHandler;
private final EnsembleTracker ensembleTracker;
private final SchemaSet schemaSet;
- private final boolean zk34CompatibilityMode;
private final Executor runSafeService;
private volatile ExecutorService executorService;
@@ -132,8 +130,8 @@ public class CuratorFrameworkImpl implements CuratorFramework
);
internalConnectionHandler = new StandardInternalConnectionHandler();
- listeners = new ListenerContainer<CuratorListener>();
- unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
+ listeners = StandardListenerManager.standard();
+ unhandledErrorListeners = StandardListenerManager.standard();
backgroundOperations = new DelayQueue<OperationAndData<?>>();
forcedSleepOperations = new LinkedBlockingQueue<>();
namespace = new NamespaceImpl(this, builder.getNamespace());
@@ -146,7 +144,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable();
connectionStateErrorPolicy = Preconditions.checkNotNull(builder.getConnectionStateErrorPolicy(), "errorPolicy cannot be null");
schemaSet = Preconditions.checkNotNull(builder.getSchemaSet(), "schemaSet cannot be null");
- zk34CompatibilityMode = builder.isZk34CompatibilityMode();
byte[] builderDefaultData = builder.getDefaultData();
defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
@@ -156,7 +153,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
failedRemoveWatcherManager = new FailedRemoveWatchManager(this);
namespaceFacadeCache = new NamespaceFacadeCache(this);
- ensembleTracker = zk34CompatibilityMode ? null : new EnsembleTracker(this, builder.getEnsembleProvider());
+ ensembleTracker = new EnsembleTracker(this, builder.getEnsembleProvider());
runSafeService = makeRunSafeService(builder);
}
@@ -254,7 +251,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
connectionStateErrorPolicy = parent.connectionStateErrorPolicy;
internalConnectionHandler = parent.internalConnectionHandler;
schemaSet = parent.schemaSet;
- zk34CompatibilityMode = parent.zk34CompatibilityMode;
ensembleTracker = null;
runSafeService = parent.runSafeService;
}
@@ -368,22 +364,17 @@ public class CuratorFrameworkImpl implements CuratorFramework
log.debug("Closing");
if ( state.compareAndSet(CuratorFrameworkState.STARTED, CuratorFrameworkState.STOPPED) )
{
- listeners.forEach(new Function<CuratorListener, Void>()
+ listeners.forEach(listener ->
{
- @Override
- public Void apply(CuratorListener listener)
+ CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.CLOSING, 0, null, null, null, null, null, null, null, null, null);
+ try
{
- CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.CLOSING, 0, null, null, null, null, null, null, null, null, null);
- try
- {
- listener.eventReceived(CuratorFrameworkImpl.this, event);
- }
- catch ( Exception e )
- {
- ThreadUtils.checkInterrupted(e);
- log.error("Exception while sending Closing event", e);
- }
- return null;
+ listener.eventReceived(CuratorFrameworkImpl.this, event);
+ }
+ catch ( Exception e )
+ {
+ ThreadUtils.checkInterrupted(e);
+ log.error("Exception while sending Closing event", e);
}
});
@@ -498,14 +489,12 @@ public class CuratorFrameworkImpl implements CuratorFramework
@Override
public ReconfigBuilder reconfig()
{
- Preconditions.checkState(!isZk34CompatibilityMode(), "reconfig/config APIs are not support when running in ZooKeeper 3.4 compatibility mode");
return new ReconfigBuilderImpl(this);
}
@Override
public GetConfigBuilder getConfig()
{
- Preconditions.checkState(!isZk34CompatibilityMode(), "reconfig/config APIs are not support when running in ZooKeeper 3.4 compatibility mode");
return new GetConfigBuilderImpl(this);
}
@@ -567,7 +556,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
@Override
public RemoveWatchesBuilder watches()
{
- Preconditions.checkState(!isZk34CompatibilityMode(), "Remove watches APIs are not support when running in ZooKeeper 3.4 compatibility mode");
return new RemoveWatchesBuilderImpl(this);
}
@@ -705,15 +693,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
final String localReason = reason;
- unhandledErrorListeners.forEach(new Function<UnhandledErrorListener, Void>()
- {
- @Override
- public Void apply(UnhandledErrorListener listener)
- {
- listener.unhandledError(localReason, e);
- return null;
- }
- });
+ unhandledErrorListeners.forEach(l -> l.unhandledError(localReason, e));
if ( debugUnhandledErrorListener != null )
{
@@ -824,12 +804,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
connectionStateManager.addStateChange(newConnectionState);
}
- @Override
- public boolean isZk34CompatibilityMode()
- {
- return zk34CompatibilityMode;
- }
-
EnsembleTracker getEnsembleTracker()
{
return ensembleTracker;
@@ -1037,23 +1011,18 @@ public class CuratorFrameworkImpl implements CuratorFramework
validateConnection(curatorEvent.getWatchedEvent().getState());
}
- listeners.forEach(new Function<CuratorListener, Void>()
+ listeners.forEach(listener ->
{
- @Override
- public Void apply(CuratorListener listener)
+ try
{
- try
- {
- OperationTrace trace = client.startAdvancedTracer("EventListener");
- listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
- trace.commit();
- }
- catch ( Exception e )
- {
- ThreadUtils.checkInterrupted(e);
- logError("Event listener threw exception", e);
- }
- return null;
+ OperationTrace trace = client.startAdvancedTracer("EventListener");
+ listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
+ trace.commit();
+ }
+ catch ( Exception e )
+ {
+ ThreadUtils.checkInterrupted(e);
+ logError("Event listener threw exception", e);
}
});
}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionImpl.java
index 9057934..bdab158 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionImpl.java
@@ -35,7 +35,6 @@ import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.curator.framework.schema.Schema;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.proto.CreateRequest;
@@ -136,22 +135,7 @@ public class CuratorMultiTransactionImpl implements
if ( (curatorOp.get().getType() == ZooDefs.OpCode.create) || (curatorOp.get().getType() == ZooDefs.OpCode.createContainer) )
{
CreateRequest createRequest = (CreateRequest)curatorOp.get().toRequestRecord();
- CreateMode createMode;
- if ( client.isZk34CompatibilityMode() )
- {
- try
- {
- createMode = CreateMode.fromFlag(createRequest.getFlags());
- }
- catch ( KeeperException.BadArgumentsException dummy )
- {
- createMode = CreateMode.PERSISTENT;
- }
- }
- else
- {
- createMode = CreateMode.fromFlag(createRequest.getFlags(), CreateMode.PERSISTENT);
- }
+ CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags(), CreateMode.PERSISTENT);
schema.validateCreate(createMode, createRequest.getPath(), createRequest.getData(), createRequest.getAcl());
}
else if ( (curatorOp.get().getType() == ZooDefs.OpCode.delete) || (curatorOp.get().getType() == ZooDefs.OpCode.deleteContainer) )
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
index bdb5428..b9c9044 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
@@ -53,16 +53,11 @@ public class WatcherRemovalManager
void removeWatchers()
{
- if ( client.isZk34CompatibilityMode() )
- {
- return;
- }
-
List<NamespaceWatcher> localEntries = Lists.newArrayList(entries);
while ( localEntries.size() > 0 )
{
NamespaceWatcher watcher = localEntries.remove(0);
- if ( entries.remove(watcher) && !client.isZk34CompatibilityMode() )
+ if ( entries.remove(watcher) )
{
try
{
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerContainer.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerContainer.java
deleted file mode 100644
index 9139439..0000000
--- a/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerContainer.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.listen;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.apache.curator.utils.ThreadUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.Map;
-import java.util.concurrent.Executor;
-
-/**
- * Abstracts an object that has listeners
- *
- * @deprecated Prefer {@link MappingListenerManager} and
- * {@link StandardListenerManager}
- */
-@Deprecated
-public class ListenerContainer<T> implements Listenable<T>
-{
- private final Logger log = LoggerFactory.getLogger(getClass());
- private final Map<T, ListenerEntry<T>> listeners = Maps.newConcurrentMap();
-
- @Override
- public void addListener(T listener)
- {
- addListener(listener, MoreExecutors.directExecutor());
- }
-
- @Override
- public void addListener(T listener, Executor executor)
- {
- listeners.put(listener, new ListenerEntry<T>(listener, executor));
- }
-
- @Override
- public void removeListener(T listener)
- {
- if ( listener != null )
- {
- listeners.remove(listener);
- }
- }
-
- /**
- * Remove all listeners
- */
- public void clear()
- {
- listeners.clear();
- }
-
- /**
- * Return the number of listeners
- *
- * @return number
- */
- public int size()
- {
- return listeners.size();
- }
-
- /**
- * Utility - apply the given function to each listener. The function receives
- * the listener as an argument.
- *
- * @param function function to call for each listener
- */
- public void forEach(final Function<T, Void> function)
- {
- for ( final ListenerEntry<T> entry : listeners.values() )
- {
- entry.executor.execute
- (
- new Runnable()
- {
- @Override
- public void run()
- {
- try
- {
- function.apply(entry.listener);
- }
- catch ( Throwable e )
- {
- ThreadUtils.checkInterrupted(e);
- log.error(String.format("Listener (%s) threw an exception", entry.listener), e);
- }
- }
- }
- );
- }
- }
-}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java
index bd9f51a..5a6c9c7 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java
@@ -28,7 +28,7 @@ import java.util.function.Consumer;
import java.util.function.Function;
/**
- * Upgraded version of {@link org.apache.curator.framework.listen.ListenerContainer} that
+ * Upgraded version of ListenerContainer that
* doesn't leak Guava's internals and also supports mapping/wrapping of listeners
*/
public class MappingListenerManager<K, V> implements ListenerManager<K, V>
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 46325d2..7285431 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -23,7 +23,6 @@ import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.listen.UnaryListenerManager;
-import org.apache.curator.utils.Compatibility;
import org.apache.curator.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -308,7 +307,7 @@ public class ConnectionStateManager implements Closeable
log.warn(String.format("Session timeout has elapsed while SUSPENDED. Injecting a session expiration. Elapsed ms: %d. Adjusted session timeout ms: %d", elapsedMs, useSessionTimeoutMs));
try
{
- Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+ client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
}
catch ( Exception e )
{
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
index d80e053..b4791ff 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
@@ -22,7 +22,6 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.WatchersDebug;
import org.apache.curator.test.compatibility.Timing2;
import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.Callable;
@@ -35,12 +34,6 @@ public class TestCleanState
return;
}
- if ( Compatibility.isZK34() )
- {
- CloseableUtils.closeQuietly(client);
- return;
- }
-
try
{
Timing2 timing = new Timing2();
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCreateReturningStat.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCreateReturningStat.java
index 034791d..10b237f 100755
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCreateReturningStat.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCreateReturningStat.java
@@ -26,7 +26,6 @@ import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.test.Timing;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.data.Stat;
import org.testng.Assert;
@@ -34,7 +33,6 @@ import org.testng.annotations.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
-@Test(groups = Zk35MethodInterceptor.zk35Group)
public class TestCreateReturningStat extends CuratorTestBase
{
private CuratorFramework createClient()
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
index 773d9c9..1656351 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
@@ -27,7 +27,6 @@ import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.compatibility.Timing2;
import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -127,7 +126,7 @@ public class TestEnabledSessionExpiredState extends BaseClassForTests
{
Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED);
- Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+ client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
Assert.assertEquals(states.poll(timing.forSessionSleep().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST);
Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED);
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
index fe49ad7..8b22812 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
@@ -32,7 +32,6 @@ import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
import org.apache.curator.test.compatibility.Timing2;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.EnsurePath;
import org.apache.curator.utils.ZKPaths;
@@ -77,7 +76,6 @@ public class TestFramework extends BaseClassForTests
super.teardown();
}
- @Test(groups = Zk35MethodInterceptor.zk35Group)
public void testWaitForShutdownTimeoutMs() throws Exception
{
final BlockingQueue<Integer> timeoutQueue = new ArrayBlockingQueue<>(1);
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
index 7c6d156..c091fad 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
@@ -24,7 +24,6 @@ import org.apache.curator.RetryPolicy;
import org.apache.curator.RetrySleeper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.SafeIsTtlMode;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CreateBuilder;
import org.apache.curator.framework.api.CuratorEvent;
@@ -37,16 +36,15 @@ import org.apache.curator.retry.RetryForever;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Compatibility;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.TestingCluster;
import org.apache.curator.test.TestingServer;
import org.apache.curator.test.compatibility.Timing2;
import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@@ -63,7 +61,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class TestFrameworkEdges extends BaseClassForTests
@@ -92,7 +89,7 @@ public class TestFrameworkEdges extends BaseClassForTests
}
};
client.checkExists().usingWatcher(watcher).forPath("/foobar");
- Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+ client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
Assert.assertTrue(timing.awaitLatch(expiredLatch));
}
}
@@ -327,7 +324,7 @@ public class TestFrameworkEdges extends BaseClassForTests
final String TEST_PATH = "/a/b/c/test-";
long ttl = timing.forWaiting().milliseconds() * 1000;
CreateBuilder firstCreateBuilder = client.create();
- if ( SafeIsTtlMode.isTtl(mode) )
+ if ( mode.isTTL() )
{
firstCreateBuilder.withTtl(ttl);
}
@@ -343,7 +340,7 @@ public class TestFrameworkEdges extends BaseClassForTests
CreateBuilderImpl createBuilder = (CreateBuilderImpl)client.create();
createBuilder.withProtection();
- if ( SafeIsTtlMode.isTtl(mode) )
+ if ( mode.isTTL() )
{
createBuilder.withTtl(ttl);
}
@@ -532,7 +529,7 @@ public class TestFrameworkEdges extends BaseClassForTests
};
client.checkExists().usingWatcher(watcher).forPath("/sessionTest");
- Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+ client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
Assert.assertTrue(timing.awaitLatch(sessionDiedLatch));
Assert.assertNotNull(client.checkExists().forPath("/sessionTest"));
}
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
index c6ff2bb..1ff2805 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
@@ -32,7 +32,6 @@ import org.apache.curator.test.TestingCluster;
import org.apache.curator.test.TestingZooKeeperServer;
import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.test.compatibility.Timing2;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -57,7 +56,6 @@ import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
-@Test(groups = Zk35MethodInterceptor.zk35Group)
public class TestReconfiguration extends CuratorTestBase
{
private final Timing2 timing = new Timing2();
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
index 63d8931..82f2cf4 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
@@ -32,7 +32,6 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.test.Timing;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
@@ -45,7 +44,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-@Test(groups = Zk35MethodInterceptor.zk35Group)
public class TestRemoveWatches extends CuratorTestBase
{
private AtomicReference<ConnectionState> registerConnectionStateListener(CuratorFramework client)
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTtlNodes.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTtlNodes.java
index e2156df..fc37e41 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTtlNodes.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTtlNodes.java
@@ -25,7 +25,6 @@ import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.test.Timing;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
import org.apache.zookeeper.CreateMode;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -34,7 +33,6 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.concurrent.CountDownLatch;
-@Test(groups = Zk35MethodInterceptor.zk35Group)
public class TestTtlNodes extends CuratorTestBase
{
@BeforeClass
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
index 74aac1d..8b17576 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
@@ -27,7 +27,6 @@ import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.test.Timing;
import org.apache.curator.test.WatchersDebug;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -37,7 +36,6 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
-@Test(groups = Zk35MethodInterceptor.zk35Group)
public class TestWatcherRemovalManager extends CuratorTestBase
{
@Test
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
index e7778ff..c654f4f 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
@@ -19,7 +19,6 @@
package org.apache.curator.framework.recipes.cache;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
@@ -27,7 +26,8 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.PathUtils;
@@ -62,7 +62,7 @@ public class NodeCache implements Closeable
private final boolean dataIsCompressed;
private final AtomicReference<ChildData> data = new AtomicReference<ChildData>(null);
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
- private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();
+ private final StandardListenerManager<NodeCacheListener> listeners = StandardListenerManager.standard();
private final AtomicBoolean isConnected = new AtomicBoolean(true);
private ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
@@ -204,7 +204,7 @@ public class NodeCache implements Closeable
*
* @return listenable
*/
- public ListenerContainer<NodeCacheListener> getListenable()
+ public Listenable<NodeCacheListener> getListenable()
{
Preconditions.checkState(state.get() != State.CLOSED, "Closed");
@@ -314,26 +314,17 @@ public class NodeCache implements Closeable
ChildData previousData = data.getAndSet(newData);
if ( !Objects.equal(previousData, newData) )
{
- listeners.forEach
- (
- new Function<NodeCacheListener, Void>()
+ listeners.forEach(listener -> {
+ try
{
- @Override
- public Void apply(NodeCacheListener listener)
- {
- try
- {
- listener.nodeChanged();
- }
- catch ( Exception e )
- {
- ThreadUtils.checkInterrupted(e);
- log.error("Calling listener", e);
- }
- return null;
- }
+ listener.nodeChanged();
+ }
+ catch ( Exception e )
+ {
+ ThreadUtils.checkInterrupted(e);
+ log.error("Calling listener", e);
}
- );
+ });
if ( rebuildTestExchanger != null )
{
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index bdc73cc..14c889a 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -20,7 +20,6 @@
package org.apache.curator.framework.recipes.cache;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
@@ -31,7 +30,8 @@ import org.apache.curator.framework.EnsureContainers;
import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.CloseableExecutorService;
@@ -74,7 +74,7 @@ public class PathChildrenCache implements Closeable
private final CloseableExecutorService executorService;
private final boolean cacheData;
private final boolean dataIsCompressed;
- private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>();
+ private final StandardListenerManager<PathChildrenCacheListener> listeners = StandardListenerManager.standard();
private final ConcurrentMap<String, ChildData> currentData = Maps.newConcurrentMap();
private final AtomicReference<Map<String, ChildData>> initialSet = new AtomicReference<Map<String, ChildData>>();
private final Set<Operation> operationsQuantizer = Sets.newSetFromMap(Maps.<Operation, Boolean>newConcurrentMap());
@@ -394,7 +394,7 @@ public class PathChildrenCache implements Closeable
*
* @return listenable
*/
- public ListenerContainer<PathChildrenCacheListener> getListenable()
+ public Listenable<PathChildrenCacheListener> getListenable()
{
return listeners;
}
@@ -526,26 +526,17 @@ public class PathChildrenCache implements Closeable
void callListeners(final PathChildrenCacheEvent event)
{
- listeners.forEach
- (
- new Function<PathChildrenCacheListener, Void>()
- {
- @Override
- public Void apply(PathChildrenCacheListener listener)
- {
- try
- {
- listener.childEvent(client, event);
- }
- catch ( Exception e )
- {
- ThreadUtils.checkInterrupted(e);
- handleException(e);
- }
- return null;
- }
- }
- );
+ listeners.forEach(listener -> {
+ try
+ {
+ listener.childEvent(client, event);
+ }
+ catch ( Exception e )
+ {
+ ThreadUtils.checkInterrupted(e);
+ handleException(e);
+ }
+ });
}
void getDataAndStat(final String fullPath) throws Exception
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index f42c1d5..ad14dab 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -20,7 +20,6 @@
package org.apache.curator.framework.recipes.cache;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
@@ -33,7 +32,7 @@ import org.apache.curator.framework.api.Pathable;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.api.Watchable;
import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.StandardListenerManager;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.PathUtils;
@@ -531,8 +530,8 @@ public class TreeCache implements Closeable
private final boolean cacheData;
private final boolean dataIsCompressed;
private final int maxDepth;
- private final ListenerContainer<TreeCacheListener> listeners = new ListenerContainer<TreeCacheListener>();
- private final ListenerContainer<UnhandledErrorListener> errorListeners = new ListenerContainer<UnhandledErrorListener>();
+ private final StandardListenerManager<TreeCacheListener> listeners = StandardListenerManager.standard();
+ private final StandardListenerManager<UnhandledErrorListener> errorListeners = StandardListenerManager.standard();
private final AtomicReference<TreeState> treeState = new AtomicReference<TreeState>(TreeState.LATENT);
private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
@@ -750,21 +749,16 @@ public class TreeCache implements Closeable
private void callListeners(final TreeCacheEvent event)
{
- listeners.forEach(new Function<TreeCacheListener, Void>()
+ listeners.forEach(listener ->
{
- @Override
- public Void apply(TreeCacheListener listener)
+ try
{
- try
- {
- listener.childEvent(client, event);
- }
- catch ( Exception e )
- {
- ThreadUtils.checkInterrupted(e);
- handleException(e);
- }
- return null;
+ listener.childEvent(client, event);
+ }
+ catch ( Exception e )
+ {
+ ThreadUtils.checkInterrupted(e);
+ handleException(e);
}
});
}
@@ -780,21 +774,16 @@ public class TreeCache implements Closeable
}
else
{
- errorListeners.forEach(new Function<UnhandledErrorListener, Void>()
+ errorListeners.forEach(listener ->
{
- @Override
- public Void apply(UnhandledErrorListener listener)
+ try
{
- try
- {
- listener.unhandledError("", e);
- }
- catch ( Exception e )
- {
- ThreadUtils.checkInterrupted(e);
- LOG.error("Exception handling exception", e);
- }
- return null;
+ listener.unhandledError("", e);
+ }
+ catch ( Exception e2 )
+ {
+ ThreadUtils.checkInterrupted(e2);
+ LOG.error("Exception handling exception", e2);
}
});
}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 79b2601..7d9ca3c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -20,13 +20,12 @@
package org.apache.curator.framework.recipes.leader;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.StandardListenerManager;
import org.apache.curator.framework.recipes.AfterConnectionEstablished;
import org.apache.curator.framework.recipes.locks.LockInternals;
import org.apache.curator.framework.recipes.locks.LockInternalsSorter;
@@ -71,7 +70,7 @@ public class LeaderLatch implements Closeable
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
private final AtomicReference<String> ourPath = new AtomicReference<String>();
- private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
+ private final StandardListenerManager<LeaderLatchListener> listeners = StandardListenerManager.standard();
private final CloseMode closeMode;
private final AtomicReference<Future<?>> startTask = new AtomicReference<Future<?>>();
@@ -682,27 +681,11 @@ public class LeaderLatch implements Closeable
if ( oldValue && !newValue )
{ // Lost leadership, was true, now false
- listeners.forEach(new Function<LeaderLatchListener, Void>()
- {
- @Override
- public Void apply(LeaderLatchListener listener)
- {
- listener.notLeader();
- return null;
- }
- });
+ listeners.forEach(LeaderLatchListener::notLeader);
}
else if ( !oldValue && newValue )
{ // Gained leadership, was false, now true
- listeners.forEach(new Function<LeaderLatchListener, Void>()
- {
- @Override
- public Void apply(LeaderLatchListener input)
- {
- input.isLeader();
- return null;
- }
- });
+ listeners.forEach(LeaderLatchListener::isLeader);
}
notifyAll();
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
deleted file mode 100644
index 3c8b56a..0000000
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
+++ /dev/null
@@ -1,307 +0,0 @@
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.recipes.locks;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.leader.LeaderLatch;
-import org.apache.curator.utils.CloseableScheduledExecutorService;
-import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.PathUtils;
-import org.apache.curator.utils.ThreadUtils;
-import org.apache.curator.utils.ZKPaths;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Utility to reap empty child nodes of a parent node. Periodically calls getChildren on
- * the node and adds empty nodes to an internally managed {@link Reaper}
- *
- * @deprecated Since 2.9.0 - Reaper/ChildReaper are no longer needed. Use {@link CreateMode#CONTAINER}.
- * Also, all Curator recipes create container parents.
- */
-@Deprecated
-public class ChildReaper implements Closeable
-{
- private final Logger log = LoggerFactory.getLogger(getClass());
- private final Reaper reaper;
- private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
- private final CuratorFramework client;
- private final Collection<String> paths = Sets.newConcurrentHashSet();
- private volatile Iterator<String> pathIterator = null;
- private final Reaper.Mode mode;
- private final CloseableScheduledExecutorService executor;
- private final int reapingThresholdMs;
- private final LeaderLatch leaderLatch;
- private final Set<String> lockSchema;
- private final AtomicInteger maxChildren = new AtomicInteger(-1);
-
- private volatile Future<?> task;
-
- private enum State
- {
- LATENT,
- STARTED,
- CLOSED
- }
-
- /**
- * @param client the client
- * @param path path to reap children from
- * @param mode reaping mode
- */
- public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode)
- {
- this(client, path, mode, newExecutorService(), Reaper.DEFAULT_REAPING_THRESHOLD_MS, null);
- }
-
- /**
- * @param client the client
- * @param path path to reap children from
- * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
- * @param mode reaping mode
- */
- public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, int reapingThresholdMs)
- {
- this(client, path, mode, newExecutorService(), reapingThresholdMs, null);
- }
-
- /**
- * @param client the client
- * @param path path to reap children from
- * @param executor executor to use for background tasks
- * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
- * @param mode reaping mode
- */
- public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs)
- {
- this(client, path, mode, executor, reapingThresholdMs, null);
- }
-
- /**
- * @param client the client
- * @param path path to reap children from
- * @param executor executor to use for background tasks
- * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
- * @param mode reaping mode
- * @param leaderPath if not null, uses a leader selection so that only 1 reaper is active in the cluster
- */
- public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath)
- {
- this(client, path, mode, executor, reapingThresholdMs, leaderPath, Collections.<String>emptySet());
- }
-
-
- /**
- * @param client the client
- * @param path path to reap children from
- * @param executor executor to use for background tasks
- * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
- * @param mode reaping mode
- * @param leaderPath if not null, uses a leader selection so that only 1 reaper is active in the cluster
- * @param lockSchema a set of the possible subnodes of the children of path that must be reaped in addition to the child nodes
- */
- public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath, Set<String> lockSchema)
- {
- this.client = client;
- this.mode = mode;
- this.executor = new CloseableScheduledExecutorService(executor);
- this.reapingThresholdMs = reapingThresholdMs;
- if (leaderPath != null)
- {
- leaderLatch = new LeaderLatch(client, leaderPath);
- }
- else
- {
- leaderLatch = null;
- }
- this.reaper = new Reaper(client, executor, reapingThresholdMs, leaderLatch);
- this.lockSchema = lockSchema;
- addPath(path);
- }
-
- /**
- * The reaper must be started
- *
- * @throws Exception errors
- */
- public void start() throws Exception
- {
- Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
-
- task = executor.scheduleWithFixedDelay
- (
- new Runnable()
- {
- @Override
- public void run()
- {
- doWork();
- }
- },
- reapingThresholdMs,
- reapingThresholdMs,
- TimeUnit.MILLISECONDS
- );
- if (leaderLatch != null)
- {
- leaderLatch.start();
- }
- reaper.start();
- }
-
- @Override
- public void close() throws IOException
- {
- if ( state.compareAndSet(State.STARTED, State.CLOSED) )
- {
- CloseableUtils.closeQuietly(reaper);
- if (leaderLatch != null)
- {
- CloseableUtils.closeQuietly(leaderLatch);
- }
- task.cancel(true);
- }
- }
-
- /**
- * Add a path to reap children from
- *
- * @param path the path
- * @return this for chaining
- */
- public ChildReaper addPath(String path)
- {
- paths.add(PathUtils.validatePath(path));
- return this;
- }
-
- /**
- * Remove a path from reaping
- *
- * @param path the path
- * @return true if the path existed and was removed
- */
- public boolean removePath(String path)
- {
- return paths.remove(PathUtils.validatePath(path));
- }
-
- /**
- * If a node has so many children that {@link CuratorFramework#getChildren()} will fail
- * (due to jute.maxbuffer) it can cause connection instability. Set the max number of
- * children here to prevent the path from being queried in these cases. The number should usually
- * be: average-node-name-length/1000000
- *
- * @param maxChildren max children
- */
- public void setMaxChildren(int maxChildren)
- {
- this.maxChildren.set(maxChildren);
- }
-
- public static ScheduledExecutorService newExecutorService()
- {
- return ThreadUtils.newFixedThreadScheduledPool(2, "ChildReaper");
- }
-
- @VisibleForTesting
- protected void warnMaxChildren(String path, Stat stat)
- {
- log.warn(String.format("Skipping %s as it has too many children: %d", path, stat.getNumChildren()));
- }
-
- private void doWork()
- {
- if ( shouldDoWork() )
- {
- if ( (pathIterator == null) || !pathIterator.hasNext() )
- {
- pathIterator = paths.iterator();
- }
- while ( pathIterator.hasNext() )
- {
- String path = pathIterator.next();
- try
- {
- int maxChildren = this.maxChildren.get();
- if ( maxChildren > 0 )
- {
- Stat stat = client.checkExists().forPath(path);
- if ( (stat != null) && (stat.getNumChildren() > maxChildren) )
- {
- warnMaxChildren(path, stat);
- continue;
- }
- }
-
- List<String> children = client.getChildren().forPath(path);
- log.info(String.format("Found %d children for %s", children.size(), path));
- for ( String name : children )
- {
- String childPath = ZKPaths.makePath(path, name);
- addPathToReaperIfEmpty(childPath);
- for ( String subNode : lockSchema )
- {
- addPathToReaperIfEmpty(ZKPaths.makePath(childPath, subNode));
- }
-
- }
- }
- catch ( Exception e )
- {
- ThreadUtils.checkInterrupted(e);
- log.error("Could not get children for path: " + path, e);
- }
- }
- }
- }
-
- private void addPathToReaperIfEmpty(String path) throws Exception
- {
- Stat stat = client.checkExists().forPath(path);
- if ( (stat != null) && (stat.getNumChildren() == 0) )
- {
- log.info("Adding " + path);
- reaper.addPath(path, mode);
- }
- }
-
- private boolean shouldDoWork()
- {
- return this.leaderLatch == null || this.leaderLatch.hasLeadership();
- }
-}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
deleted file mode 100644
index 2522154..0000000
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.recipes.locks;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.leader.LeaderLatch;
-import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
-import org.apache.curator.utils.CloseableScheduledExecutorService;
-import org.apache.curator.utils.ThreadUtils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Utility to clean up parent lock nodes so that they don't stay around as garbage
- *
- * @deprecated Since 2.9.0 - Reaper/ChildReaper are no longer needed. Use {@link CreateMode#CONTAINER}.
- * Also, all Curator recipes create container parents.
- */
-@Deprecated
-public class Reaper implements Closeable
-{
- private final Logger log = LoggerFactory.getLogger(getClass());
- private final CuratorFramework client;
- private final CloseableScheduledExecutorService executor;
- private final int reapingThresholdMs;
- private final Map<String, PathHolder> activePaths = Maps.newConcurrentMap();
- private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
- private final LeaderLatch leaderLatch;
- private final AtomicBoolean reapingIsActive = new AtomicBoolean(true);
- private final boolean ownsLeaderLatch;
-
- private enum State
- {
- LATENT,
- STARTED,
- CLOSED
- }
-
- static final int DEFAULT_REAPING_THRESHOLD_MS = (int)TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES);
-
- @VisibleForTesting
- static final int EMPTY_COUNT_THRESHOLD = 3;
-
- @VisibleForTesting
- class PathHolder implements Runnable
- {
- final String path;
- final Mode mode;
- final int emptyCount;
-
- @Override
- public void run()
- {
- reap(this);
- }
-
- private PathHolder(String path, Mode mode, int emptyCount)
- {
- this.path = path;
- this.mode = mode;
- this.emptyCount = emptyCount;
- }
- }
-
- public enum Mode
- {
- /**
- * Reap forever, or until removePath is called for the path
- */
- REAP_INDEFINITELY,
-
- /**
- * Reap until the Reaper succeeds in deleting the path
- */
- REAP_UNTIL_DELETE,
-
- /**
- * Reap until the path no longer exists
- */
- REAP_UNTIL_GONE
- }
-
- /**
- * Uses the default reaping threshold of 5 minutes and creates an internal thread pool
- *
- * @param client client
- */
- public Reaper(CuratorFramework client)
- {
- this(client, newExecutorService(), DEFAULT_REAPING_THRESHOLD_MS, (String) null);
- }
-
- /**
- * Uses the given reaping threshold and creates an internal thread pool
- *
- * @param client client
- * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
- */
- public Reaper(CuratorFramework client, int reapingThresholdMs)
- {
- this(client, newExecutorService(), reapingThresholdMs, (String) null);
- }
-
- /**
- * @param client client
- * @param executor thread pool
- * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
- */
- public Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs)
- {
- this(client, executor, reapingThresholdMs, (String) null);
- }
-
- /**
- * @param client client
- * @param executor thread pool
- * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
- * @param leaderPath if not null, uses a leader selection so that only 1 reaper is active in the cluster
- */
- public Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath)
- {
- this(client, executor, reapingThresholdMs, makeLeaderLatchIfPathNotNull(client, leaderPath), true);
- }
-
- /**
- * @param client client
- * @param executor thread pool
- * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
- * @param leaderLatch a pre-created leader latch to ensure only 1 reaper is active in the cluster
- */
- public Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs, LeaderLatch leaderLatch)
- {
- this(client, executor, reapingThresholdMs, leaderLatch, false);
- }
-
- /**
- * @param client client
- * @param executor thread pool
- * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
- * @param leaderLatch a pre-created leader latch to ensure only 1 reaper is active in the cluster
- * @param ownsLeaderLatch indicates whether or not the reaper owns the leader latch (if it exists) and thus should start/stop it
- * */
- private Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs, LeaderLatch leaderLatch, boolean ownsLeaderLatch)
- {
- this.client = client;
- this.executor = new CloseableScheduledExecutorService(executor);
- this.reapingThresholdMs = reapingThresholdMs / EMPTY_COUNT_THRESHOLD;
- this.leaderLatch = leaderLatch;
- if (leaderLatch != null)
- {
- addListenerToLeaderLatch(leaderLatch);
- }
- this.ownsLeaderLatch = ownsLeaderLatch;
- }
-
-
- /**
- * Add a path (using Mode.REAP_INDEFINITELY) to be checked by the reaper. The path will be checked periodically
- * until the reaper is closed.
- *
- * @param path path to check
- */
- public void addPath(String path)
- {
- addPath(path, Mode.REAP_INDEFINITELY);
- }
-
- /**
- * Add a path to be checked by the reaper. The path will be checked periodically
- * until the reaper is closed, or until the point specified by the Mode
- *
- * @param path path to check
- * @param mode reaping mode
- */
- public void addPath(String path, Mode mode)
- {
- PathHolder pathHolder = new PathHolder(path, mode, 0);
- activePaths.put(path, pathHolder);
- schedule(pathHolder, reapingThresholdMs);
- }
-
- /**
- * Stop reaping the given path
- *
- * @param path path to remove
- * @return true if the path was removed
- */
- public boolean removePath(String path)
- {
- return activePaths.remove(path) != null;
- }
-
- /**
- * The reaper must be started
- *
- * @throws Exception errors
- */
- public void start() throws Exception
- {
- Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
-
- if ( leaderLatch != null && ownsLeaderLatch)
- {
- leaderLatch.start();
- }
- }
-
- @Override
- public void close() throws IOException
- {
- if ( state.compareAndSet(State.STARTED, State.CLOSED) )
- {
- executor.close();
- if ( leaderLatch != null && ownsLeaderLatch )
- {
- leaderLatch.close();
- }
- }
- }
-
- @VisibleForTesting
- protected Future<?> schedule(PathHolder pathHolder, int reapingThresholdMs)
- {
- if ( reapingIsActive.get() )
- {
- return executor.schedule(pathHolder, reapingThresholdMs, TimeUnit.MILLISECONDS);
- }
- return null;
- }
-
- @VisibleForTesting
- protected void reap(PathHolder holder)
- {
- if ( !activePaths.containsKey(holder.path) )
- {
- return;
- }
-
- boolean addBack = true;
- int newEmptyCount = 0;
- try
- {
- Stat stat = client.checkExists().forPath(holder.path);
- if ( stat != null ) // otherwise already deleted
- {
- if ( stat.getNumChildren() == 0 )
- {
- if ( (holder.emptyCount + 1) >= EMPTY_COUNT_THRESHOLD )
- {
- try
- {
- client.delete().forPath(holder.path);
- log.info("Reaping path: " + holder.path);
- if ( holder.mode == Mode.REAP_UNTIL_DELETE || holder.mode == Mode.REAP_UNTIL_GONE )
- {
- addBack = false;
- }
- }
- catch ( KeeperException.NoNodeException ignore )
- {
- // Node must have been deleted by another process/thread
- if ( holder.mode == Mode.REAP_UNTIL_GONE )
- {
- addBack = false;
- }
- }
- catch ( KeeperException.NotEmptyException ignore )
- {
- // ignore - it must have been re-used
- }
- }
- else
- {
- newEmptyCount = holder.emptyCount + 1;
- }
- }
- }
- else
- {
- if ( holder.mode == Mode.REAP_UNTIL_GONE )
- {
- addBack = false;
- }
- }
- }
- catch ( Exception e )
- {
- ThreadUtils.checkInterrupted(e);
- log.error("Trying to reap: " + holder.path, e);
- }
-
- if ( !addBack )
- {
- activePaths.remove(holder.path);
- }
- else if ( !Thread.currentThread().isInterrupted() && (state.get() == State.STARTED) && activePaths.containsKey(holder.path) )
- {
- activePaths.put(holder.path, holder);
- schedule(new PathHolder(holder.path, holder.mode, newEmptyCount), reapingThresholdMs);
- }
- }
-
- /**
- * Allocate an executor service for the reaper
- *
- * @return service
- */
- public static ScheduledExecutorService newExecutorService()
- {
- return ThreadUtils.newSingleThreadScheduledExecutor("Reaper");
- }
-
- private void addListenerToLeaderLatch(LeaderLatch leaderLatch)
- {
-
- LeaderLatchListener listener = new LeaderLatchListener()
- {
- @Override
- public void isLeader()
- {
- reapingIsActive.set(true);
- for ( PathHolder holder : activePaths.values() )
- {
- schedule(holder, reapingThresholdMs);
- }
- }
-
- @Override
- public void notLeader()
- {
- reapingIsActive.set(false);
- }
- };
- leaderLatch.addListener(listener);
-
- reapingIsActive.set(leaderLatch.hasLeadership());
- }
-
- private static LeaderLatch makeLeaderLatchIfPathNotNull(CuratorFramework client, String leaderPath)
- {
- if (leaderPath == null)
- {
- return null;
- }
- else
- {
- return new LeaderLatch(client, leaderPath);
- }
- }
-}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
index 81e8dd9..f7b4653 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
@@ -19,11 +19,9 @@
package org.apache.curator.framework.recipes.nodes;
-import com.google.common.base.Function;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.SafeIsTtlMode;
import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.api.BackgroundCallback;
@@ -32,7 +30,8 @@ import org.apache.curator.framework.api.CreateBuilderMain;
import org.apache.curator.framework.api.CreateModable;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.PathUtils;
@@ -77,7 +76,7 @@ public class PersistentNode implements Closeable
private final BackgroundCallback backgroundCallback;
private final boolean useProtection;
private final AtomicReference<CreateModable<ACLBackgroundPathAndBytesable<String>>> createMethod = new AtomicReference<CreateModable<ACLBackgroundPathAndBytesable<String>>>(null);
- private final ListenerContainer<PersistentNodeListener> listeners = new ListenerContainer<PersistentNodeListener>();
+ private final StandardListenerManager<PersistentNodeListener> listeners = StandardListenerManager.standard();
private final CuratorWatcher watcher = new CuratorWatcher()
{
@Override
@@ -362,7 +361,7 @@ public class PersistentNode implements Closeable
*
* @return listenable
*/
- public ListenerContainer<PersistentNodeListener> getListenable()
+ public Listenable<PersistentNodeListener> getListenable()
{
return listeners;
}
@@ -462,7 +461,7 @@ public class PersistentNode implements Closeable
CreateModable<ACLBackgroundPathAndBytesable<String>> localCreateMethod = createMethod.get();
if ( localCreateMethod == null )
{
- CreateBuilderMain createBuilder = SafeIsTtlMode.isTtl(mode) ? client.create().withTtl(ttl) : client.create();
+ CreateBuilderMain createBuilder = mode.isTTL() ? client.create().withTtl(ttl) : client.create();
CreateModable<ACLBackgroundPathAndBytesable<String>> tempCreateMethod = useProtection ? createBuilder.creatingParentContainersIfNeeded().withProtection() : createBuilder.creatingParentContainersIfNeeded();
createMethod.compareAndSet(null, tempCreateMethod);
localCreateMethod = createMethod.get();
@@ -523,25 +522,17 @@ public class PersistentNode implements Closeable
private void notifyListeners()
{
final String path = getActualPath();
- listeners.forEach(
- new Function<PersistentNodeListener, Void>()
- {
- @Override
- public Void apply(PersistentNodeListener listener)
- {
- try
- {
- listener.nodeCreated(path);
- }
- catch ( Exception e )
- {
- ThreadUtils.checkInterrupted(e);
- log.error("From PersistentNode listener", e);
- }
- return null;
- }
- }
- );
+ listeners.forEach(listener -> {
+ try
+ {
+ listener.nodeCreated(path);
+ }
+ catch ( Exception e )
+ {
+ ThreadUtils.checkInterrupted(e);
+ log.error("From PersistentNode listener", e);
+ }
+ });
}
private boolean isActive()
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java
index 8f321b3..5327a09 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java
@@ -21,7 +21,7 @@ package org.apache.curator.framework.recipes.queue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
@@ -215,7 +215,7 @@ public class DistributedDelayQueue<T> implements Closeable, QueueBase<T>
* @return put listener container
*/
@Override
- public ListenerContainer<QueuePutListener<T>> getPutListenerContainer()
+ public Listenable<QueuePutListener<T>> getPutListenerContainer()
{
return queue.getPutListenerContainer();
}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedIdQueue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedIdQueue.java
index 15045aa..45b84c4 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedIdQueue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedIdQueue.java
@@ -21,7 +21,7 @@ package org.apache.curator.framework.recipes.queue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
@@ -105,7 +105,7 @@ public class DistributedIdQueue<T> implements QueueBase<T>
}
@Override
- public ListenerContainer<QueuePutListener<T>> getPutListenerContainer()
+ public Listenable<QueuePutListener<T>> getPutListenerContainer()
{
return queue.getPutListenerContainer();
}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedPriorityQueue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedPriorityQueue.java
index 4b70ec5..be61de2 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedPriorityQueue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedPriorityQueue.java
@@ -21,7 +21,7 @@ package org.apache.curator.framework.recipes.queue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Executor;
@@ -176,7 +176,7 @@ public class DistributedPriorityQueue<T> implements Closeable, QueueBase<T>
* @return put listener container
*/
@Override
- public ListenerContainer<QueuePutListener<T>> getPutListenerContainer()
+ public Listenable<QueuePutListener<T>> getPutListenerContainer()
{
return queue.getPutListenerContainer();
}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java
index 14d1266..0df2930 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java
@@ -19,18 +19,18 @@
package org.apache.curator.framework.recipes.queue;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-
-import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.PathUtils;
import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
@@ -50,7 +50,6 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.curator.utils.PathUtils;
/**
* <p>An implementation of the Distributed Queue ZK recipe. Items put into the queue
@@ -81,7 +80,7 @@ public class DistributedQueue<T> implements QueueBase<T>
private final boolean isProducerOnly;
private final String lockPath;
private final AtomicReference<ErrorMode> errorMode = new AtomicReference<ErrorMode>(ErrorMode.REQUEUE);
- private final ListenerContainer<QueuePutListener<T>> putListenerContainer = new ListenerContainer<QueuePutListener<T>>();
+ private final StandardListenerManager<QueuePutListener<T>> putListenerContainer = StandardListenerManager.standard();
private final AtomicInteger lastChildCount = new AtomicInteger(0);
private final int maxItems;
private final int finalFlushMs;
@@ -233,7 +232,7 @@ public class DistributedQueue<T> implements QueueBase<T>
* @return put listener container
*/
@Override
- public ListenerContainer<QueuePutListener<T>> getPutListenerContainer()
+ public Listenable<QueuePutListener<T>> getPutListenerContainer()
{
return putListenerContainer;
}
@@ -407,25 +406,16 @@ public class DistributedQueue<T> implements QueueBase<T>
putCount.decrementAndGet();
putCount.notifyAll();
}
- putListenerContainer.forEach
- (
- new Function<QueuePutListener<T>, Void>()
+ putListenerContainer.forEach(listener -> {
+ if ( item != null )
{
- @Override
- public Void apply(QueuePutListener<T> listener)
- {
- if ( item != null )
- {
- listener.putCompleted(item);
- }
- else
- {
- listener.putMultiCompleted(givenMultiItem);
- }
- return null;
- }
+ listener.putCompleted(item);
}
- );
+ else
+ {
+ listener.putMultiCompleted(givenMultiItem);
+ }
+ });
}
private void doPutInBackground(final T item, String path, final MultiItem<T> givenMultiItem, byte[] bytes) throws Exception
@@ -449,25 +439,16 @@ public class DistributedQueue<T> implements QueueBase<T>
}
}
- putListenerContainer.forEach
- (
- new Function<QueuePutListener<T>, Void>()
+ putListenerContainer.forEach(listener -> {
+ if ( item != null )
{
- @Override
- public Void apply(QueuePutListener<T> listener)
- {
- if ( item != null )
- {
- listener.putCompleted(item);
- }
- else
- {
- listener.putMultiCompleted(givenMultiItem);
- }
- return null;
- }
+ listener.putCompleted(item);
}
- );
+ else
+ {
+ listener.putMultiCompleted(givenMultiItem);
+ }
+ });
}
};
internalCreateNode(path, bytes, callback);
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/QueueBase.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/QueueBase.java
index 0a21ce8..f9fee13 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/QueueBase.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/QueueBase.java
@@ -18,7 +18,7 @@
*/
package org.apache.curator.framework.recipes.queue;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
import java.io.Closeable;
import java.util.concurrent.TimeUnit;
@@ -36,7 +36,7 @@ public interface QueueBase<T> extends Closeable
*
* @return put listener container
*/
- ListenerContainer<QueuePutListener<T>> getPutListenerContainer();
+ Listenable<QueuePutListener<T>> getPutListenerContainer();
/**
* Used when the queue is created with a {@link QueueBuilder#lockPath(String)}. Determines
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index 5d7abce..d605234 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@ -20,14 +20,14 @@
package org.apache.curator.framework.recipes.shared;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.PathUtils;
@@ -52,7 +52,7 @@ public class SharedValue implements Closeable, SharedValueReader
private static final int UNINITIALIZED_VERSION = -1;
private final Logger log = LoggerFactory.getLogger(getClass());
- private final ListenerContainer<SharedValueListener> listeners = new ListenerContainer<SharedValueListener>();
+ private final StandardListenerManager<SharedValueListener> listeners = StandardListenerManager.standard();
private final WatcherRemoveCuratorFramework client;
private final String path;
private final byte[] seedValue;
@@ -235,7 +235,7 @@ public class SharedValue implements Closeable, SharedValueReader
*
* @return listenable
*/
- public ListenerContainer<SharedValueListener> getListenable()
+ public Listenable<SharedValueListener> getListenable()
{
return listeners;
}
@@ -297,41 +297,21 @@ public class SharedValue implements Closeable, SharedValueReader
private void notifyListeners()
{
final byte[] localValue = getValue();
- listeners.forEach
- (
- new Function<SharedValueListener, Void>()
- {
- @Override
- public Void apply(SharedValueListener listener)
- {
- try
- {
- listener.valueHasChanged(SharedValue.this, localValue);
- }
- catch ( Exception e )
- {
- ThreadUtils.checkInterrupted(e);
- log.error("From SharedValue listener", e);
- }
- return null;
- }
- }
- );
+ listeners.forEach(listener -> {
+ try
+ {
+ listener.valueHasChanged(SharedValue.this, localValue);
+ }
+ catch ( Exception e )
+ {
+ ThreadUtils.checkInterrupted(e);
+ log.error("From SharedValue listener", e);
+ }
+ });
}
private void notifyListenerOfStateChanged(final ConnectionState newState)
{
- listeners.forEach
- (
- new Function<SharedValueListener, Void>()
- {
- @Override
- public Void apply(SharedValueListener listener)
- {
- listener.stateChanged(client, newState);
- return null;
- }
- }
- );
+ listeners.forEach(listener -> listener.stateChanged(client, newState));
}
}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValueReader.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValueReader.java
index e298cca..a846c54 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValueReader.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValueReader.java
@@ -18,7 +18,7 @@
*/
package org.apache.curator.framework.recipes.shared;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
/**
* Abstracts a shared value and allows listening for changes to the value
@@ -44,5 +44,5 @@ public interface SharedValueReader
*
* @return listenable
*/
- public ListenerContainer<SharedValueListener> getListenable();
+ public Listenable<SharedValueListener> getListenable();
}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java
index ff416d5..705cad5 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.cache;
import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Compatibility;
import org.apache.curator.test.compatibility.Timing2;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
@@ -27,7 +28,6 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.Timing;
-import org.apache.curator.utils.Compatibility;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.concurrent.Callable;
@@ -196,7 +196,7 @@ public class TestNodeCache extends BaseClassForTests
}
);
- Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+ client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
Thread.sleep(timing.multiple(1.5).session());
Assert.assertEquals(cache.getCurrentData().getData(), "start".getBytes());
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
index 78fabd5..3c65994 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
@@ -29,11 +29,11 @@ import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Compatibility;
import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
import org.apache.curator.test.TestingServer;
import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.testng.Assert;
@@ -814,7 +814,7 @@ public class TestPathChildrenCache extends BaseClassForTests
client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", "data".getBytes());
Assert.assertTrue(timing.awaitLatch(childAddedLatch));
- Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+ client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
Assert.assertTrue(timing.awaitLatch(lostLatch));
Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
Assert.assertTrue(timing.awaitLatch(removedLatch));
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
index 1e97ce2..7da3ecb 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
@@ -23,8 +23,8 @@ import com.google.common.collect.ImmutableSet;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
+import org.apache.curator.test.Compatibility;
import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
import org.apache.zookeeper.CreateMode;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -423,9 +423,9 @@ public class TestTreeCache extends BaseTestTreeCache
client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", "data".getBytes());
assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/me");
- Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
- assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/me", "data".getBytes(), true);
+ client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
assertEvent(TreeCacheEvent.Type.INITIALIZED, null, null, true);
+ assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/me", "data".getBytes(), true);
assertNoMoreEvents();
}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
index b92c3a2..97a326e 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
@@ -30,11 +30,11 @@ import org.apache.curator.framework.state.SessionConnectionStateErrorPolicy;
import org.apache.curator.framework.state.StandardConnectionStateErrorPolicy;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Compatibility;
import org.apache.curator.test.TestingServer;
import org.apache.curator.test.Timing;
import org.apache.curator.test.compatibility.Timing2;
import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Arrays;
@@ -531,7 +531,7 @@ public class TestLeaderSelector extends BaseClassForTests
Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
- Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+ client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
Assert.assertTrue(timing.awaitLatch(interruptedLatch));
timing.sleepABit();
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java
deleted file mode 100644
index cafb9a3..0000000
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java
+++ /dev/null
@@ -1,351 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.recipes.locks;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.leader.LeaderLatch;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.test.TestingServer;
-import org.apache.curator.test.Timing;
-import org.apache.curator.utils.CloseableUtils;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-
-public class TestChildReaper extends BaseClassForTests
-{
- @Test
- public void testMaxChildren() throws Exception
- {
- server.close();
-
- final int LARGE_QTY = 10000;
-
- System.setProperty("jute.maxbuffer", "" + LARGE_QTY);
- server = new TestingServer();
- try
- {
- Timing timing = new Timing();
- ChildReaper reaper = null;
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new ExponentialBackoffRetry(100, 3));
- try
- {
- client.start();
-
- for ( int i = 0; i < LARGE_QTY; ++i )
- {
- if ( (i % 1000) == 0 )
- {
- System.out.println(i);
- }
- client.create().creatingParentsIfNeeded().forPath("/big/node-" + i);
- }
-
- try
- {
- client.getChildren().forPath("/big");
- Assert.fail("Should have been a connection loss");
- }
- catch ( KeeperException.ConnectionLossException e )
- {
- // expected
- }
-
- final CountDownLatch latch = new CountDownLatch(1);
- reaper = new ChildReaper(client, "/big", Reaper.Mode.REAP_UNTIL_DELETE, 1)
- {
- @Override
- protected void warnMaxChildren(String path, Stat stat)
- {
- latch.countDown();
- super.warnMaxChildren(path, stat);
- }
- };
- reaper.setMaxChildren(100);
- reaper.start();
- Assert.assertTrue(timing.awaitLatch(latch));
- }
- finally
- {
- CloseableUtils.closeQuietly(reaper);
- CloseableUtils.closeQuietly(client);
- }
- }
- finally
- {
- System.clearProperty("jute.maxbuffer");
- }
- }
-
- @Test
- public void testLargeNodes() throws Exception
- {
- server.close();
-
- final int LARGE_QTY = 10000;
- final int SMALL_QTY = 100;
-
- System.setProperty("jute.maxbuffer", "" + LARGE_QTY);
- server = new TestingServer();
- try
- {
- Timing timing = new Timing();
- ChildReaper reaper = null;
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new ExponentialBackoffRetry(100, 3));
- try
- {
- client.start();
-
- for ( int i = 0; i < LARGE_QTY; ++i )
- {
- if ( (i % 1000) == 0 )
- {
- System.out.println(i);
- }
- client.create().creatingParentsIfNeeded().forPath("/big/node-" + i);
-
- if ( i < SMALL_QTY )
- {
- client.create().creatingParentsIfNeeded().forPath("/small/node-" + i);
- }
- }
-
- reaper = new ChildReaper(client, "/foo", Reaper.Mode.REAP_UNTIL_DELETE, 1);
- reaper.start();
-
- reaper.addPath("/big");
- reaper.addPath("/small");
-
- int count = -1;
- for ( int i = 0; (i < 10) && (count != 0); ++i )
- {
- timing.sleepABit();
- count = client.checkExists().forPath("/small").getNumChildren();
- }
- Assert.assertEquals(count, 0);
- }
- finally
- {
- CloseableUtils.closeQuietly(reaper);
- CloseableUtils.closeQuietly(client);
- }
- }
- finally
- {
- System.clearProperty("jute.maxbuffer");
- }
- }
-
- @Test
- public void testSomeNodes() throws Exception
- {
- Timing timing = new Timing();
- ChildReaper reaper = null;
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- try
- {
- client.start();
-
- Random r = new Random();
- int nonEmptyNodes = 0;
- for ( int i = 0; i < 10; ++i )
- {
- client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i));
- if ( r.nextBoolean() )
- {
- client.create().forPath("/test/" + Integer.toString(i) + "/foo");
- ++nonEmptyNodes;
- }
- }
-
- reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, 1);
- reaper.start();
-
- timing.forWaiting().sleepABit();
-
- Stat stat = client.checkExists().forPath("/test");
- Assert.assertEquals(stat.getNumChildren(), nonEmptyNodes);
- }
- finally
- {
- CloseableUtils.closeQuietly(reaper);
- CloseableUtils.closeQuietly(client);
- }
- }
-
- @Test
- public void testSimple() throws Exception
- {
- Timing timing = new Timing();
- ChildReaper reaper = null;
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- try
- {
- client.start();
-
- for ( int i = 0; i < 10; ++i )
- {
- client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i));
- }
-
- reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, 1);
- reaper.start();
-
- timing.forWaiting().sleepABit();
-
- Stat stat = client.checkExists().forPath("/test");
- Assert.assertEquals(stat.getNumChildren(), 0);
- }
- finally
- {
- CloseableUtils.closeQuietly(reaper);
- CloseableUtils.closeQuietly(client);
- }
- }
-
- @Test
- public void testLeaderElection() throws Exception
- {
- Timing timing = new Timing();
- ChildReaper reaper = null;
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- LeaderLatch otherLeader = null;
- try
- {
- client.start();
-
- for ( int i = 0; i < 10; ++i )
- {
- client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i));
- }
-
- otherLeader = new LeaderLatch(client, "/test-leader");
- otherLeader.start();
- otherLeader.await();
-
- reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, ChildReaper.newExecutorService(), 1, "/test-leader");
- reaper.start();
-
- timing.forWaiting().sleepABit();
-
- //Should not have reaped anything at this point since otherLeader is still leader
- Stat stat = client.checkExists().forPath("/test");
- Assert.assertEquals(stat.getNumChildren(), 10);
-
- CloseableUtils.closeQuietly(otherLeader);
-
- timing.forWaiting().sleepABit();
-
- stat = client.checkExists().forPath("/test");
- Assert.assertEquals(stat.getNumChildren(), 0);
- }
- finally
- {
- CloseableUtils.closeQuietly(reaper);
- if ( otherLeader != null && otherLeader.getState() == LeaderLatch.State.STARTED )
- {
- CloseableUtils.closeQuietly(otherLeader);
- }
- CloseableUtils.closeQuietly(client);
- }
- }
-
- @Test
- public void testMultiPath() throws Exception
- {
- Timing timing = new Timing();
- ChildReaper reaper = null;
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- try
- {
- client.start();
-
- for ( int i = 0; i < 10; ++i )
- {
- client.create().creatingParentsIfNeeded().forPath("/test1/" + Integer.toString(i));
- client.create().creatingParentsIfNeeded().forPath("/test2/" + Integer.toString(i));
- client.create().creatingParentsIfNeeded().forPath("/test3/" + Integer.toString(i));
- }
-
- reaper = new ChildReaper(client, "/test2", Reaper.Mode.REAP_UNTIL_DELETE, 1);
- reaper.start();
- reaper.addPath("/test1");
-
- timing.forWaiting().sleepABit();
-
- Stat stat = client.checkExists().forPath("/test1");
- Assert.assertEquals(stat.getNumChildren(), 0);
- stat = client.checkExists().forPath("/test2");
- Assert.assertEquals(stat.getNumChildren(), 0);
- stat = client.checkExists().forPath("/test3");
- Assert.assertEquals(stat.getNumChildren(), 10);
- }
- finally
- {
- CloseableUtils.closeQuietly(reaper);
- CloseableUtils.closeQuietly(client);
- }
- }
-
- @Test
- public void testNamespace() throws Exception
- {
- Timing timing = new Timing();
- ChildReaper reaper = null;
- CuratorFramework client = CuratorFrameworkFactory.builder()
- .connectString(server.getConnectString())
- .sessionTimeoutMs(timing.session())
- .connectionTimeoutMs(timing.connection())
- .retryPolicy(new RetryOneTime(1))
- .namespace("foo")
- .build();
- try
- {
- client.start();
-
- for ( int i = 0; i < 10; ++i )
- {
- client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i));
- }
-
- reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, 1);
- reaper.start();
-
- timing.forWaiting().sleepABit();
-
- Stat stat = client.checkExists().forPath("/test");
- Assert.assertEquals(stat.getNumChildren(), 0);
-
- stat = client.usingNamespace(null).checkExists().forPath("/foo/test");
- Assert.assertNotNull(stat);
- Assert.assertEquals(stat.getNumChildren(), 0);
- }
- finally
- {
- CloseableUtils.closeQuietly(reaper);
- CloseableUtils.closeQuietly(client);
- }
- }
-}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
index e9645e8..95b139d 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
@@ -26,8 +26,8 @@ import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.framework.schema.Schema;
import org.apache.curator.framework.schema.SchemaSet;
import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.Compatibility;
import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
import org.apache.zookeeper.CreateMode;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -172,7 +172,7 @@ public class TestInterProcessMutex extends TestInterProcessMutexBase
Assert.assertTrue(lock.isAcquiredInThisProcess());
// Kill the session, check that lock node still exists
- Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+ client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
Assert.assertNotNull(client.checkExists().forPath(LOCK_PATH));
// Release the lock and verify that the actual lock node created no longer exists
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
index 0c62650..afebf8a 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
@@ -27,11 +27,11 @@ import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Compatibility;
import org.apache.curator.test.TestingServer;
import org.apache.curator.test.Timing;
import org.apache.curator.test.compatibility.Timing2;
import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.KeeperException;
import org.testng.Assert;
@@ -200,7 +200,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
);
Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
- Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+ client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
Assert.assertTrue(timing.forSessionSleep().acquireSemaphore(semaphore, 1));
}
finally
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
index 50f6bce..ce9f8fa 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
@@ -664,53 +664,6 @@ public class TestInterProcessSemaphore extends BaseClassForTests
}
@Test
- public void testChildReaperCleansUpLockNodes() throws Exception
- {
- Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- client.start();
-
- ChildReaper childReaper = null;
- try
- {
- InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test/lock", 1);
- semaphore.returnLease(semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
-
- Assert.assertTrue(client.getChildren().forPath("/test").size() > 0);
-
- childReaper = new ChildReaper(
- client,
- "/test",
- Reaper.Mode.REAP_UNTIL_GONE,
- ChildReaper.newExecutorService(),
- 1,
- "/test-leader",
- InterProcessSemaphoreV2.LOCK_SCHEMA
- );
- childReaper.start();
-
- timing.forWaiting().sleepABit();
-
- try
- {
- List<String> children = client.getChildren().forPath("/test");
-
- Assert.assertEquals(children.size(), 0, "All children of /test should have been reaped");
- }
- catch ( KeeperException.NoNodeException ok )
- {
- // this is OK - if Container Nodes are used the "/test" path will go away - no point in updating the test for deprecated code
- }
- }
- finally
- {
- CloseableUtils.closeQuietly(childReaper);
- CloseableUtils.closeQuietly(client);
- }
-
- }
-
- @Test
public void testNoOrphanedNodes() throws Exception
{
final Timing timing = new Timing();
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
deleted file mode 100644
index c47808f..0000000
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
+++ /dev/null
@@ -1,647 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.curator.framework.recipes.locks;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.leader.LeaderLatch;
-import org.apache.curator.framework.recipes.leader.LeaderSelector;
-import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.test.Timing;
-import org.apache.curator.utils.CloseableUtils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.Stat;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-import java.io.IOException;
-import java.util.Queue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class TestReaper extends BaseClassForTests
-{
- @Test
- public void testUsingLeaderPath() throws Exception
- {
- final Timing timing = new Timing();
- CuratorFramework client = makeClient(timing, null);
- Reaper reaper1 = null;
- Reaper reaper2 = null;
- try
- {
- final AtomicInteger reaper1Count = new AtomicInteger();
- reaper1 = new Reaper(client, Reaper.newExecutorService(), 1, "/reaper/leader")
- {
- @Override
- protected void reap(PathHolder holder)
- {
- reaper1Count.incrementAndGet();
- super.reap(holder);
- }
- };
-
- final AtomicInteger reaper2Count = new AtomicInteger();
- reaper2 = new Reaper(client, Reaper.newExecutorService(), 1, "/reaper/leader")
- {
- @Override
- protected void reap(PathHolder holder)
- {
- reaper2Count.incrementAndGet();
- super.reap(holder);
- }
- };
-
- client.start();
- client.create().creatingParentsIfNeeded().forPath("/one/two/three");
-
- reaper1.start();
- reaper2.start();
-
- reaper1.addPath("/one/two/three");
- reaper2.addPath("/one/two/three");
-
- timing.sleepABit();
-
- Assert.assertTrue((reaper1Count.get() == 0) || (reaper2Count.get() == 0));
- Assert.assertTrue((reaper1Count.get() > 0) || (reaper2Count.get() > 0));
-
- Reaper activeReaper;
- AtomicInteger inActiveReaperCount;
- if ( reaper1Count.get() > 0 )
- {
- activeReaper = reaper1;
- inActiveReaperCount = reaper2Count;
- }
- else
- {
- activeReaper = reaper2;
- inActiveReaperCount = reaper1Count;
- }
- Assert.assertEquals(inActiveReaperCount.get(), 0);
- activeReaper.close();
- timing.sleepABit();
- Assert.assertTrue(inActiveReaperCount.get() > 0);
- }
- finally
- {
- CloseableUtils.closeQuietly(reaper1);
- CloseableUtils.closeQuietly(reaper2);
- CloseableUtils.closeQuietly(client);
- }
- }
-
- @Test
- public void testUsingLeaderLatch() throws Exception
- {
- final Timing timing = new Timing();
- CuratorFramework client = makeClient(timing, null);
- Reaper reaper1 = null;
- Reaper reaper2 = null;
- LeaderLatch leaderLatch1 = null;
- LeaderLatch leaderLatch2 = null;
- try
- {
- final AtomicInteger reaper1Count = new AtomicInteger();
- leaderLatch1 = new LeaderLatch(client, "/reaper/leader");
- reaper1 = new Reaper(client, Reaper.newExecutorService(), 1, leaderLatch1)
- {
- @Override
- protected void reap(PathHolder holder)
- {
- reaper1Count.incrementAndGet();
- super.reap(holder);
- }
- };
-
- final AtomicInteger reaper2Count = new AtomicInteger();
- leaderLatch2 = new LeaderLatch(client, "/reaper/leader");
- reaper2 = new Reaper(client, Reaper.newExecutorService(), 1, leaderLatch2)
- {
- @Override
- protected void reap(PathHolder holder)
- {
- reaper2Count.incrementAndGet();
- super.reap(holder);
- }
- };
-
- client.start();
- client.create().creatingParentsIfNeeded().forPath("/one/two/three");
-
- leaderLatch1.start();
- leaderLatch2.start();
-
- reaper1.start();
- reaper2.start();
-
- reaper1.addPath("/one/two/three");
- reaper2.addPath("/one/two/three");
-
- timing.sleepABit();
-
- Assert.assertTrue((reaper1Count.get() == 0) || (reaper2Count.get() == 0));
- Assert.assertTrue((reaper1Count.get() > 0) || (reaper2Count.get() > 0));
-
- Reaper activeReaper;
- LeaderLatch activeLeaderLeatch;
- AtomicInteger inActiveReaperCount;
- if ( reaper1Count.get() > 0 )
- {
- activeReaper = reaper1;
- activeLeaderLeatch = leaderLatch1;
- inActiveReaperCount = reaper2Count;
- }
- else
- {
- activeReaper = reaper2;
- activeLeaderLeatch = leaderLatch2;
- inActiveReaperCount = reaper1Count;
- }
- Assert.assertEquals(inActiveReaperCount.get(), 0);
- activeReaper.close();
- activeLeaderLeatch.close();
- timing.sleepABit();
- Assert.assertTrue(inActiveReaperCount.get() > 0);
- }
- finally
- {
- CloseableUtils.closeQuietly(reaper1);
- CloseableUtils.closeQuietly(reaper2);
- if (leaderLatch1 != null && LeaderLatch.State.STARTED == leaderLatch1.getState()) {
- CloseableUtils.closeQuietly(leaderLatch1);
- }
- if (leaderLatch2 != null && LeaderLatch.State.STARTED == leaderLatch2.getState()) {
- CloseableUtils.closeQuietly(leaderLatch2);
- }
- CloseableUtils.closeQuietly(client);
- }
- }
-
- @Test
- public void testUsingManualLeader() throws Exception
- {
- final Timing timing = new Timing();
- final CuratorFramework client = makeClient(timing, null);
- final CountDownLatch latch = new CountDownLatch(1);
- LeaderSelectorListener listener = new LeaderSelectorListener()
- {
- @Override
- public void takeLeadership(CuratorFramework client) throws Exception
- {
- Reaper reaper = new Reaper(client, 1);
- try
- {
- reaper.addPath("/one/two/three", Reaper.Mode.REAP_UNTIL_DELETE);
- reaper.start();
-
- timing.sleepABit();
- latch.countDown();
- }
- finally
- {
- CloseableUtils.closeQuietly(reaper);
- }
- }
-
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState)
- {
- }
- };
- LeaderSelector selector = new LeaderSelector(client, "/leader", listener);
- try
- {
- client.start();
- client.create().creatingParentsIfNeeded().forPath("/one/two/three");
-
- Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-
- selector.start();
- timing.awaitLatch(latch);
-
- Assert.assertNull(client.checkExists().forPath("/one/two/three"));
- }
- finally
- {
- CloseableUtils.closeQuietly(selector);
- CloseableUtils.closeQuietly(client);
- }
- }
-
- @Test
- public void testSparseUseNoReap() throws Exception
- {
- final int THRESHOLD = 3000;
-
- Timing timing = new Timing();
- Reaper reaper = null;
- CuratorFramework client = makeClient(timing, null);
- try
- {
- client.start();
- client.create().creatingParentsIfNeeded().forPath("/one/two/three");
-
- Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-
- final Queue<Reaper.PathHolder> holders = new ConcurrentLinkedQueue<Reaper.PathHolder>();
- final ExecutorService pool = Executors.newCachedThreadPool();
- ScheduledExecutorService service = new ScheduledThreadPoolExecutor(1);
-
- reaper = new Reaper(client, service, THRESHOLD)
- {
- @Override
- protected Future<Void> schedule(final PathHolder pathHolder, int reapingThresholdMs)
- {
- holders.add(pathHolder);
- final Future<?> f = super.schedule(pathHolder, reapingThresholdMs);
- pool.submit(new Callable<Void>()
- {
- @Override
- public Void call() throws Exception
- {
- f.get();
- holders.remove(pathHolder);
- return null;
- }
- }
- );
- return null;
- }
- };
- reaper.start();
- reaper.addPath("/one/two/three");
-
- long start = System.currentTimeMillis();
- boolean emptyCountIsCorrect = false;
- while ( ((System.currentTimeMillis() - start) < timing.forWaiting().milliseconds()) && !emptyCountIsCorrect ) // need to loop as the Holder can go in/out of the Reaper's DelayQueue
- {
- for ( Reaper.PathHolder holder : holders )
- {
- if ( holder.path.endsWith("/one/two/three") )
- {
- emptyCountIsCorrect = (holder.emptyCount > 0);
- break;
- }
- }
- Thread.sleep(1);
- }
- Assert.assertTrue(emptyCountIsCorrect);
-
- client.create().forPath("/one/two/three/foo");
-
- Thread.sleep(2 * (THRESHOLD / Reaper.EMPTY_COUNT_THRESHOLD));
- Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
- client.delete().forPath("/one/two/three/foo");
-
- Thread.sleep(THRESHOLD);
- timing.sleepABit();
-
- Assert.assertNull(client.checkExists().forPath("/one/two/three"));
- }
- finally
- {
- CloseableUtils.closeQuietly(reaper);
- CloseableUtils.closeQuietly(client);
- }
- }
-
- @Test
- public void testReapUntilDelete() throws Exception
- {
- testReapUntilDelete(null);
- }
-
- @Test
- public void testReapUntilDeleteNamespace() throws Exception
- {
- testReapUntilDelete("test");
- }
-
- @Test
- public void testReapUntilGone() throws Exception
- {
- testReapUntilGone(null);
- }
-
- @Test
- public void testReapUntilGoneNamespace() throws Exception
- {
- testReapUntilGone("test");
- }
-
- @Test
- public void testRemove() throws Exception
- {
- testRemove(null);
- }
-
- @Test
- public void testRemoveNamespace() throws Exception
- {
- testRemove("test");
- }
-
- @Test
- public void testSimulationWithLocks() throws Exception
- {
- testSimulationWithLocks(null);
- }
-
- @Test
- public void testSimulationWithLocksNamespace() throws Exception
- {
- testSimulationWithLocks("test");
- }
-
- @Test
- public void testWithEphemerals() throws Exception
- {
- testWithEphemerals(null);
- }
-
- @Test
- public void testWithEphemeralsNamespace() throws Exception
- {
- testWithEphemerals("test");
- }
-
- @Test
- public void testBasic() throws Exception
- {
- testBasic(null);
- }
-
- @Test
- public void testBasicNamespace() throws Exception
- {
- testBasic("test");
- }
-
- private void testReapUntilDelete(String namespace) throws Exception
- {
- Timing timing = new Timing();
- Reaper reaper = null;
- CuratorFramework client = makeClient(timing, namespace);
- try
- {
- client.start();
- client.create().creatingParentsIfNeeded().forPath("/one/two/three");
-
- Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-
- reaper = new Reaper(client, 100);
- reaper.start();
-
- reaper.addPath("/one/two/three", Reaper.Mode.REAP_UNTIL_DELETE);
- timing.sleepABit();
-
- Assert.assertNull(client.checkExists().forPath("/one/two/three"));
-
- client.create().forPath("/one/two/three");
- timing.sleepABit();
- Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
- }
- finally
- {
- CloseableUtils.closeQuietly(reaper);
- CloseableUtils.closeQuietly(client);
- }
- }
-
- private void testReapUntilGone(String namespace) throws Exception
- {
- Timing timing = new Timing();
- Reaper reaper = null;
- CuratorFramework client = makeClient(timing, namespace);
- try
- {
- client.start();
-
- reaper = new Reaper(client, 100);
- reaper.start();
-
- reaper.addPath("/one/two/three", Reaper.Mode.REAP_UNTIL_GONE);
- timing.sleepABit();
-
- client.create().creatingParentsIfNeeded().forPath("/one/two/three");
- Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-
- reaper.addPath("/one/two/three", Reaper.Mode.REAP_UNTIL_GONE);
- timing.sleepABit();
-
- Assert.assertNull(client.checkExists().forPath("/one/two/three"));
- }
- finally
- {
- CloseableUtils.closeQuietly(reaper);
- CloseableUtils.closeQuietly(client);
- }
- }
-
- private CuratorFramework makeClient(Timing timing, String namespace) throws IOException
- {
- CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().connectionTimeoutMs(timing.connection()).sessionTimeoutMs(timing.session()).connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1));
- if ( namespace != null )
- {
- builder = builder.namespace(namespace);
- }
- return builder.build();
- }
-
- private void testRemove(String namespace) throws Exception
- {
- Timing timing = new Timing();
- Reaper reaper = null;
- CuratorFramework client = makeClient(timing, namespace);
- try
- {
- client.start();
- client.create().creatingParentsIfNeeded().forPath("/one/two/three");
-
- Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-
- reaper = new Reaper(client, 100);
- reaper.start();
-
- reaper.addPath("/one/two/three");
- timing.sleepABit();
-
- Assert.assertNull(client.checkExists().forPath("/one/two/three"));
-
- Assert.assertTrue(reaper.removePath("/one/two/three"));
-
- client.create().forPath("/one/two/three");
- timing.sleepABit();
- Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
- }
- finally
- {
- CloseableUtils.closeQuietly(reaper);
- CloseableUtils.closeQuietly(client);
- }
- }
-
- private void testSimulationWithLocks(String namespace) throws Exception
- {
- final int LOCK_CLIENTS = 10;
- final int ITERATIONS = 250;
- final int MAX_WAIT_MS = 10;
-
- ExecutorService service = Executors.newFixedThreadPool(LOCK_CLIENTS);
- ExecutorCompletionService<Object> completionService = new ExecutorCompletionService<Object>(service);
-
- Timing timing = new Timing();
- Reaper reaper = null;
- final CuratorFramework client = makeClient(timing, namespace);
- try
- {
- client.start();
-
- reaper = new Reaper(client, MAX_WAIT_MS / 2);
- reaper.start();
- reaper.addPath("/a/b");
-
- for ( int i = 0; i < LOCK_CLIENTS; ++i )
- {
- completionService.submit(new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
- {
- final InterProcessMutex lock = new InterProcessMutex(client, "/a/b");
- for ( int i = 0; i < ITERATIONS; ++i )
- {
- lock.acquire();
- try
- {
- Thread.sleep((int)(Math.random() * MAX_WAIT_MS));
- }
- finally
- {
- lock.release();
- }
- }
- return null;
- }
- }
- );
- }
-
- for ( int i = 0; i < LOCK_CLIENTS; ++i )
- {
- completionService.take().get();
- }
-
- Thread.sleep(timing.session());
- timing.sleepABit();
-
- Stat stat = client.checkExists().forPath("/a/b");
- Assert.assertNull(stat, "Child qty: " + ((stat != null) ? stat.getNumChildren() : 0));
- }
- finally
- {
- service.shutdownNow();
- CloseableUtils.closeQuietly(reaper);
- CloseableUtils.closeQuietly(client);
- }
- }
-
- private void testWithEphemerals(String namespace) throws Exception
- {
- Timing timing = new Timing();
- Reaper reaper = null;
- CuratorFramework client2 = null;
- CuratorFramework client = makeClient(timing, namespace);
- try
- {
- client.start();
- client.create().creatingParentsIfNeeded().forPath("/one/two/three");
-
- Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-
- client2 = makeClient(timing, namespace);
- client2.start();
- for ( int i = 0; i < 10; ++i )
- {
- client2.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/one/two/three/foo-");
- }
-
- reaper = new Reaper(client, 100);
- reaper.start();
-
- reaper.addPath("/one/two/three");
- timing.sleepABit();
-
- Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-
- client2.close(); // should clear ephemerals
- client2 = null;
-
- Thread.sleep(timing.session());
- timing.sleepABit();
-
- Assert.assertNull(client.checkExists().forPath("/one/two/three"));
- }
- finally
- {
- CloseableUtils.closeQuietly(reaper);
- CloseableUtils.closeQuietly(client2);
- CloseableUtils.closeQuietly(client);
- }
- }
-
- private void testBasic(String namespace) throws Exception
- {
- Timing timing = new Timing();
- Reaper reaper = null;
- CuratorFramework client = makeClient(timing, namespace);
- try
- {
- client.start();
- client.create().creatingParentsIfNeeded().forPath("/one/two/three");
-
- Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-
- reaper = new Reaper(client, 100);
- reaper.start();
-
- reaper.addPath("/one/two/three");
- timing.sleepABit();
-
- Assert.assertNull(client.checkExists().forPath("/one/two/three"));
- }
- finally
- {
- CloseableUtils.closeQuietly(reaper);
- CloseableUtils.closeQuietly(client);
- }
- }
-}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
index 87585af..906282f 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
@@ -31,9 +31,9 @@ import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Compatibility;
import org.apache.curator.test.compatibility.Timing2;
import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
@@ -329,7 +329,7 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
observer.checkExists().usingWatcher(deletedTrigger).forPath(node.getActualPath());
node.debugCreateNodeLatch = new CountDownLatch(1);
- Compatibility.injectSessionExpiration(curator.getZookeeperClient().getZooKeeper());
+ curator.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
// Make sure the node got deleted
assertTrue(deletedTrigger.firedWithin(timing.forSessionSleep().seconds(), TimeUnit.SECONDS));
@@ -359,7 +359,7 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
observer.checkExists().usingWatcher(deletedTrigger).forPath(node.getActualPath());
node.debugCreateNodeLatch = new CountDownLatch(1);
- Compatibility.injectSessionExpiration(curator.getZookeeperClient().getZooKeeper());
+ curator.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
// Make sure the node got deleted...
assertTrue(deletedTrigger.firedWithin(timing.forSessionSleep().seconds(), TimeUnit.SECONDS));
@@ -400,7 +400,7 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
node.debugCreateNodeLatch = new CountDownLatch(1);
// Kill the session, thus cleaning up the node...
- Compatibility.injectSessionExpiration(curator.getZookeeperClient().getZooKeeper());
+ curator.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
// Make sure the node ended up getting deleted...
assertTrue(deletionTrigger.firedWithin(timing.multiple(1.5).forSessionSleep().seconds(), TimeUnit.SECONDS));
@@ -443,7 +443,7 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
Trigger deletedTrigger = Trigger.deletedOrSetData();
observer.checkExists().usingWatcher(deletedTrigger).forPath(node.getActualPath());
- Compatibility.injectSessionExpiration(nodeCreator.getZookeeperClient().getZooKeeper());
+ nodeCreator.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
// Make sure the node got deleted...
assertTrue(deletedTrigger.firedWithin(timing.forWaiting().seconds(), TimeUnit.SECONDS));
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java
index 360c876..b95df11 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java
@@ -26,7 +26,6 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.Timing;
import org.apache.curator.test.compatibility.CuratorTestBase;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
import org.apache.curator.utils.ZKPaths;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -38,7 +37,6 @@ import java.util.concurrent.TimeUnit;
import static org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode.BUILD_INITIAL_CACHE;
-@Test(groups = Zk35MethodInterceptor.zk35Group)
public class TestPersistentTtlNode extends CuratorTestBase
{
private final Timing timing = new Timing();
diff --git a/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java b/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java
index a3c2a29..830db1f 100644
--- a/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java
+++ b/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java
@@ -21,7 +21,6 @@ package org.apache.curator.test.compatibility;
import org.apache.curator.test.BaseClassForTests;
import org.testng.annotations.Listeners;
-@Listeners(Zk35MethodInterceptor.class)
public class CuratorTestBase extends BaseClassForTests
{
protected final Timing2 timing = new Timing2();
diff --git a/curator-test/src/main/java/org/apache/curator/test/compatibility/Zk35MethodInterceptor.java b/curator-test/src/main/java/org/apache/curator/test/compatibility/Zk35MethodInterceptor.java
deleted file mode 100644
index 8072b68..0000000
--- a/curator-test/src/main/java/org/apache/curator/test/compatibility/Zk35MethodInterceptor.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.test.compatibility;
-
-import org.apache.curator.test.Compatibility;
-import org.testng.IMethodInstance;
-import org.testng.IMethodInterceptor;
-import org.testng.ITestContext;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-public class Zk35MethodInterceptor implements IMethodInterceptor
-{
- public static final String zk35Group = "zk35";
-
- @Override
- public List<IMethodInstance> intercept(List<IMethodInstance> methods, ITestContext context)
- {
- if ( !Compatibility.isZK34() )
- {
- return methods;
- }
-
- List<IMethodInstance> filteredMethods = new ArrayList<>();
- for ( IMethodInstance method : methods )
- {
- if ( !isInGroup(method.getMethod().getGroups()) )
- {
- filteredMethods.add(method);
- }
- }
- return filteredMethods;
- }
-
- private boolean isInGroup(String[] groups)
- {
- return (groups != null) && Arrays.asList(groups).contains(zk35Group);
- }
-}
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index b95e92d..5c015f4 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -20,7 +20,7 @@ package org.apache.curator.x.async.modeled.details;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.StandardListenerManager;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
@@ -28,10 +28,10 @@ import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.x.async.api.CreateOption;
import org.apache.curator.x.async.modeled.ModelSerializer;
import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ZNode;
import org.apache.curator.x.async.modeled.ZPath;
import org.apache.curator.x.async.modeled.cached.ModeledCache;
import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
-import org.apache.curator.x.async.modeled.ZNode;
import org.apache.zookeeper.data.Stat;
import java.util.AbstractMap;
import java.util.Map;
@@ -45,7 +45,7 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
private final TreeCache cache;
private final Map<ZPath, Entry<T>> entries = new ConcurrentHashMap<>();
private final ModelSerializer<T> serializer;
- private final ListenerContainer<ModeledCacheListener<T>> listenerContainer = new ListenerContainer<>();
+ private final StandardListenerManager<ModeledCacheListener<T>> listenerContainer = StandardListenerManager.standard();
private final ZPath basePath;
private static final class Entry<T>
@@ -144,10 +144,7 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
{
ThreadUtils.checkInterrupted(e);
- listenerContainer.forEach(l -> {
- l.handleException(e);
- return null;
- });
+ listenerContainer.forEach(l -> l.handleException(e));
}
}
@@ -188,10 +185,7 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
case INITIALIZED:
{
- listenerContainer.forEach(l -> {
- l.initialized();
- return null;
- });
+ listenerContainer.forEach(ModeledCacheListener::initialized);
break;
}
@@ -203,9 +197,6 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
private void accept(ModeledCacheListener.Type type, ZPath path, Stat stat, T model)
{
- listenerContainer.forEach(l -> {
- l.accept(type, path, stat, model);
- return null;
- });
+ listenerContainer.forEach(l -> l.accept(type, path, stat, model));
}
}
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
index 05df301..c154836 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.x.discovery.details;
import com.google.common.annotations.VisibleForTesting;
@@ -23,14 +24,14 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.curator.utils.CloseableExecutorService;
-import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.StandardListenerManager;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.utils.CloseableExecutorService;
+import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceInstance;
@@ -45,17 +46,15 @@ import java.util.concurrent.atomic.AtomicReference;
public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheListener
{
- private final ListenerContainer<ServiceCacheListener> listenerContainer = new ListenerContainer<ServiceCacheListener>();
- private final ServiceDiscoveryImpl<T> discovery;
- private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
- private final PathChildrenCache cache;
- private final ConcurrentMap<String, ServiceInstance<T>> instances = Maps.newConcurrentMap();
+ private final StandardListenerManager<ServiceCacheListener> listenerContainer = StandardListenerManager.standard();
+ private final ServiceDiscoveryImpl<T> discovery;
+ private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+ private final PathChildrenCache cache;
+ private final ConcurrentMap<String, ServiceInstance<T>> instances = Maps.newConcurrentMap();
private enum State
{
- LATENT,
- STARTED,
- STOPPED
+ LATENT, STARTED, STOPPED
}
private static CloseableExecutorService convertThreadFactory(ThreadFactory threadFactory)
@@ -123,18 +122,7 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
{
Preconditions.checkState(state.compareAndSet(State.STARTED, State.STOPPED), "Already closed or has not been started");
- listenerContainer.forEach
- (
- new Function<ServiceCacheListener, Void>()
- {
- @Override
- public Void apply(ServiceCacheListener listener)
- {
- discovery.getClient().getConnectionStateListenable().removeListener(listener);
- return null;
- }
- }
- );
+ listenerContainer.forEach(l -> discovery.getClient().getConnectionStateListenable().removeListener(l));
listenerContainer.clear();
CloseableUtils.closeQuietly(cache);
@@ -166,39 +154,28 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
- boolean notifyListeners = false;
+ boolean notifyListeners = false;
switch ( event.getType() )
{
- case CHILD_ADDED:
- case CHILD_UPDATED:
- {
- addInstance(event.getData(), false);
- notifyListeners = true;
- break;
- }
+ case CHILD_ADDED:
+ case CHILD_UPDATED:
+ {
+ addInstance(event.getData(), false);
+ notifyListeners = true;
+ break;
+ }
- case CHILD_REMOVED:
- {
- instances.remove(instanceIdFromData(event.getData()));
- notifyListeners = true;
- break;
- }
+ case CHILD_REMOVED:
+ {
+ instances.remove(instanceIdFromData(event.getData()));
+ notifyListeners = true;
+ break;
+ }
}
if ( notifyListeners )
{
- listenerContainer.forEach
- (
- new Function<ServiceCacheListener, Void>()
- {
- @Override
- public Void apply(ServiceCacheListener listener)
- {
- listener.cacheChanged();
- return null;
- }
- }
- );
+ listenerContainer.forEach(ServiceCacheListener::cacheChanged);
}
}
@@ -209,8 +186,8 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
private void addInstance(ChildData childData, boolean onlyIfAbsent) throws Exception
{
- String instanceId = instanceIdFromData(childData);
- ServiceInstance<T> serviceInstance = discovery.getSerializer().deserialize(childData.getData());
+ String instanceId = instanceIdFromData(childData);
+ ServiceInstance<T> serviceInstance = discovery.getSerializer().deserialize(childData.getData());
if ( onlyIfAbsent )
{
instances.putIfAbsent(instanceId, serviceInstance);
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
index 54719a5..7eecf3c 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
@@ -25,9 +25,9 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Compatibility;
import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
@@ -79,7 +79,7 @@ public class TestServiceDiscovery extends BaseClassForTests
timing.acquireSemaphore(semaphore, 2);
Assert.assertEquals(discovery.queryForInstances("test").size(), 2);
- Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+ client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
server.stop();
server.restart();
@@ -121,7 +121,7 @@ public class TestServiceDiscovery extends BaseClassForTests
timing.acquireSemaphore(semaphore);
Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
- Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+ client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
server.stop();
server.restart();
@@ -154,7 +154,7 @@ public class TestServiceDiscovery extends BaseClassForTests
Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
- Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+ client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
Thread.sleep(timing.multiple(1.5).session());
Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
diff --git a/pom.xml b/pom.xml
index 43f2d561..b95a2bc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -317,7 +317,6 @@
<module>curator-x-discovery</module>
<module>curator-x-discovery-server</module>
<module>curator-x-async</module>
- <module>curator-test-zk34</module>
</modules>
<dependencyManagement>
@@ -873,11 +872,6 @@
<relocation>
<pattern>com.google</pattern>
<shadedPattern>org.apache.curator.shaded.com.google</shadedPattern>
- <excludes>
- <exclude>com.google.common.base.Function</exclude>
- <exclude>com.google.common.base.Predicate</exclude>
- <exclude>com.google.common.reflect.TypeToken</exclude>
- </excludes>
</relocation>
</relocations>
<artifactSet>
diff --git a/src/site/confluence/exhibitor.confluence b/src/site/confluence/exhibitor.confluence
deleted file mode 100644
index 156ff50..0000000
--- a/src/site/confluence/exhibitor.confluence
+++ /dev/null
@@ -1,24 +0,0 @@
-h1. Exhibitor Integration
-
-Curator can be integrated with Netflix [[Exhibitor|https://github.com/Netflix/exhibitor]] to achieve a live/updating list
-of the ZooKeeper ensemble. This means that your ZooKeeper client (i.e. Curator) will automatically/dynamically adjust
-to changes in the makeup of a ZooKeeper ensemble. Further, this support is generalized so that a service other than Exhibitor could be used if available.
-
-h2. Enabling
-
-The integration is enabled via the {{CuratorFrameworkFactory}} (or when constructing the {{CuratorZookeeperClient}} if not
-using the framework). Pass an {{EnsembleProvider}} to the {{ensembleProvider()}} method of the {{CuratorFrameworkFactory}} builder.
-
-h2. EnsembleProvider
-
-For Exhibitor, construct an instance of {{ExhibitorEnsembleProvider}} to pass to the builder. It takes a number of arguments:
-* exhibitors \- a list of the exhibitor instances. This is the initial set of Exhibitor instances. This set will get automatically updated if it changes.
-* restClient \- a simple facade to access the Exhibitor instances via REST. Use the provided {{DefaultExhibitorRestClient}} or something else of your choosing.
-* restUriPath \- the REST path to use. In most cases this should be: {{/exhibitor/v1/cluster/list}}
-* pollingMs \- how often to poll the Exhibitor instances
-* retryPolicy \- the retry policy to use when polling the Exhibitor instances.
-
-h2. Details
-
-Once configured, Curator will poll the Exhibitors for changes in the ensemble. If Curator should need to re\-create the ZooKeeper
-instance (due to a SysDisconnected event, etc.) it will use the updated ensemble list to do so.
diff --git a/src/site/confluence/zk-compatibility.confluence b/src/site/confluence/zk-compatibility-34.confluence
similarity index 68%
rename from src/site/confluence/zk-compatibility.confluence
rename to src/site/confluence/zk-compatibility-34.confluence
index ed6e32e..911a642 100644
--- a/src/site/confluence/zk-compatibility.confluence
+++ b/src/site/confluence/zk-compatibility-34.confluence
@@ -1,18 +1,8 @@
-h1. ZooKeeper Version Compatibility
+h1. ZooKeeper Version 3.4.x Compatibility
-While ZooKeeper 3.5.x is still considered "beta" by the ZooKeeper development team, the reality is that it is
-used in production by many users. However, ZooKeeper 3.4.x is also used in production. Prior to Apache Curator
-4.0, both versions of ZooKeeper were supported via two versions of Apache Curator. Starting with Curator 4.0
-both versions of ZooKeeper are supported via the same Curator libraries.
-
-h2. ZooKeeper 3.5.x
-
-* Curator 4.0 has a hard dependency on ZooKeeper 3.5.x
-* If you are using ZooKeeper 3.5.x there's nothing additional to do \- just use Curator 4.0
-
-h2. ZooKeeper 3.4.x
-
-Curator 4.0 supports ZooKeeper 3.4.x ensembles in a soft\-compatibility mode. To use this mode
+ZooKeeper 3.4.x is now at end-of-life. Consequently, the latest versions of Curator have removed support
+for it. If you wish to use Curator with ZooKeeper 3.4.x you should pin to version 4.2.x of Curator.
+Curator 4.2.x supports ZooKeeper 3.4.x ensembles in a soft\-compatibility mode. To use this mode
you must exclude ZooKeeper when adding Curator to your dependency management tool.
_Maven_
@@ -21,7 +11,7 @@ _Maven_
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
- <version>${curator-version}</version>
+ <version>4.2.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
diff --git a/src/site/site.xml b/src/site/site.xml
index 3ff625f..bdfbce5 100644
--- a/src/site/site.xml
+++ b/src/site/site.xml
@@ -81,10 +81,6 @@
<item name="Schema Support" href="curator-framework/schema.html"/>
</menu>
- <menu name="Compatibility" inherit="top">
- <item name="ZooKeeper Versions" href="zk-compatibility.html"/>
- </menu>
-
<menu name="Low Level" inherit="top">
<item name="Framework" href="curator-framework/index.html"/>
<item name="Utilities" href="utilities.html"/>
@@ -101,7 +97,8 @@
<item name="API Compatibility" href="compatibility.html"/>
<item name="Javadoc" href="apidocs/index.html"/>
<item name="Wiki" href="https://cwiki.apache.org/confluence/display/CURATOR"/>
- <item name="Releases" href="https://cwiki.apache.org/confluence/display/CURATOR/Releases"/>
+ <item name="Releases/Compatibility" href="https://cwiki.apache.org/confluence/display/CURATOR/Releases"/>
+ <item name="ZooKeeper 3.4.x" href="zk-compatibility-34.html"/>
</menu>
<menu name="Extensions" inherit="top">