You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2015/08/22 00:36:04 UTC

[1/3] curator git commit: First pass at new (optional) definition of state LOST

Repository: curator
Updated Branches:
  refs/heads/CURATOR-247 [created] 62f3c33cd


First pass at new (optional) definition of state LOST


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/344634ac
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/344634ac
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/344634ac

Branch: refs/heads/CURATOR-247
Commit: 344634ac6e34e61bc0cc7b41923a1df4089c7948
Parents: 7d97259
Author: randgalt <ra...@apache.org>
Authored: Fri Aug 21 12:10:24 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Aug 21 12:10:24 2015 -0500

----------------------------------------------------------------------
 .../framework/CuratorFrameworkFactory.java      | 19 +++++
 .../framework/api/UnhandledErrorListener.java   |  4 +-
 .../framework/imps/CuratorFrameworkImpl.java    | 10 ++-
 .../framework/state/ConnectionState.java        | 35 +++++++--
 .../framework/state/ConnectionStateManager.java | 75 +++++++++++++++-----
 5 files changed, 113 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/344634ac/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index dcb2ee6..6209b06 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -31,6 +31,7 @@ import org.apache.curator.framework.imps.CuratorFrameworkImpl;
 import org.apache.curator.framework.imps.CuratorTempFrameworkImpl;
 import org.apache.curator.framework.imps.DefaultACLProvider;
 import org.apache.curator.framework.imps.GzipCompressionProvider;
+import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.utils.DefaultZookeeperFactory;
 import org.apache.curator.utils.ZookeeperFactory;
 import org.apache.zookeeper.CreateMode;
@@ -116,6 +117,7 @@ public class CuratorFrameworkFactory
         private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER;
         private boolean canBeReadOnly = false;
         private boolean useContainerParentsIfAvailable = true;
+        private boolean enableSessionExpiredState = false;
 
         /**
          * Apply the current values and build a new CuratorFramework
@@ -343,6 +345,18 @@ public class CuratorFrameworkFactory
             return this;
         }
 
+        /**
+         * Changes the meaning of {@link ConnectionState#LOST} from it's pre Curator 3.0.0 meaning
+         * to a true lost session state. See the {@link ConnectionState#LOST} doc for details.
+         *
+         * @return this
+         */
+        public Builder enableSessionExpiredState()
+        {
+            this.enableSessionExpiredState = true;
+            return this;
+        }
+
         public ACLProvider getAclProvider()
         {
             return aclProvider;
@@ -398,6 +412,11 @@ public class CuratorFrameworkFactory
             return useContainerParentsIfAvailable;
         }
 
+        public boolean getEnableSessionExpiredState()
+        {
+            return enableSessionExpiredState;
+        }
+
         @Deprecated
         public String getAuthScheme()
         {

http://git-wip-us.apache.org/repos/asf/curator/blob/344634ac/curator-framework/src/main/java/org/apache/curator/framework/api/UnhandledErrorListener.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/UnhandledErrorListener.java b/curator-framework/src/main/java/org/apache/curator/framework/api/UnhandledErrorListener.java
index b463af2..3721d4b 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/UnhandledErrorListener.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/UnhandledErrorListener.java
@@ -24,9 +24,7 @@ import org.apache.curator.framework.state.ConnectionStateListener;
 public interface UnhandledErrorListener
 {
     /**
-     * Called when an exception is caught in a background thread, handler, etc. Before this
-     * listener is called, the error will have been logged and a {@link ConnectionState#LOST}
-     * event will have been queued for any {@link ConnectionStateListener}s.
+     * Called when an exception is caught in a background thread, handler, etc.
      *
      * @param message Source message
      * @param e exception

http://git-wip-us.apache.org/repos/asf/curator/blob/344634ac/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 41bb7cd..c64fb8f 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -83,6 +83,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final NamespaceFacadeCache namespaceFacadeCache;
     private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this);
     private final boolean useContainerParentsIfAvailable;
+    private final boolean enableSessionExpiredState;
 
     private volatile ExecutorService executorService;
     private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
@@ -119,11 +120,12 @@ public class CuratorFrameworkImpl implements CuratorFramework
         namespace = new NamespaceImpl(this, builder.getNamespace());
         threadFactory = getThreadFactory(builder);
         maxCloseWaitMs = builder.getMaxCloseWaitMs();
-        connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory());
+        connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getEnableSessionExpiredState(), builder.getSessionTimeoutMs());
         compressionProvider = builder.getCompressionProvider();
         aclProvider = builder.getAclProvider();
         state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
         useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable();
+        enableSessionExpiredState = builder.getEnableSessionExpiredState();
 
         byte[] builderDefaultData = builder.getDefaultData();
         defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
@@ -197,6 +199,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         state = parent.state;
         authInfos = parent.authInfos;
         useContainerParentsIfAvailable = parent.useContainerParentsIfAvailable;
+        enableSessionExpiredState = parent.enableSessionExpiredState;
     }
 
     @Override
@@ -722,7 +725,10 @@ public class CuratorFrameworkImpl implements CuratorFramework
             return;
         }
 
-        doSyncForSuspendedConnection(client.getInstanceIndex());
+        if ( !enableSessionExpiredState )
+        {
+            doSyncForSuspendedConnection(client.getInstanceIndex());
+        }
     }
 
     private void doSyncForSuspendedConnection(final long instanceIndex)

http://git-wip-us.apache.org/repos/asf/curator/blob/344634ac/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
index 3ca1d66..49d0044 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
@@ -19,6 +19,8 @@
 package org.apache.curator.framework.state;
 
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
 
 /**
  * Represents state changes in the connection to ZK
@@ -39,8 +41,7 @@ public enum ConnectionState
 
     /**
      * There has been a loss of connection. Leaders, locks, etc. should suspend
-     * until the connection is re-established. If the connection times-out you will
-     * receive a {@link #LOST} notice
+     * until the connection is re-established.
      */
     SUSPENDED
     {
@@ -62,9 +63,29 @@ public enum ConnectionState
     },
 
     /**
-     * The connection is confirmed to be lost. Close any locks, leaders, etc. and
-     * attempt to re-create them. NOTE: it is possible to get a {@link #RECONNECTED}
-     * state after this but you should still consider any locks, etc. as dirty/unstable
+     * <p>
+     *     NOTE: the meaning of this state depends on how your CuratorFramework instance
+     *     is created.
+     * </p>
+     *
+     * <p>
+     *     The default meaning of LOST (and the only meaning prior to Curator 3.0.0) is:
+     *     The connection is confirmed to be lost (i.e. the retry policy has given up). Close any locks, leaders, etc. and
+     *     attempt to re-create them. NOTE: it is possible to get a {@link #RECONNECTED}
+     *     state after this but you should still consider any locks, etc. as dirty/unstable
+     * </p>
+     *
+     * <p>
+     *     <strong>Since 3.0.0</strong>, you can alter the meaning of LOST by calling
+     *     {@link CuratorFrameworkFactory.Builder#enableSessionExpiredState()}. In this mode,
+     *     Curator will set the LOST state only when it believes that the ZooKeeper session
+     *     has expired. ZooKeeper connections have a session. When the session expires, clients must take appropriate
+     *     action. In Curator, this is complicated by the fact that Curator internally manages the ZooKeeper
+     *     connection. In this mode, Curator will set the LOST state when any of the following occurs:
+     *     a) ZooKeeper returns a {@link Watcher.Event.KeeperState#Expired} or {@link KeeperException.Code#SESSIONEXPIRED};
+     *     b) Curator closes the internally managed ZooKeeper instance; c) The configured session timeout
+     *     elapses during a network partition.
+     * </p>
      */
     LOST
     {
@@ -87,7 +108,9 @@ public enum ConnectionState
         {
             return true;
         }
-    };
+    }
+
+    ;
     
     /**
      * Check if this state indicates that Curator has a connection to ZooKeeper

http://git-wip-us.apache.org/repos/asf/curator/blob/344634ac/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 67ff13d..c0feb84 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -35,6 +35,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -65,6 +66,8 @@ public class ConnectionStateManager implements Closeable
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
     private final CuratorFramework client;
+    private final boolean enableSessionExpiredState;
+    private final int sessionTimeoutMs;
     private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>();
     private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false);
     private final ExecutorService service;
@@ -72,6 +75,8 @@ public class ConnectionStateManager implements Closeable
 
     // guarded by sync
     private ConnectionState currentConnectionState;
+    // guarded by sync
+    private long startOfSuspendedEpoch = 0;
 
     private enum State
     {
@@ -83,10 +88,14 @@ public class ConnectionStateManager implements Closeable
     /**
      * @param client        the client
      * @param threadFactory thread factory to use or null for a default
+     * @param enableSessionExpiredState if true, applies new meaning for LOST as described here: {@link ConnectionState#LOST}
+     * @param sessionTimeoutMs the ZK session timeout in milliseconds
      */
-    public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory)
+    public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, boolean enableSessionExpiredState, int sessionTimeoutMs)
     {
         this.client = client;
+        this.enableSessionExpiredState = enableSessionExpiredState;
+        this.sessionTimeoutMs = sessionTimeoutMs;
         if ( threadFactory == null )
         {
             threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager");
@@ -137,7 +146,7 @@ public class ConnectionStateManager implements Closeable
 
     /**
      * Change to {@link ConnectionState#SUSPENDED} only if not already suspended and not lost
-     * 
+     *
      * @return true if connection is set to SUSPENDED
      */
     public synchronized boolean setToSuspended()
@@ -152,7 +161,7 @@ public class ConnectionStateManager implements Closeable
             return false;
         }
 
-        currentConnectionState = ConnectionState.SUSPENDED;
+        setCurrentConnectionState(ConnectionState.SUSPENDED);
         postState(ConnectionState.SUSPENDED);
 
         return true;
@@ -177,7 +186,7 @@ public class ConnectionStateManager implements Closeable
         {
             return false;
         }
-        currentConnectionState = newConnectionState;
+        setCurrentConnectionState(newConnectionState);
 
         ConnectionState localState = newConnectionState;
         boolean isNegativeMessage = ((newConnectionState == ConnectionState.LOST) || (newConnectionState == ConnectionState.SUSPENDED) || (newConnectionState == ConnectionState.READ_ONLY));
@@ -242,25 +251,34 @@ public class ConnectionStateManager implements Closeable
         {
             while ( !Thread.currentThread().isInterrupted() )
             {
-                final ConnectionState newState = eventQueue.take();
-
-                if ( listeners.size() == 0 )
+                final ConnectionState newState = eventQueue.poll(sessionTimeoutMs, TimeUnit.MILLISECONDS);
+                if ( newState != null )
                 {
-                    log.warn("There are no ConnectionStateListeners registered.");
-                }
+                    if ( listeners.size() == 0 )
+                    {
+                        log.warn("There are no ConnectionStateListeners registered.");
+                    }
 
-                listeners.forEach
-                    (
-                        new Function<ConnectionStateListener, Void>()
-                        {
-                            @Override
-                            public Void apply(ConnectionStateListener listener)
+                    listeners.forEach
+                        (
+                            new Function<ConnectionStateListener, Void>()
                             {
-                                listener.stateChanged(client, newState);
-                                return null;
+                                @Override
+                                public Void apply(ConnectionStateListener listener)
+                                {
+                                    listener.stateChanged(client, newState);
+                                    return null;
+                                }
                             }
-                        }
-                    );
+                        );
+                }
+                else if ( enableSessionExpiredState )
+                {
+                    synchronized(this)
+                    {
+                        checkSessionExpiration();
+                    }
+                }
             }
         }
         catch ( InterruptedException e )
@@ -268,4 +286,23 @@ public class ConnectionStateManager implements Closeable
             Thread.currentThread().interrupt();
         }
     }
+
+    private void checkSessionExpiration()
+    {
+        if ( (currentConnectionState == ConnectionState.SUSPENDED) && (startOfSuspendedEpoch != 0) )
+        {
+            long elapsedMs = System.currentTimeMillis() - startOfSuspendedEpoch;
+            if ( elapsedMs >= sessionTimeoutMs )
+            {
+                log.info(String.format("Session timeout has elapsed while SUSPENDED. Posting LOST event. Elapsed ms: %d", elapsedMs));
+                addStateChange(ConnectionState.LOST);
+            }
+        }
+    }
+
+    private void setCurrentConnectionState(ConnectionState newConnectionState)
+    {
+        currentConnectionState = newConnectionState;
+        startOfSuspendedEpoch = (currentConnectionState == ConnectionState.SUSPENDED) ? System.currentTimeMillis() : 0;
+    }
 }


[3/3] curator git commit: Continued work on new LOST behavior. Added some tests. To get correct behavior it's necessary to not retry connection failures. Retrying connection failures was never a good idea and here's a good opportunity to fix it as this r

Posted by ra...@apache.org.
Continued work on new LOST behavior. Added some tests. To get correct behavior it's necessary to not retry connection failures. Retrying connection failures was never a good idea and here's a good opportunity to fix it as this requires client action to enable


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/62f3c33c
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/62f3c33c
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/62f3c33c

Branch: refs/heads/CURATOR-247
Commit: 62f3c33cdb556eccf6fe1cc87ee74b3458431777
Parents: 2343daf
Author: randgalt <ra...@apache.org>
Authored: Fri Aug 21 17:35:44 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Aug 21 17:35:44 2015 -0500

----------------------------------------------------------------------
 .../org/apache/curator/ConnectionState.java     | 24 ++---
 .../apache/curator/CuratorZookeeperClient.java  | 58 +++++++-----
 .../main/java/org/apache/curator/RetryLoop.java | 12 +++
 .../framework/CuratorFrameworkFactory.java      |  2 +-
 .../framework/imps/CuratorFrameworkImpl.java    | 43 ++++++++-
 .../framework/state/ConnectionState.java        |  5 +
 .../framework/state/ConnectionStateManager.java | 13 ++-
 .../framework/imps/TestBlockUntilConnected.java |  1 +
 .../imps/TestEnabledSessionExpiredState.java    | 99 ++++++++++++++++++++
 .../apache/curator/test/BaseClassForTests.java  | 37 +++++++-
 .../java/org/apache/curator/test/Timing.java    |  2 +-
 11 files changed, 253 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-client/src/main/java/org/apache/curator/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ConnectionState.java b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
index d3900a1..1dfdbef 100644
--- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
+++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
@@ -171,6 +171,18 @@ class ConnectionState implements Watcher, Closeable
         return ensembleProvider;
     }
 
+    synchronized void reset() throws Exception
+    {
+        log.debug("reset");
+
+        instanceIndex.incrementAndGet();
+
+        isConnected.set(false);
+        connectionStartMs = System.currentTimeMillis();
+        zooKeeper.closeAndReset();
+        zooKeeper.getZooKeeper();   // initiate connection
+    }
+
     private synchronized void checkTimeouts() throws Exception
     {
         int minTimeout = Math.min(sessionTimeoutMs, connectionTimeoutMs);
@@ -206,18 +218,6 @@ class ConnectionState implements Watcher, Closeable
         }
     }
 
-    private synchronized void reset() throws Exception
-    {
-        log.debug("reset");
-
-        instanceIndex.incrementAndGet();
-
-        isConnected.set(false);
-        connectionStartMs = System.currentTimeMillis();
-        zooKeeper.closeAndReset();
-        zooKeeper.getZooKeeper();   // initiate connection
-    }
-
     private boolean checkState(Event.KeeperState state, boolean wasConnected)
     {
         boolean isConnected = wasConnected;

http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
index 09b28b2..fbb2f4c 100644
--- a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
+++ b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator;
 
 import com.google.common.base.Preconditions;
@@ -43,12 +44,12 @@ import java.util.concurrent.atomic.AtomicReference;
 @SuppressWarnings("UnusedDeclaration")
 public class CuratorZookeeperClient implements Closeable
 {
-    private final Logger                            log = LoggerFactory.getLogger(getClass());
-    private final ConnectionState                   state;
-    private final AtomicReference<RetryPolicy>      retryPolicy = new AtomicReference<RetryPolicy>();
-    private final int                               connectionTimeoutMs;
-    private final AtomicBoolean                     started = new AtomicBoolean(false);
-    private final AtomicReference<TracerDriver>     tracer = new AtomicReference<TracerDriver>(new DefaultTracerDriver());
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final ConnectionState state;
+    private final AtomicReference<RetryPolicy> retryPolicy = new AtomicReference<RetryPolicy>();
+    private final int connectionTimeoutMs;
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private final AtomicReference<TracerDriver> tracer = new AtomicReference<TracerDriver>(new DefaultTracerDriver());
 
     /**
      *
@@ -159,7 +160,7 @@ public class CuratorZookeeperClient implements Closeable
         Preconditions.checkState(started.get(), "Client is not started");
 
         log.debug("blockUntilConnectedOrTimedOut() start");
-        TimeTrace       trace = startTracer("blockUntilConnectedOrTimedOut");
+        TimeTrace trace = startTracer("blockUntilConnectedOrTimedOut");
 
         internalBlockUntilConnectedOrTimedOut();
 
@@ -176,7 +177,7 @@ public class CuratorZookeeperClient implements Closeable
      *
      * @throws IOException errors
      */
-    public void     start() throws Exception
+    public void start() throws Exception
     {
         log.debug("Starting");
 
@@ -192,7 +193,7 @@ public class CuratorZookeeperClient implements Closeable
     /**
      * Close the client
      */
-    public void     close()
+    public void close()
     {
         log.debug("Closing");
 
@@ -212,7 +213,7 @@ public class CuratorZookeeperClient implements Closeable
      *
      * @param policy new policy
      */
-    public void     setRetryPolicy(RetryPolicy policy)
+    public void setRetryPolicy(RetryPolicy policy)
     {
         Preconditions.checkNotNull(policy, "policy cannot be null");
 
@@ -234,7 +235,7 @@ public class CuratorZookeeperClient implements Closeable
      * @param name name of the event
      * @return the new tracer ({@link TimeTrace#commit()} must be called)
      */
-    public TimeTrace          startTracer(String name)
+    public TimeTrace startTracer(String name)
     {
         return new TimeTrace(name, tracer.get());
     }
@@ -244,7 +245,7 @@ public class CuratorZookeeperClient implements Closeable
      *
      * @return tracing driver
      */
-    public TracerDriver       getTracerDriver()
+    public TracerDriver getTracerDriver()
     {
         return tracer.get();
     }
@@ -254,7 +255,7 @@ public class CuratorZookeeperClient implements Closeable
      *
      * @param tracer new tracing driver
      */
-    public void               setTracerDriver(TracerDriver tracer)
+    public void setTracerDriver(TracerDriver tracer)
     {
         this.tracer.set(tracer);
     }
@@ -265,7 +266,7 @@ public class CuratorZookeeperClient implements Closeable
      *
      * @return connection string
      */
-    public String             getCurrentConnectionString()
+    public String getCurrentConnectionString()
     {
         return state.getEnsembleProvider().getConnectionString();
     }
@@ -281,6 +282,16 @@ public class CuratorZookeeperClient implements Closeable
     }
 
     /**
+     * For internal use only - reset the internally managed ZK handle
+     *
+     * @throws Exception errors
+     */
+    public void reset() throws Exception
+    {
+        state.reset();
+    }
+
+    /**
      * Every time a new {@link ZooKeeper} instance is allocated, the "instance index"
      * is incremented.
      *
@@ -291,22 +302,27 @@ public class CuratorZookeeperClient implements Closeable
         return state.getInstanceIndex();
     }
 
-    void        addParentWatcher(Watcher watcher)
+    public boolean retryConnectionTimeouts()
+    {
+        return true;
+    }
+
+    void addParentWatcher(Watcher watcher)
     {
         state.addParentWatcher(watcher);
     }
 
-    void        removeParentWatcher(Watcher watcher)
+    void removeParentWatcher(Watcher watcher)
     {
         state.removeParentWatcher(watcher);
     }
 
     void internalBlockUntilConnectedOrTimedOut() throws InterruptedException
     {
-        long            waitTimeMs = connectionTimeoutMs;
+        long waitTimeMs = connectionTimeoutMs;
         while ( !state.isConnected() && (waitTimeMs > 0) )
         {
-            final CountDownLatch            latch = new CountDownLatch(1);
+            final CountDownLatch latch = new CountDownLatch(1);
             Watcher tempWatcher = new Watcher()
             {
                 @Override
@@ -315,9 +331,9 @@ public class CuratorZookeeperClient implements Closeable
                     latch.countDown();
                 }
             };
-            
+
             state.addParentWatcher(tempWatcher);
-            long        startTimeMs = System.currentTimeMillis();
+            long startTimeMs = System.currentTimeMillis();
             try
             {
                 latch.await(1, TimeUnit.SECONDS);
@@ -326,7 +342,7 @@ public class CuratorZookeeperClient implements Closeable
             {
                 state.removeParentWatcher(tempWatcher);
             }
-            long        elapsed = Math.max(1, System.currentTimeMillis() - startTimeMs);
+            long elapsed = Math.max(1, System.currentTimeMillis() - startTimeMs);
             waitTimeMs -= elapsed;
         }
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-client/src/main/java/org/apache/curator/RetryLoop.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/RetryLoop.java b/curator-client/src/main/java/org/apache/curator/RetryLoop.java
index 065ebef..8d77cf7 100644
--- a/curator-client/src/main/java/org/apache/curator/RetryLoop.java
+++ b/curator-client/src/main/java/org/apache/curator/RetryLoop.java
@@ -98,11 +98,17 @@ public class RetryLoop
     {
         T               result = null;
         RetryLoop       retryLoop = client.newRetryLoop();
+        boolean         connectionFailed = false;
         while ( retryLoop.shouldContinue() )
         {
             try
             {
                 client.internalBlockUntilConnectedOrTimedOut();
+                if ( !client.isConnected() && !client.retryConnectionTimeouts() )
+                {
+                    connectionFailed = true;
+                    break;
+                }
                 
                 result = proc.call();
                 retryLoop.markComplete();
@@ -112,6 +118,12 @@ public class RetryLoop
                 retryLoop.takeException(e);
             }
         }
+
+        if ( connectionFailed )
+        {
+            throw new KeeperException.ConnectionLossException();
+        }
+
         return result;
     }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index 6209b06..fad4fc2 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -117,7 +117,7 @@ public class CuratorFrameworkFactory
         private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER;
         private boolean canBeReadOnly = false;
         private boolean useContainerParentsIfAvailable = true;
-        private boolean enableSessionExpiredState = false;
+        private boolean enableSessionExpiredState = Boolean.getBoolean("curator-enable-session-expired-state");
 
         /**
          * Apply the current values and build a new CuratorFramework

http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index c64fb8f..c359fdc 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -61,6 +61,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class CuratorFrameworkImpl implements CuratorFramework
@@ -84,6 +85,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this);
     private final boolean useContainerParentsIfAvailable;
     private final boolean enableSessionExpiredState;
+    private final AtomicLong currentInstanceIndex = new AtomicLong(-1);
 
     private volatile ExecutorService executorService;
     private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
@@ -104,15 +106,31 @@ public class CuratorFrameworkImpl implements CuratorFramework
     public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
     {
         ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
-        this.client = new CuratorZookeeperClient(localZookeeperFactory, builder.getEnsembleProvider(), builder.getSessionTimeoutMs(), builder.getConnectionTimeoutMs(), new Watcher()
+        this.client = new CuratorZookeeperClient
+        (
+            localZookeeperFactory,
+            builder.getEnsembleProvider(),
+            builder.getSessionTimeoutMs(),
+            builder.getConnectionTimeoutMs(),
+            new Watcher()
+            {
+                @Override
+                public void process(WatchedEvent watchedEvent)
+                {
+                    CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
+                    processEvent(event);
+                }
+            },
+            builder.getRetryPolicy(),
+            builder.canBeReadOnly()
+        )
         {
             @Override
-            public void process(WatchedEvent watchedEvent)
+            public boolean retryConnectionTimeouts()
             {
-                CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
-                processEvent(event);
+                return !enableSessionExpiredState;
             }
-        }, builder.getRetryPolicy(), builder.canBeReadOnly());
+        };
 
         listeners = new ListenerContainer<CuratorListener>();
         unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
@@ -675,14 +693,29 @@ public class CuratorFrameworkImpl implements CuratorFramework
         }
         else if ( state == Watcher.Event.KeeperState.SyncConnected )
         {
+            checkNewConnection();
             connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
         }
         else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
         {
+            checkNewConnection();
             connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
         }
     }
 
+    private void checkNewConnection()
+    {
+        if ( enableSessionExpiredState )
+        {
+            long instanceIndex = client.getInstanceIndex();
+            long newInstanceIndex = currentInstanceIndex.getAndSet(instanceIndex);
+            if ( (newInstanceIndex >= 0) && (instanceIndex != newInstanceIndex) )   // currentInstanceIndex is initially -1 - ignore this
+            {
+                connectionStateManager.addStateChange(ConnectionState.LOST);
+            }
+        }
+    }
+
     Watcher.Event.KeeperState codeToState(KeeperException.Code code)
     {
         switch ( code )

http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
index 49d0044..79f3b62 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
@@ -86,6 +86,11 @@ public enum ConnectionState
      *     b) Curator closes the internally managed ZooKeeper instance; c) The configured session timeout
      *     elapses during a network partition.
      * </p>
+     *
+     * <p>
+     *     NOTE: the new behavior for the LOST state can also be enabled via the command line
+     *     property "curator-enable-session-expired-state" (e.g. -Dcurator-enable-session-expired-state=true)
+     * </p>
      */
     LOST
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index c0feb84..553faac 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -251,7 +251,8 @@ public class ConnectionStateManager implements Closeable
         {
             while ( !Thread.currentThread().isInterrupted() )
             {
-                final ConnectionState newState = eventQueue.poll(sessionTimeoutMs, TimeUnit.MILLISECONDS);
+                int pollMaxMs = (sessionTimeoutMs * 2) / 3; // 2/3 of session timeout
+                final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
                 if ( newState != null )
                 {
                     if ( listeners.size() == 0 )
@@ -294,7 +295,15 @@ public class ConnectionStateManager implements Closeable
             long elapsedMs = System.currentTimeMillis() - startOfSuspendedEpoch;
             if ( elapsedMs >= sessionTimeoutMs )
             {
-                log.info(String.format("Session timeout has elapsed while SUSPENDED. Posting LOST event. Elapsed ms: %d", elapsedMs));
+                log.info(String.format("Session timeout has elapsed while SUSPENDED. Posting LOST event and resetting the connection. Elapsed ms: %d", elapsedMs));
+                try
+                {
+                    client.getZookeeperClient().reset();
+                }
+                catch ( Exception e )
+                {
+                    log.error("Could not reset the connection", e);
+                }
                 addStateChange(ConnectionState.LOST);
             }
         }

http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
index f649afb..eeec797 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
@@ -114,6 +114,7 @@ public class TestBlockUntilConnected extends BaseClassForTests
         Timing timing = new Timing();
         CuratorFramework client = CuratorFrameworkFactory.builder().
             connectString(server.getConnectString()).
+            sessionTimeoutMs(timing.session()).
             retryPolicy(new RetryOneTime(1)).
             build();
 

http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
new file mode 100644
index 0000000..030a292
--- /dev/null
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
@@ -0,0 +1,99 @@
+package org.apache.curator.framework.imps;
+
+import com.google.common.collect.Queues;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.KillSession;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.KeeperException;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class TestEnabledSessionExpiredState extends BaseClassForTests
+{
+    private final Timing timing = new Timing();
+
+    private CuratorFramework client;
+    private BlockingQueue<ConnectionState> states;
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception
+    {
+        super.setup();
+
+        client = CuratorFrameworkFactory.builder()
+            .connectString(server.getConnectString())
+            .connectionTimeoutMs(timing.connection())
+            .sessionTimeoutMs(timing.session())
+            .enableSessionExpiredState()
+            .retryPolicy(new RetryOneTime(1))
+            .build();
+        client.start();
+
+        states = Queues.newLinkedBlockingQueue();
+        ConnectionStateListener listener = new ConnectionStateListener()
+        {
+            @Override
+            public void stateChanged(CuratorFramework client, ConnectionState newState)
+            {
+                states.add(newState);
+            }
+        };
+        client.getConnectionStateListenable().addListener(listener);
+    }
+
+    @AfterMethod
+    @Override
+    public void teardown() throws Exception
+    {
+        CloseableUtils.closeQuietly(client);
+
+        super.teardown();
+    }
+
+    @Test
+    public void testKillSession() throws Exception
+    {
+        Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED);
+
+        KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
+
+        Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
+        Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST);
+        Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED);
+    }
+
+    @Test
+    public void testReconnectWithoutExpiration() throws Exception
+    {
+        Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED);
+        server.stop();
+        try
+        {
+            client.checkExists().forPath("/");  // any API call that will invoke the retry policy, etc.
+        }
+        catch ( KeeperException.ConnectionLossException ignore )
+        {
+        }
+        Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
+        server.restart();
+        client.checkExists().forPath("/");
+        Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED);
+    }
+
+    @Override
+    protected boolean enabledSessionExpiredStateAware()
+    {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
----------------------------------------------------------------------
diff --git a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
index 6ef3bb0..c9f3524 100644
--- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
+++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
@@ -16,10 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.test;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.IInvokedMethod;
+import org.testng.IInvokedMethodListener;
 import org.testng.IRetryAnalyzer;
 import org.testng.ITestContext;
+import org.testng.ITestNGListener;
 import org.testng.ITestNGMethod;
 import org.testng.ITestResult;
 import org.testng.annotations.AfterMethod;
@@ -32,11 +38,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class BaseClassForTests
 {
     protected TestingServer server;
+    private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private static final int    RETRY_WAIT_MS = 5000;
+    private static final int RETRY_WAIT_MS = 5000;
     private static final String INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES;
     private static final String INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND;
     private static final String INTERNAL_RETRY_FAILED_TESTS;
+
     static
     {
         String logConnectionIssues = null;
@@ -70,8 +78,30 @@ public class BaseClassForTests
     @BeforeSuite(alwaysRun = true)
     public void beforeSuite(ITestContext context)
     {
+        if ( !enabledSessionExpiredStateAware() )
+        {
+            ITestNGListener listener = new IInvokedMethodListener()
+            {
+                @Override
+                public void beforeInvocation(IInvokedMethod method, ITestResult testResult)
+                {
+                    int invocationCount = method.getTestMethod().getCurrentInvocationCount();
+                    System.setProperty("curator-enable-session-expired-state", Boolean.toString(invocationCount == 1));
+                    log.info("curator-enable-session-expired-state: " + Boolean.toString(invocationCount == 1));
+                }
+
+                @Override
+                public void afterInvocation(IInvokedMethod method, ITestResult testResult)
+                {
+                    System.clearProperty("curator-enable-session-expired-state");
+                }
+            };
+            context.getSuite().addListener(listener);
+        }
+
         for ( ITestNGMethod method : context.getAllTestMethods() )
         {
+            method.setInvocationCount(enabledSessionExpiredStateAware() ? 1 : 2);
             method.setRetryAnalyzer(new RetryTest());
         }
     }
@@ -117,6 +147,11 @@ public class BaseClassForTests
         }
     }
 
+    protected boolean enabledSessionExpiredStateAware()
+    {
+        return false;
+    }
+
     private static class RetryTest implements IRetryAnalyzer
     {
         private final AtomicBoolean hasBeenRetried = new AtomicBoolean(!Boolean.getBoolean(INTERNAL_RETRY_FAILED_TESTS));

http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-test/src/main/java/org/apache/curator/test/Timing.java
----------------------------------------------------------------------
diff --git a/curator-test/src/main/java/org/apache/curator/test/Timing.java b/curator-test/src/main/java/org/apache/curator/test/Timing.java
index f29b1c5..753d62d 100644
--- a/curator-test/src/main/java/org/apache/curator/test/Timing.java
+++ b/curator-test/src/main/java/org/apache/curator/test/Timing.java
@@ -34,7 +34,7 @@ public class Timing
 
     private static final int DEFAULT_SECONDS = 10;
     private static final int DEFAULT_WAITING_MULTIPLE = 5;
-    private static final double SESSION_MULTIPLE = .25;
+    private static final double SESSION_MULTIPLE = 1.5;
 
     /**
      * Use the default base time


[2/3] curator git commit: Merge branch 'CURATOR-3.0' into CURATOR-247

Posted by ra...@apache.org.
Merge branch 'CURATOR-3.0' into CURATOR-247


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2343daf2
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2343daf2
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2343daf2

Branch: refs/heads/CURATOR-247
Commit: 2343daf29388566b0efa0b0a2ad21574fb534a27
Parents: 344634a 2fc9e37
Author: randgalt <ra...@apache.org>
Authored: Fri Aug 21 15:11:59 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Aug 21 15:11:59 2015 -0500

----------------------------------------------------------------------
 .../recipes/queue/DistributedIdQueue.java       | 28 +++++++++++++++++++-
 .../recipes/queue/DistributedQueue.java         |  7 ++++-
 .../recipes/queue/TestDistributedIdQueue.java   |  2 +-
 .../recipes/queue/TestDistributedQueue.java     |  2 +-
 4 files changed, 35 insertions(+), 4 deletions(-)
----------------------------------------------------------------------