You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2020/03/14 18:03:58 UTC
[curator] branch master updated: CURATOR-562 - Remove
ConnectionHandlingPolicy - flatten out behavior to match old
StandardConnectionHandlingPolicy
This is an automated email from the ASF dual-hosted git repository.
randgalt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git
The following commit(s) were added to refs/heads/master by this push:
new 9745ec4 CURATOR-562 - Remove ConnectionHandlingPolicy - flatten out behavior to match old StandardConnectionHandlingPolicy
9745ec4 is described below
commit 9745ec4ac2a8585b859350a9e15e06d59dca19fc
Author: tison <wa...@gmail.com>
AuthorDate: Wed Mar 11 00:25:13 2020 +0800
CURATOR-562 - Remove ConnectionHandlingPolicy - flatten out behavior to match old StandardConnectionHandlingPolicy
---
.../java/org/apache/curator/ConnectionState.java | 74 ++------------
.../org/apache/curator/CuratorZookeeperClient.java | 43 +-------
.../main/java/org/apache/curator/RetryLoop.java | 30 +++++-
.../connection/ConnectionHandlingPolicy.java | 111 ---------------------
.../StandardConnectionHandlingPolicy.java | 94 -----------------
.../curator/connection/ThreadLocalRetryLoop.java | 2 +-
.../java/org/apache/curator/TestEnsurePath.java | 3 -
.../curator/framework/CuratorFrameworkFactory.java | 71 ++++++-------
.../framework/imps/CuratorFrameworkImpl.java | 5 +-
.../curator/framework/state/ConnectionState.java | 12 +--
.../state/TestConnectionStateManager.java | 3 +-
11 files changed, 79 insertions(+), 369 deletions(-)
diff --git a/curator-client/src/main/java/org/apache/curator/ConnectionState.java b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
index 726b418..fda0a4b 100644
--- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
+++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
@@ -18,7 +18,6 @@
*/
package org.apache.curator;
-import org.apache.curator.connection.ConnectionHandlingPolicy;
import org.apache.curator.drivers.EventTrace;
import org.apache.curator.drivers.OperationTrace;
import org.apache.curator.drivers.TracerDriver;
@@ -27,7 +26,6 @@ import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.DebugUtils;
import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZookeeperFactory;
-import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
@@ -36,7 +34,6 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.Queue;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -52,22 +49,16 @@ class ConnectionState implements Watcher, Closeable
private final AtomicBoolean isConnected = new AtomicBoolean(false);
private final AtomicInteger lastNegotiatedSessionTimeoutMs = new AtomicInteger(0);
private final EnsembleProvider ensembleProvider;
- private final int sessionTimeoutMs;
- private final int connectionTimeoutMs;
private final AtomicReference<TracerDriver> tracer;
- private final ConnectionHandlingPolicy connectionHandlingPolicy;
private final Queue<Exception> backgroundExceptions = new ConcurrentLinkedQueue<Exception>();
private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>();
private final AtomicLong instanceIndex = new AtomicLong();
private volatile long connectionStartMs = 0;
- ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
+ ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly)
{
this.ensembleProvider = ensembleProvider;
- this.sessionTimeoutMs = sessionTimeoutMs;
- this.connectionTimeoutMs = connectionTimeoutMs;
this.tracer = tracer;
- this.connectionHandlingPolicy = connectionHandlingPolicy;
if ( parentWatcher != null )
{
parentWatchers.offer(parentWatcher);
@@ -93,7 +84,7 @@ class ConnectionState implements Watcher, Closeable
boolean localIsConnected = isConnected.get();
if ( !localIsConnected )
{
- checkTimeouts();
+ checkNewConnectionString();
}
return handleHolder.getZooKeeper();
@@ -204,64 +195,13 @@ class ConnectionState implements Watcher, Closeable
handleHolder.getZooKeeper(); // initiate connection
}
- private synchronized void checkTimeouts() throws Exception
+ private synchronized void checkNewConnectionString()
{
- final AtomicReference<String> newConnectionString = new AtomicReference<>();
- Callable<String> hasNewConnectionString = new Callable<String>()
- {
- @Override
- public String call()
- {
- newConnectionString.set(handleHolder.getNewConnectionString());
- return newConnectionString.get();
- }
- };
- int lastNegotiatedSessionTimeoutMs = getLastNegotiatedSessionTimeoutMs();
- int useSessionTimeoutMs = (lastNegotiatedSessionTimeoutMs > 0) ? lastNegotiatedSessionTimeoutMs : sessionTimeoutMs;
- ConnectionHandlingPolicy.CheckTimeoutsResult result = connectionHandlingPolicy.checkTimeouts(hasNewConnectionString, connectionStartMs, useSessionTimeoutMs, connectionTimeoutMs);
- switch ( result )
- {
- default:
- case NOP:
- {
- break;
- }
-
- case NEW_CONNECTION_STRING:
- {
- handleNewConnectionString(newConnectionString.get());
- break;
- }
-
- case RESET_CONNECTION:
- {
- if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
- {
- long elapsed = System.currentTimeMillis() - connectionStartMs;
- int maxTimeout = Math.max(useSessionTimeoutMs, connectionTimeoutMs);
- log.warn(String.format("Connection attempt unsuccessful after %d (greater than max timeout of %d). Resetting connection and trying again with a new connection.", elapsed, maxTimeout));
- }
- reset();
- break;
- }
+ final String newConnectionString = handleHolder.getNewConnectionString();
- case CONNECTION_TIMEOUT:
- {
- KeeperException.ConnectionLossException connectionLossException = new CuratorConnectionLossException();
- if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
- {
- long elapsed = System.currentTimeMillis() - connectionStartMs;
- log.error(String.format("Connection timed out for connection string (%s) and timeout (%d) / elapsed (%d)", handleHolder.getConnectionString(), connectionTimeoutMs, elapsed), connectionLossException);
- }
- new EventTrace("connections-timed-out", tracer.get(), getSessionId()).commit();
- throw connectionLossException;
- }
-
- case SESSION_TIMEOUT:
- {
- handleExpiredSession();
- break;
- }
+ if (newConnectionString != null)
+ {
+ handleNewConnectionString(newConnectionString);
}
}
diff --git a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
index 167695f..f266b38 100644
--- a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
+++ b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
@@ -20,8 +20,6 @@
package org.apache.curator;
import com.google.common.base.Preconditions;
-import org.apache.curator.connection.ConnectionHandlingPolicy;
-import org.apache.curator.connection.StandardConnectionHandlingPolicy;
import org.apache.curator.drivers.OperationTrace;
import org.apache.curator.drivers.TracerDriver;
import org.apache.curator.ensemble.EnsembleProvider;
@@ -55,7 +53,6 @@ public class CuratorZookeeperClient implements Closeable
private final int waitForShutdownTimeoutMs;
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicReference<TracerDriver> tracer = new AtomicReference<TracerDriver>(new DefaultTracerDriver());
- private final ConnectionHandlingPolicy connectionHandlingPolicy;
/**
*
@@ -67,7 +64,7 @@ public class CuratorZookeeperClient implements Closeable
*/
public CuratorZookeeperClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy)
{
- this(new DefaultZookeeperFactory(), new FixedEnsembleProvider(connectString), sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false, new StandardConnectionHandlingPolicy());
+ this(new DefaultZookeeperFactory(), new FixedEnsembleProvider(connectString), sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false);
}
/**
@@ -79,7 +76,7 @@ public class CuratorZookeeperClient implements Closeable
*/
public CuratorZookeeperClient(EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy)
{
- this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false, new StandardConnectionHandlingPolicy());
+ this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false);
}
/**
@@ -96,7 +93,7 @@ public class CuratorZookeeperClient implements Closeable
*/
public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly)
{
- this(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, canBeReadOnly, new StandardConnectionHandlingPolicy());
+ this(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, 0, watcher, retryPolicy, canBeReadOnly);
}
/**
@@ -104,24 +101,6 @@ public class CuratorZookeeperClient implements Closeable
* @param ensembleProvider the ensemble provider
* @param sessionTimeoutMs session timeout
* @param connectionTimeoutMs connection timeout
- * @param watcher default watcher or null
- * @param retryPolicy the retry policy to use
- * @param canBeReadOnly if true, allow ZooKeeper client to enter
- * read only mode in case of a network partition. See
- * {@link ZooKeeper#ZooKeeper(String, int, Watcher, long, byte[], boolean)}
- * for details
- * @param connectionHandlingPolicy connection handling policy - use one of the pre-defined policies or write your own
- * @since 3.0.0
- */
- public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy) {
- this(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, 0,
- watcher, retryPolicy, canBeReadOnly, connectionHandlingPolicy);
- }
- /**
- * @param zookeeperFactory factory for creating {@link ZooKeeper} instances
- * @param ensembleProvider the ensemble provider
- * @param sessionTimeoutMs session timeout
- * @param connectionTimeoutMs connection timeout
* @param waitForShutdownTimeoutMs default timeout fo close operation
* @param watcher default watcher or null
* @param retryPolicy the retry policy to use
@@ -129,14 +108,12 @@ public class CuratorZookeeperClient implements Closeable
* read only mode in case of a network partition. See
* {@link ZooKeeper#ZooKeeper(String, int, Watcher, long, byte[], boolean)}
* for details
- * @param connectionHandlingPolicy connection handling policy - use one of the pre-defined policies or write your own
* @since 4.0.2
*/
public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
int sessionTimeoutMs, int connectionTimeoutMs, int waitForShutdownTimeoutMs, Watcher watcher,
- RetryPolicy retryPolicy, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
+ RetryPolicy retryPolicy, boolean canBeReadOnly)
{
- this.connectionHandlingPolicy = connectionHandlingPolicy;
if ( sessionTimeoutMs < connectionTimeoutMs )
{
log.warn(String.format("session timeout [%d] is less than connection timeout [%d]", sessionTimeoutMs, connectionTimeoutMs));
@@ -147,7 +124,7 @@ public class CuratorZookeeperClient implements Closeable
this.connectionTimeoutMs = connectionTimeoutMs;
this.waitForShutdownTimeoutMs = waitForShutdownTimeoutMs;
- state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly, connectionHandlingPolicy);
+ state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, watcher, tracer, canBeReadOnly);
setRetryPolicy(retryPolicy);
}
@@ -377,16 +354,6 @@ public class CuratorZookeeperClient implements Closeable
}
/**
- * Return the configured connection handling policy
- *
- * @return ConnectionHandlingPolicy
- */
- public ConnectionHandlingPolicy getConnectionHandlingPolicy()
- {
- return connectionHandlingPolicy;
- }
-
- /**
* Return the most recent value of {@link ZooKeeper#getSessionTimeout()} or 0
*
* @return session timeout or 0
diff --git a/curator-client/src/main/java/org/apache/curator/RetryLoop.java b/curator-client/src/main/java/org/apache/curator/RetryLoop.java
index 60b4a35..1720290 100644
--- a/curator-client/src/main/java/org/apache/curator/RetryLoop.java
+++ b/curator-client/src/main/java/org/apache/curator/RetryLoop.java
@@ -18,6 +18,8 @@
*/
package org.apache.curator;
+import org.apache.curator.connection.ThreadLocalRetryLoop;
+import org.apache.curator.utils.ThreadUtils;
import org.apache.zookeeper.KeeperException;
import java.util.concurrent.Callable;
@@ -78,7 +80,33 @@ public abstract class RetryLoop
*/
public static <T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception
{
- return client.getConnectionHandlingPolicy().callWithRetry(client, proc);
+ client.internalBlockUntilConnectedOrTimedOut();
+
+ T result = null;
+ ThreadLocalRetryLoop threadLocalRetryLoop = new ThreadLocalRetryLoop();
+ RetryLoop retryLoop = threadLocalRetryLoop.getRetryLoop(client::newRetryLoop);
+ try
+ {
+ while ( retryLoop.shouldContinue() )
+ {
+ try
+ {
+ result = proc.call();
+ retryLoop.markComplete();
+ }
+ catch ( Exception e )
+ {
+ ThreadUtils.checkInterrupted(e);
+ retryLoop.takeException(e);
+ }
+ }
+ }
+ finally
+ {
+ threadLocalRetryLoop.release();
+ }
+
+ return result;
}
/**
diff --git a/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java b/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
deleted file mode 100644
index 8f6a147..0000000
--- a/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.connection;
-
-import org.apache.curator.CuratorZookeeperClient;
-import org.apache.curator.RetryLoop;
-import java.util.concurrent.Callable;
-
-/**
- * Abstracts connection handling so that Curator can emulate it's old, pre 3.0.0
- * handling and update to newer handling.
- */
-public interface ConnectionHandlingPolicy
-{
- /**
- * <p>
- * Prior to 3.0.0, Curator did not try to manage session expiration
- * other than the functionality provided by ZooKeeper itself. Starting with
- * 3.0.0, Curator has the option of attempting to monitor session expiration
- * above what is provided by ZooKeeper. The percentage returned by this method
- * determines how and if Curator will check for session expiration.
- * </p>
- *
- * <p>
- * If this method returns <tt>0</tt>, Curator does not
- * do any additional checking for session expiration.
- * </p>
- *
- * <p>
- * If a positive number is returned, Curator will check for session expiration
- * as follows: when ZooKeeper sends a Disconnect event, Curator will start a timer.
- * If re-connection is not achieved before the elapsed time exceeds the negotiated
- * session time multiplied by the session expiration percent, Curator will simulate
- * a session expiration. Due to timing/network issues, it is <b>not possible</b> for
- * a client to match the server's session timeout with complete accuracy. Thus, the need
- * for a session expiration percentage.
- * </p>
- *
- * @return a percentage from 0 to 100 (0 implied no extra session checking)
- */
- int getSimulatedSessionExpirationPercent();
-
- /**
- * Called by {@link RetryLoop#callWithRetry(CuratorZookeeperClient, Callable)} to do the work
- * of retrying
- *
- * @param client client
- * @param proc the procedure to retry
- * @return result
- * @throws Exception errors
- */
- <T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception;
-
- enum CheckTimeoutsResult
- {
- /**
- * Do nothing
- */
- NOP,
-
- /**
- * handle a new connection string
- */
- NEW_CONNECTION_STRING,
-
- /**
- * reset/recreate the internal ZooKeeper connection
- */
- RESET_CONNECTION,
-
- /**
- * handle a connection timeout
- */
- CONNECTION_TIMEOUT,
-
- /**
- * handle a session timeout
- */
- SESSION_TIMEOUT
- }
-
- /**
- * Check timeouts. NOTE: this method is only called when an attempt to access to the ZooKeeper instances
- * is made and the connection has not completed.
- *
- * @param getNewConnectionString proc to call to check if there is a new connection string. Important: the internal state is cleared after
- * this is called so you MUST handle the new connection string if non null is returned
- * @param connectionStartMs the epoch/ms time that the connection was first initiated
- * @param sessionTimeoutMs the configured/negotiated session timeout in milliseconds
- * @param connectionTimeoutMs the configured connection timeout in milliseconds
- * @return result
- * @throws Exception errors
- */
- CheckTimeoutsResult checkTimeouts(Callable<String> getNewConnectionString, long connectionStartMs, int sessionTimeoutMs, int connectionTimeoutMs) throws Exception;
-}
diff --git a/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java b/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
deleted file mode 100644
index 681fc84..0000000
--- a/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.connection;
-
-import com.google.common.base.Preconditions;
-import org.apache.curator.CuratorZookeeperClient;
-import org.apache.curator.RetryLoop;
-import org.apache.curator.utils.ThreadUtils;
-import java.util.concurrent.Callable;
-
-/**
- * Curator's standard connection handling since 3.0.0
- *
- * @since 3.0.0
- */
-public class StandardConnectionHandlingPolicy implements ConnectionHandlingPolicy
-{
- private final int expirationPercent;
-
- public StandardConnectionHandlingPolicy()
- {
- this(100);
- }
-
- public StandardConnectionHandlingPolicy(int expirationPercent)
- {
- Preconditions.checkArgument((expirationPercent > 0) && (expirationPercent <= 100), "expirationPercent must be > 0 and <= 100");
- this.expirationPercent = expirationPercent;
- }
-
- @Override
- public int getSimulatedSessionExpirationPercent()
- {
- return expirationPercent;
- }
-
- @Override
- public <T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception
- {
- client.internalBlockUntilConnectedOrTimedOut();
-
- T result = null;
- ThreadLocalRetryLoop threadLocalRetryLoop = new ThreadLocalRetryLoop();
- RetryLoop retryLoop = threadLocalRetryLoop.getRetryLoop(client::newRetryLoop);
- try
- {
- while ( retryLoop.shouldContinue() )
- {
- try
- {
- result = proc.call();
- retryLoop.markComplete();
- }
- catch ( Exception e )
- {
- ThreadUtils.checkInterrupted(e);
- retryLoop.takeException(e);
- }
- }
- }
- finally
- {
- threadLocalRetryLoop.release();
- }
-
- return result;
- }
-
- @Override
- public CheckTimeoutsResult checkTimeouts(Callable<String> hasNewConnectionString, long connectionStartMs, int sessionTimeoutMs, int connectionTimeoutMs) throws Exception
- {
- if ( hasNewConnectionString.call() != null )
- {
- return CheckTimeoutsResult.NEW_CONNECTION_STRING;
- }
- return CheckTimeoutsResult.NOP;
- }
-}
diff --git a/curator-client/src/main/java/org/apache/curator/connection/ThreadLocalRetryLoop.java b/curator-client/src/main/java/org/apache/curator/connection/ThreadLocalRetryLoop.java
index 225b967..c7a0c1f 100644
--- a/curator-client/src/main/java/org/apache/curator/connection/ThreadLocalRetryLoop.java
+++ b/curator-client/src/main/java/org/apache/curator/connection/ThreadLocalRetryLoop.java
@@ -142,7 +142,7 @@ public class ThreadLocalRetryLoop
}
/**
- * Must be called to release the retry loop. See {@link org.apache.curator.connection.StandardConnectionHandlingPolicy#callWithRetry(org.apache.curator.CuratorZookeeperClient, java.util.concurrent.Callable)}
+ * Must be called to release the retry loop. See {@link RetryLoop#callWithRetry(org.apache.curator.CuratorZookeeperClient, java.util.concurrent.Callable)}
* for an example usage.
*/
public void release()
diff --git a/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java b/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java
index 8eed2bf..5164bac 100644
--- a/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java
+++ b/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java
@@ -32,7 +32,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import org.apache.curator.connection.StandardConnectionHandlingPolicy;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.utils.EnsurePath;
import org.apache.zookeeper.ZooKeeper;
@@ -52,7 +51,6 @@ public class TestEnsurePath
CuratorZookeeperClient curator = mock(CuratorZookeeperClient.class);
RetryPolicy retryPolicy = new RetryOneTime(1);
RetryLoop retryLoop = new RetryLoopImpl(retryPolicy, null);
- when(curator.getConnectionHandlingPolicy()).thenReturn(new StandardConnectionHandlingPolicy());
when(curator.getZooKeeper()).thenReturn(client);
when(curator.getRetryPolicy()).thenReturn(retryPolicy);
when(curator.newRetryLoop()).thenReturn(retryLoop);
@@ -78,7 +76,6 @@ public class TestEnsurePath
RetryPolicy retryPolicy = new RetryOneTime(1);
RetryLoop retryLoop = new RetryLoopImpl(retryPolicy, null);
final CuratorZookeeperClient curator = mock(CuratorZookeeperClient.class);
- when(curator.getConnectionHandlingPolicy()).thenReturn(new StandardConnectionHandlingPolicy());
when(curator.getZooKeeper()).thenReturn(client);
when(curator.getRetryPolicy()).thenReturn(retryPolicy);
when(curator.newRetryLoop()).thenReturn(retryLoop);
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index 887a2aa..c4efef3 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -19,10 +19,9 @@
package org.apache.curator.framework;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.curator.RetryPolicy;
-import org.apache.curator.connection.ConnectionHandlingPolicy;
-import org.apache.curator.connection.StandardConnectionHandlingPolicy;
import org.apache.curator.ensemble.EnsembleProvider;
import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
import org.apache.curator.framework.api.ACLProvider;
@@ -34,14 +33,12 @@ import org.apache.curator.framework.imps.CuratorTempFrameworkImpl;
import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.curator.framework.imps.GzipCompressionProvider;
import org.apache.curator.framework.schema.SchemaSet;
-import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
import org.apache.curator.framework.state.ConnectionStateListenerManagerFactory;
import org.apache.curator.framework.state.StandardConnectionStateErrorPolicy;
import org.apache.curator.utils.DefaultZookeeperFactory;
import org.apache.curator.utils.ZookeeperFactory;
import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.net.InetAddress;
@@ -148,12 +145,12 @@ public class CuratorFrameworkFactory
private boolean canBeReadOnly = false;
private boolean useContainerParentsIfAvailable = true;
private ConnectionStateErrorPolicy connectionStateErrorPolicy = new StandardConnectionStateErrorPolicy();
- private ConnectionHandlingPolicy connectionHandlingPolicy = new StandardConnectionHandlingPolicy();
private SchemaSet schemaSet = SchemaSet.getDefaultSchemaSet();
private boolean zk34CompatibilityMode = isZK34();
private int waitForShutdownTimeoutMs = 0;
private Executor runSafeService = null;
private ConnectionStateListenerManagerFactory connectionStateListenerManagerFactory = ConnectionStateListenerManagerFactory.standard;
+ private int simulatedSessionExpirationPercent = 100;
/**
* Apply the current values and build a new CuratorFramework
@@ -424,46 +421,41 @@ public class CuratorFrameworkFactory
/**
* <p>
- * Change the connection handling policy. The default policy is {@link StandardConnectionHandlingPolicy}.
+ * Prior to 3.0.0, Curator did not try to manage session expiration
+ * other than the functionality provided by ZooKeeper itself. Starting with
+ * 3.0.0, Curator has the option of attempting to monitor session expiration
+ * above what is provided by ZooKeeper. The percentage set by this method
+ * determines how and if Curator will check for session expiration.
* </p>
+ *
+ * <p>
+ * The default percentage is 100.
+ * </p>
+ *
* <p>
- * <strong>IMPORTANT: </strong> StandardConnectionHandlingPolicy has different behavior than the connection
- * policy handling prior to version 3.0.0.
+ * If it is set to <tt>0</tt>, Curator does not do any additional checking
+ * for session expiration.
* </p>
+ *
* <p>
- * Major differences from the older behavior are:
+ * If a positive number is set, Curator will check for session expiration
+ * as follows: when ZooKeeper sends a Disconnect event, Curator will start a timer.
+ * If re-connection is not achieved before the elapsed time exceeds the negotiated
+ * session time multiplied by the session expiration percent, Curator will simulate
+ * a session expiration. Due to timing/network issues, it is <b>not possible</b> for
+ * a client to match the server's session timeout with complete accuracy. Thus, the need
+ * for a session expiration percentage.
* </p>
- * <ul>
- * <li>
- * Session/connection timeouts are no longer managed by the low-level client. They are managed
- * by the CuratorFramework instance. There should be no noticeable differences.
- * </li>
- * <li>
- * Prior to 3.0.0, each iteration of the retry policy would allow the connection timeout to elapse
- * if the connection hadn't yet succeeded. This meant that the true connection timeout was the configured
- * value times the maximum retries in the retry policy. This longstanding issue has been address.
- * Now, the connection timeout can elapse only once for a single API call.
- * </li>
- * <li>
- * <strong>MOST IMPORTANTLY!</strong> Prior to 3.0.0, {@link ConnectionState#LOST} did not imply
- * a lost session (much to the confusion of users). Now,
- * Curator will set the LOST state only when it believes that the ZooKeeper session
- * has expired. ZooKeeper connections have a session. When the session expires, clients must take appropriate
- * action. In Curator, this is complicated by the fact that Curator internally manages the ZooKeeper
- * connection. Now, Curator will set the LOST state when any of the following occurs:
- * a) ZooKeeper returns a {@link Watcher.Event.KeeperState#Expired} or {@link KeeperException.Code#SESSIONEXPIRED};
- * b) Curator closes the internally managed ZooKeeper instance; c) The session timeout
- * elapses during a network partition.
- * </li>
- * </ul>
*
- * @param connectionHandlingPolicy the policy
+ * @param simulatedSessionExpirationPercent new simulated session expiration percentage
* @return this
- * @since 3.0.0
+ * @since 5.0
*/
- public Builder connectionHandlingPolicy(ConnectionHandlingPolicy connectionHandlingPolicy)
- {
- this.connectionHandlingPolicy = connectionHandlingPolicy;
+ public Builder simulatedSessionExpirationPercent(int simulatedSessionExpirationPercent) {
+ Preconditions.checkArgument(
+ (simulatedSessionExpirationPercent > 0) && (simulatedSessionExpirationPercent <= 100),
+ "simulatedSessionExpirationPercent must be > 0 and <= 100");
+ this.simulatedSessionExpirationPercent = simulatedSessionExpirationPercent;
return this;
}
@@ -581,9 +573,8 @@ public class CuratorFrameworkFactory
return connectionStateErrorPolicy;
}
- public ConnectionHandlingPolicy getConnectionHandlingPolicy()
- {
- return connectionHandlingPolicy;
+ public int getSimulatedSessionExpirationPercent() {
+ return simulatedSessionExpirationPercent;
}
public SchemaSet getSchemaSet()
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index e003bf0..46917c8 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -127,8 +127,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
},
builder.getRetryPolicy(),
- builder.canBeReadOnly(),
- builder.getConnectionHandlingPolicy()
+ builder.canBeReadOnly()
);
internalConnectionHandler = new StandardInternalConnectionHandler();
@@ -139,7 +138,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
namespace = new NamespaceImpl(this, builder.getNamespace());
threadFactory = getThreadFactory(builder);
maxCloseWaitMs = builder.getMaxCloseWaitMs();
- connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerManagerFactory());
+ connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerManagerFactory());
compressionProvider = builder.getCompressionProvider();
aclProvider = builder.getAclProvider();
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
index f9f245a..b836817 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
@@ -18,7 +18,6 @@
*/
package org.apache.curator.framework.state;
-import org.apache.curator.connection.ConnectionHandlingPolicy;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
@@ -50,7 +49,7 @@ public enum ConnectionState
{
return false;
}
- },
+ },
/**
* A suspended, lost, or read-only connection has been re-established
@@ -73,11 +72,6 @@ public enum ConnectionState
* b) Curator closes the internally managed ZooKeeper instance; c) The session timeout
* elapses during a network partition.
* </p>
- *
- * <p>
- * NOTE: see {@link CuratorFrameworkFactory.Builder#connectionHandlingPolicy(ConnectionHandlingPolicy)} for an important note about a
- * change in meaning to LOST since 3.0.0
- * </p>
*/
LOST
{
@@ -103,10 +97,10 @@ public enum ConnectionState
}
;
-
+
/**
* Check if this state indicates that Curator has a connection to ZooKeeper
- *
+ *
* @return True if connected, false otherwise
*/
public abstract boolean isConnected();
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java b/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java
index c929b41..ff48468 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java
@@ -18,7 +18,6 @@
*/
package org.apache.curator.framework.state;
-import org.apache.curator.connection.StandardConnectionHandlingPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
@@ -41,7 +40,7 @@ public class TestConnectionStateManager extends BaseClassForTests {
.sessionTimeoutMs(timing.session())
.retryPolicy(new RetryOneTime(1))
.connectionStateErrorPolicy(new SessionConnectionStateErrorPolicy())
- .connectionHandlingPolicy(new StandardConnectionHandlingPolicy(30))
+ .simulatedSessionExpirationPercent(30)
.build();
// we should get LOST around 30% of a session plus a little "slop" for processing, etc.