You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2015/10/09 02:12:18 UTC

[1/7] curator git commit: 1. EnsembleTracker should always be on, it now is 2. Removed DynamicEnsembleProvider. This should not be optional. EnsembleTracker now always publishes config changes which will end up calling ZooKeeper.updateServerList() 3. Tes

Repository: curator
Updated Branches:
  refs/heads/CURATOR-3.0 0fe4d969f -> 779ff5ea4


1. EnsembleTracker should always be on, it now is
2. Removed DynamicEnsembleProvider. This should not be optional. EnsembleTracker now always publishes config changes which will end up calling ZooKeeper.updateServerList()
3. Testing


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/26364c61
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/26364c61
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/26364c61

Branch: refs/heads/CURATOR-3.0
Commit: 26364c6186fc7c09a9462557b1ca791e9aa70006
Parents: a7076bc
Author: randgalt <ra...@apache.org>
Authored: Sat Sep 26 13:13:02 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Sep 26 13:13:02 2015 -0500

----------------------------------------------------------------------
 .../org/apache/curator/ConnectionState.java     |  32 +++-
 .../java/org/apache/curator/HandleHolder.java   |   4 +-
 .../ClassicConnectionHandlingPolicy.java        |   4 +-
 .../connection/ConnectionHandlingPolicy.java    |   6 +-
 .../StandardConnectionHandlingPolicy.java       |   4 +-
 .../curator/ensemble/EnsembleListener.java      |  24 ---
 .../curator/ensemble/EnsembleProvider.java      |   2 +
 .../dynamic/DynamicEnsembleProvider.java        |  61 ------
 .../exhibitor/ExhibitorEnsembleProvider.java    |   6 +
 .../ensemble/fixed/FixedEnsembleProvider.java   |  13 +-
 .../framework/ensemble/EnsembleTracker.java     | 191 -------------------
 .../framework/imps/CuratorFrameworkImpl.java    |   7 +
 .../curator/framework/imps/EnsembleTracker.java | 150 +++++++++++++++
 .../framework/imps/TestReconfiguration.java     |  57 +++++-
 .../locks/TestInterProcessSemaphoreCluster.java |   5 +
 15 files changed, 263 insertions(+), 303 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-client/src/main/java/org/apache/curator/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ConnectionState.java b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
index eea2ce0..4c1e6ad 100644
--- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
+++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
@@ -199,12 +199,14 @@ class ConnectionState implements Watcher, Closeable
 
     private synchronized void checkTimeouts() throws Exception
     {
-        Callable<Boolean> hasNewConnectionString  = new Callable<Boolean>()
+        final AtomicReference<String> newConnectionString = new AtomicReference<>();
+        Callable<String> hasNewConnectionString = new Callable<String>()
         {
             @Override
-            public Boolean call()
+            public String call()
             {
-                return zooKeeper.hasNewConnectionString();
+                newConnectionString.set(zooKeeper.getNewConnectionString());
+                return newConnectionString.get();
             }
         };
         int lastNegotiatedSessionTimeoutMs = getLastNegotiatedSessionTimeoutMs();
@@ -220,7 +222,7 @@ class ConnectionState implements Watcher, Closeable
 
             case NEW_CONNECTION_STRING:
             {
-                handleNewConnectionString();
+                handleNewConnectionString(newConnectionString.get());
                 break;
             }
 
@@ -298,22 +300,34 @@ class ConnectionState implements Watcher, Closeable
         }
         }
 
-        if ( checkNewConnectionString && zooKeeper.hasNewConnectionString() )
+        if ( checkNewConnectionString )
         {
-            handleNewConnectionString();
+            String newConnectionString = zooKeeper.getNewConnectionString();
+            if ( newConnectionString != null )
+            {
+                handleNewConnectionString(newConnectionString);
+            }
         }
 
         return isConnected;
     }
 
-    private void handleNewConnectionString()
+    private void handleNewConnectionString(String newConnectionString)
     {
-        log.info("Connection string changed");
+        log.info("Connection string changed to: " + newConnectionString);
         tracer.get().addCount("connection-string-changed", 1);
 
         try
         {
-            reset();
+            ZooKeeper zooKeeper = this.zooKeeper.getZooKeeper();
+            if ( zooKeeper == null )
+            {
+                log.warn("Could not update the connection string because getZooKeeper() returned null.");
+            }
+            else
+            {
+                zooKeeper.updateServerList(newConnectionString);
+            }
         }
         catch ( Exception e )
         {

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-client/src/main/java/org/apache/curator/HandleHolder.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/HandleHolder.java b/curator-client/src/main/java/org/apache/curator/HandleHolder.java
index 8652f0c..98b39ce 100644
--- a/curator-client/src/main/java/org/apache/curator/HandleHolder.java
+++ b/curator-client/src/main/java/org/apache/curator/HandleHolder.java
@@ -67,10 +67,10 @@ class HandleHolder
         return (helper != null) ? helper.getConnectionString() : null;
     }
 
-    boolean hasNewConnectionString() 
+    String getNewConnectionString()
     {
         String helperConnectionString = (helper != null) ? helper.getConnectionString() : null;
-        return (helperConnectionString != null) && !ensembleProvider.getConnectionString().equals(helperConnectionString);
+        return ((helperConnectionString != null) && !ensembleProvider.getConnectionString().equals(helperConnectionString)) ? helperConnectionString : null;
     }
 
     void closeAndClear() throws Exception

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java b/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
index 8116308..f620ffb 100644
--- a/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
+++ b/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
@@ -56,14 +56,14 @@ public class ClassicConnectionHandlingPolicy implements ConnectionHandlingPolicy
     }
 
     @Override
-    public CheckTimeoutsResult checkTimeouts(Callable<Boolean> hasNewConnectionString, long connectionStartMs, int sessionTimeoutMs, int connectionTimeoutMs) throws Exception
+    public CheckTimeoutsResult checkTimeouts(Callable<String> hasNewConnectionString, long connectionStartMs, int sessionTimeoutMs, int connectionTimeoutMs) throws Exception
     {
         CheckTimeoutsResult result = CheckTimeoutsResult.NOP;
         int minTimeout = Math.min(sessionTimeoutMs, connectionTimeoutMs);
         long elapsed = System.currentTimeMillis() - connectionStartMs;
         if ( elapsed >= minTimeout )
         {
-            if ( hasNewConnectionString.call() )
+            if ( hasNewConnectionString.call() != null )
             {
                 result = CheckTimeoutsResult.NEW_CONNECTION_STRING;
             }

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java b/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
index c47577d..8f6a147 100644
--- a/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
+++ b/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
@@ -99,13 +99,13 @@ public interface ConnectionHandlingPolicy
      * Check timeouts. NOTE: this method is only called when an attempt to access to the ZooKeeper instances
      * is made and the connection has not completed.
      *
-     * @param hasNewConnectionString proc to call to check if there is a new connection string. Important: the internal state is cleared after
-     *                               this is called so you MUST handle the new connection string if <tt>true</tt> is returned
+     * @param getNewConnectionString proc to call to check if there is a new connection string. Important: the internal state is cleared after
+     *                               this is called so you MUST handle the new connection string if non null is returned
      * @param connectionStartMs the epoch/ms time that the connection was first initiated
      * @param sessionTimeoutMs the configured/negotiated session timeout in milliseconds
      * @param connectionTimeoutMs the configured connection timeout in milliseconds
      * @return result
      * @throws Exception errors
      */
-    CheckTimeoutsResult checkTimeouts(Callable<Boolean> hasNewConnectionString, long connectionStartMs, int sessionTimeoutMs, int connectionTimeoutMs) throws Exception;
+    CheckTimeoutsResult checkTimeouts(Callable<String> getNewConnectionString, long connectionStartMs, int sessionTimeoutMs, int connectionTimeoutMs) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java b/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
index 9f311de..6995815 100644
--- a/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
+++ b/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
@@ -76,9 +76,9 @@ public class StandardConnectionHandlingPolicy implements ConnectionHandlingPolic
     }
 
     @Override
-    public CheckTimeoutsResult checkTimeouts(Callable<Boolean> hasNewConnectionString, long connectionStartMs, int sessionTimeoutMs, int connectionTimeoutMs) throws Exception
+    public CheckTimeoutsResult checkTimeouts(Callable<String> hasNewConnectionString, long connectionStartMs, int sessionTimeoutMs, int connectionTimeoutMs) throws Exception
     {
-        if ( hasNewConnectionString.call() )
+        if ( hasNewConnectionString.call() != null )
         {
             return CheckTimeoutsResult.NEW_CONNECTION_STRING;
         }

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleListener.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleListener.java b/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleListener.java
deleted file mode 100644
index 8f963cd..0000000
--- a/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleListener.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.ensemble;
-
-public interface EnsembleListener {
-
-    void connectionStringUpdated(String connectionString);
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleProvider.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleProvider.java b/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleProvider.java
index b118294..c03726f 100644
--- a/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleProvider.java
+++ b/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleProvider.java
@@ -51,4 +51,6 @@ public interface EnsembleProvider extends Closeable
      * @throws IOException errors
      */
     public void         close() throws IOException;
+
+    public void setConnectionString(String connectionString);
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-client/src/main/java/org/apache/curator/ensemble/dynamic/DynamicEnsembleProvider.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/dynamic/DynamicEnsembleProvider.java b/curator-client/src/main/java/org/apache/curator/ensemble/dynamic/DynamicEnsembleProvider.java
deleted file mode 100644
index 70b755f..0000000
--- a/curator-client/src/main/java/org/apache/curator/ensemble/dynamic/DynamicEnsembleProvider.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.ensemble.dynamic;
-
-import com.google.common.base.Preconditions;
-import org.apache.curator.ensemble.EnsembleListener;
-import org.apache.curator.ensemble.EnsembleProvider;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class DynamicEnsembleProvider implements EnsembleProvider, EnsembleListener {
-
-    private final AtomicReference<String> connectionString = new AtomicReference<String>();
-
-    /**
-     * The connection string to use
-     *
-     * @param connectionString connection string
-     */
-    public DynamicEnsembleProvider(String connectionString)
-    {
-        this.connectionString.set(Preconditions.checkNotNull(connectionString, "connectionString cannot be null"));
-    }
-
-    @Override
-    public void start() throws Exception {
-        // NOP
-    }
-
-    @Override
-    public String getConnectionString() {
-        return connectionString.get();
-    }
-
-    @Override
-    public void close() throws IOException {
-        // NOP
-    }
-
-    @Override
-    public void connectionStringUpdated(String connectionString) {
-        this.connectionString.set(connectionString);
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorEnsembleProvider.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorEnsembleProvider.java b/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorEnsembleProvider.java
index 02a01e5..4cbf5ee 100644
--- a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorEnsembleProvider.java
+++ b/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorEnsembleProvider.java
@@ -145,6 +145,12 @@ public class ExhibitorEnsembleProvider implements EnsembleProvider
         return connectionString.get();
     }
 
+    @Override
+    public void setConnectionString(String connectionString)
+    {
+        log.info("setConnectionString received. Ignoring. " + connectionString);
+    }
+
     @VisibleForTesting
     protected void poll()
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java b/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java
index 411c712..159497d 100644
--- a/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java
+++ b/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java
@@ -21,13 +21,14 @@ package org.apache.curator.ensemble.fixed;
 import com.google.common.base.Preconditions;
 import org.apache.curator.ensemble.EnsembleProvider;
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Standard ensemble provider that wraps a fixed connection string
  */
 public class FixedEnsembleProvider implements EnsembleProvider
 {
-    private final String connectionString;
+    private final AtomicReference<String> connectionString = new AtomicReference<>();
 
     /**
      * The connection string to use
@@ -36,7 +37,7 @@ public class FixedEnsembleProvider implements EnsembleProvider
      */
     public FixedEnsembleProvider(String connectionString)
     {
-        this.connectionString = Preconditions.checkNotNull(connectionString, "connectionString cannot be null");
+        this.connectionString.set(Preconditions.checkNotNull(connectionString, "connectionString cannot be null"));
     }
 
     @Override
@@ -52,8 +53,14 @@ public class FixedEnsembleProvider implements EnsembleProvider
     }
 
     @Override
+    public void setConnectionString(String connectionString)
+    {
+        this.connectionString.set(connectionString);
+    }
+
+    @Override
     public String getConnectionString()
     {
-        return connectionString;
+        return connectionString.get();
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-framework/src/main/java/org/apache/curator/framework/ensemble/EnsembleTracker.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/ensemble/EnsembleTracker.java b/curator-framework/src/main/java/org/apache/curator/framework/ensemble/EnsembleTracker.java
deleted file mode 100644
index 375e1f0..0000000
--- a/curator-framework/src/main/java/org/apache/curator/framework/ensemble/EnsembleTracker.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.curator.framework.ensemble;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import org.apache.curator.ensemble.EnsembleListener;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.curator.framework.listen.ListenerContainer;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.server.quorum.QuorumPeer;
-import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
-import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.ByteArrayInputStream;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Tracks changes to the ensemble and notifies registered {@link org.apache.curator.ensemble.EnsembleListener} instances.
- */
-public class EnsembleTracker implements Closeable
-{
-    private final Logger log = LoggerFactory.getLogger(getClass());
-    private final CuratorFramework client;
-    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
-    private final ListenerContainer<EnsembleListener> listeners = new ListenerContainer<>();
-    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
-    {
-        @Override
-        public void stateChanged(CuratorFramework client, ConnectionState newState)
-        {
-            if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) )
-            {
-                try
-                {
-                    reset();
-                }
-                catch ( Exception e )
-                {
-                    log.error("Trying to reset after reconnection", e);
-                }
-            }
-        }
-    };
-
-    private final CuratorWatcher watcher = new CuratorWatcher()
-    {
-        @Override
-        public void process(WatchedEvent event) throws Exception
-        {
-            if ( event.getType() == Watcher.Event.EventType.NodeDataChanged )
-            {
-                reset();
-            }
-        }
-    };
-
-    private enum State
-    {
-        LATENT,
-        STARTED,
-        CLOSED
-    }
-
-    private final BackgroundCallback backgroundCallback = new BackgroundCallback()
-    {
-        @Override
-        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
-        {
-            processBackgroundResult(event);
-        }
-    };
-
-    public EnsembleTracker(CuratorFramework client)
-    {
-        this.client = client;
-    }
-
-    public void start() throws Exception
-    {
-        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
-        client.getConnectionStateListenable().addListener(connectionStateListener);
-        reset();
-    }
-
-    @Override
-    public void close() throws IOException
-    {
-        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
-        {
-            listeners.clear();
-        }
-        client.getConnectionStateListenable().removeListener(connectionStateListener);
-    }
-
-    /**
-     * Return the ensemble listenable
-     *
-     * @return listenable
-     */
-    public ListenerContainer<EnsembleListener> getListenable()
-    {
-        Preconditions.checkState(state.get() != State.CLOSED, "Closed");
-
-        return listeners;
-    }
-
-    private void reset() throws Exception
-    {
-        client.getConfig().usingWatcher(watcher).inBackground(backgroundCallback).forEnsemble();
-    }
-
-    private void processBackgroundResult(CuratorEvent event) throws Exception
-    {
-        switch ( event.getType() )
-        {
-            case GET_CONFIG:
-            {
-                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
-                {
-                    processConfigData(event.getData());
-                }
-            }
-        }
-    }
-
-    private void processConfigData(byte[] data) throws Exception
-    {
-        Properties properties = new Properties();
-        properties.load(new ByteArrayInputStream(data));
-        QuorumVerifier qv = new QuorumMaj(properties);
-        StringBuilder sb = new StringBuilder();
-        for ( QuorumPeer.QuorumServer server : qv.getAllMembers().values() )
-        {
-            if ( sb.length() != 0 )
-            {
-                sb.append(",");
-            }
-            sb.append(server.clientAddr.getAddress().getHostAddress()).append(":").append(server.clientAddr.getPort());
-        }
-
-        final String connectionString = sb.toString();
-        listeners.forEach
-            (
-                new Function<EnsembleListener, Void>()
-                {
-                    @Override
-                    public Void apply(EnsembleListener listener)
-                    {
-                        try
-                        {
-                            listener.connectionStringUpdated(connectionString);
-                        }
-                        catch ( Exception e )
-                        {
-                            log.error("Calling listener", e);
-                        }
-                        return null;
-                    }
-                }
-            );
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index da9067d..f2f578c 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -88,6 +88,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final ConnectionStateErrorPolicy connectionStateErrorPolicy;
     private final AtomicLong currentInstanceIndex = new AtomicLong(-1);
     private final InternalConnectionHandler internalConnectionHandler;
+    private final EnsembleTracker ensembleTracker;
 
     private volatile ExecutorService executorService;
     private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
@@ -150,6 +151,8 @@ public class CuratorFrameworkImpl implements CuratorFramework
         failedDeleteManager = new FailedDeleteManager(this);
         failedRemoveWatcherManager = new FailedRemoveWatchManager(this);
         namespaceFacadeCache = new NamespaceFacadeCache(this);
+
+        ensembleTracker = new EnsembleTracker(this, builder.getEnsembleProvider());
     }
 
     private List<AuthInfo> buildAuths(CuratorFrameworkFactory.Builder builder)
@@ -217,6 +220,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         useContainerParentsIfAvailable = parent.useContainerParentsIfAvailable;
         connectionStateErrorPolicy = parent.connectionStateErrorPolicy;
         internalConnectionHandler = parent.internalConnectionHandler;
+        ensembleTracker = null;
     }
 
     @Override
@@ -306,6 +310,8 @@ public class CuratorFrameworkImpl implements CuratorFramework
                     return null;
                 }
             });
+
+            ensembleTracker.start();
         }
         catch ( Exception e )
         {
@@ -351,6 +357,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
                 }
             }
 
+            ensembleTracker.close();
             listeners.clear();
             unhandledErrorListeners.clear();
             connectionStateManager.close();

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
new file mode 100644
index 0000000..d8092fe
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.imps;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.curator.ensemble.EnsembleProvider;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+@VisibleForTesting
+public class EnsembleTracker implements Closeable
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final CuratorFramework client;
+    private final EnsembleProvider ensembleProvider;
+    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+    {
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState newState)
+        {
+            if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) )
+            {
+                try
+                {
+                    reset();
+                }
+                catch ( Exception e )
+                {
+                    log.error("Trying to reset after reconnection", e);
+                }
+            }
+        }
+    };
+
+    private final CuratorWatcher watcher = new CuratorWatcher()
+    {
+        @Override
+        public void process(WatchedEvent event) throws Exception
+        {
+            if ( event.getType() == Watcher.Event.EventType.NodeDataChanged )
+            {
+                reset();
+            }
+        }
+    };
+
+    private enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
+
+    EnsembleTracker(CuratorFramework client, EnsembleProvider ensembleProvider)
+    {
+        this.client = client;
+        this.ensembleProvider = ensembleProvider;
+    }
+
+    public void start() throws Exception
+    {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
+        client.getConnectionStateListenable().addListener(connectionStateListener);
+        reset();
+    }
+
+    @Override
+    public void close()
+    {
+        client.getConnectionStateListenable().removeListener(connectionStateListener);
+    }
+
+    private void reset() throws Exception
+    {
+        BackgroundCallback backgroundCallback = new BackgroundCallback()
+        {
+            @Override
+            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+            {
+                if ( event.getType() == CuratorEventType.GET_CONFIG )
+                {
+                    processConfigData(event.getData());
+                }
+            }
+        };
+        client.getConfig().usingWatcher(watcher).inBackground(backgroundCallback).forEnsemble();
+    }
+
+    @VisibleForTesting
+    public static String configToConnectionString(byte[] data) throws Exception
+    {
+        Properties properties = new Properties();
+        properties.load(new ByteArrayInputStream(data));
+        QuorumVerifier qv = new QuorumMaj(properties);
+        StringBuilder sb = new StringBuilder();
+        for ( QuorumPeer.QuorumServer server : qv.getAllMembers().values() )
+        {
+            if ( sb.length() != 0 )
+            {
+                sb.append(",");
+            }
+            sb.append(server.clientAddr.getAddress().getHostAddress()).append(":").append(server.clientAddr.getPort());
+        }
+
+        return sb.toString();
+    }
+
+    private void processConfigData(byte[] data) throws Exception
+    {
+        log.info("New config event received: " + Arrays.toString(data));
+        String connectionString = configToConnectionString(data);
+        ensembleProvider.setConnectionString(connectionString);
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
index e7d2229..101360a 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
@@ -20,6 +20,7 @@
 package org.apache.curator.framework.imps;
 
 import com.google.common.collect.Lists;
+import org.apache.curator.ensemble.EnsembleProvider;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.BackgroundCallback;
@@ -43,17 +44,20 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class TestReconfiguration extends BaseClassForTests
 {
     private final Timing timing = new Timing();
     private TestingCluster cluster;
+    private EnsembleProvider ensembleProvider;
 
     @BeforeMethod
     @Override
@@ -72,6 +76,7 @@ public class TestReconfiguration extends BaseClassForTests
     public void teardown() throws Exception
     {
         CloseableUtils.closeQuietly(cluster);
+        ensembleProvider = null;
 
         super.teardown();
     }
@@ -151,9 +156,11 @@ public class TestReconfiguration extends BaseClassForTests
         try ( CuratorFramework client = newClient())
         {
             client.start();
-            QuorumVerifier quorumVerifier = toQuorumVerifier(client.getConfig().forEnsemble());
+            byte[] configData = client.getConfig().forEnsemble();
+            QuorumVerifier quorumVerifier = toQuorumVerifier(configData);
             System.out.println(quorumVerifier);
             assertConfig(quorumVerifier, cluster.getInstances());
+            Assert.assertEquals(EnsembleTracker.configToConnectionString(configData), ensembleProvider.getConnectionString());
         }
     }
 
@@ -176,10 +183,12 @@ public class TestReconfiguration extends BaseClassForTests
 
                 Assert.assertTrue(timing.awaitLatch(latch));
 
-                QuorumVerifier newConfig = toQuorumVerifier(client.getConfig().forEnsemble());
+                byte[] newConfigData = client.getConfig().forEnsemble();
+                QuorumVerifier newConfig = toQuorumVerifier(newConfigData);
                 List<InstanceSpec> newInstances = Lists.newArrayList(cluster.getInstances());
                 newInstances.addAll(newCluster.getInstances());
                 assertConfig(newConfig, newInstances);
+                Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfigData), ensembleProvider.getConnectionString());
             }
         }
     }
@@ -216,10 +225,12 @@ public class TestReconfiguration extends BaseClassForTests
                 Assert.assertTrue(timing.awaitLatch(callbackLatch));
                 Assert.assertTrue(timing.awaitLatch(latch));
 
-                QuorumVerifier newConfig = toQuorumVerifier(client.getConfig().forEnsemble());
+                byte[] newConfigData = client.getConfig().forEnsemble();
+                QuorumVerifier newConfig = toQuorumVerifier(newConfigData);
                 List<InstanceSpec> newInstances = Lists.newArrayList(cluster.getInstances());
                 newInstances.addAll(newCluster.getInstances());
                 assertConfig(newConfig, newInstances);
+                Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfigData), ensembleProvider.getConnectionString());
             }
         }
     }
@@ -254,11 +265,13 @@ public class TestReconfiguration extends BaseClassForTests
 
                 Assert.assertTrue(timing.awaitLatch(latch));
 
-                QuorumVerifier newConfig = toQuorumVerifier(client.getConfig().forEnsemble());
+                byte[] newConfigData = client.getConfig().forEnsemble();
+                QuorumVerifier newConfig = toQuorumVerifier(newConfigData);
                 ArrayList<InstanceSpec> newInstances = Lists.newArrayList(oldInstances);
                 newInstances.addAll(instances);
                 newInstances.remove(removeSpec);
                 assertConfig(newConfig, newInstances);
+                Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfigData), ensembleProvider.getConnectionString());
             }
         }
     }
@@ -290,15 +303,47 @@ public class TestReconfiguration extends BaseClassForTests
             client.reconfig().withNewMembers(toReconfigSpec(smallCluster)).forEnsemble();
 
             Assert.assertTrue(timing.awaitLatch(latch));
-            QuorumVerifier newConfig = toQuorumVerifier(client.getConfig().forEnsemble());
+            byte[] newConfigData = client.getConfig().forEnsemble();
+            QuorumVerifier newConfig = toQuorumVerifier(newConfigData);
             Assert.assertEquals(newConfig.getAllMembers().size(), 3);
             assertConfig(newConfig, smallCluster);
+            Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfigData), ensembleProvider.getConnectionString());
         }
     }
 
     private CuratorFramework newClient()
     {
-        return CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new ExponentialBackoffRetry(timing.forSleepingABit().milliseconds(), 3));
+        final AtomicReference<String> connectString = new AtomicReference<>(cluster.getConnectString());
+        ensembleProvider = new EnsembleProvider()
+        {
+            @Override
+            public void start() throws Exception
+            {
+            }
+
+            @Override
+            public String getConnectionString()
+            {
+                return connectString.get();
+            }
+
+            @Override
+            public void close() throws IOException
+            {
+            }
+
+            @Override
+            public void setConnectionString(String connectionString)
+            {
+                connectString.set(connectionString);
+            }
+        };
+        return CuratorFrameworkFactory.builder()
+            .ensembleProvider(ensembleProvider)
+            .sessionTimeoutMs(timing.session())
+            .connectionTimeoutMs(timing.connection())
+            .retryPolicy(new ExponentialBackoffRetry(timing.forSleepingABit().milliseconds(), 3))
+            .build();
     }
 
     private CountDownLatch setChangeWaiter(CuratorFramework client) throws Exception

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
index f4cb7bb..ee49288 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
@@ -65,6 +65,11 @@ public class TestInterProcessSemaphoreCluster
             final EnsembleProvider          provider = new EnsembleProvider()
             {
                 @Override
+                public void setConnectionString(String connectionString)
+                {
+                }
+
+                @Override
                 public void start() throws Exception
                 {
                 }


[4/7] curator git commit: fixed compile error

Posted by ra...@apache.org.
fixed compile error


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2827ba81
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2827ba81
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2827ba81

Branch: refs/heads/CURATOR-3.0
Commit: 2827ba81b5c66ec4f864cc2e3582776e955158ac
Parents: 6e56e8a
Author: randgalt <ra...@apache.org>
Authored: Sat Sep 26 18:07:10 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Sep 26 18:07:10 2015 -0500

----------------------------------------------------------------------
 .../org/apache/curator/framework/imps/TestReconfiguration.java | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/2827ba81/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
index 101360a..0ec796b 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
@@ -322,6 +322,12 @@ public class TestReconfiguration extends BaseClassForTests
             }
 
             @Override
+            public boolean updateServerListEnabled()
+            {
+                return false;
+            }
+
+            @Override
             public String getConnectionString()
             {
                 return connectString.get();


[2/7] curator git commit: Added doc regarding EnsembleTracker

Posted by ra...@apache.org.
Added doc regarding EnsembleTracker


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/56d9ba66
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/56d9ba66
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/56d9ba66

Branch: refs/heads/CURATOR-3.0
Commit: 56d9ba665ef69944ef0ab9c65aa014b921fd39f9
Parents: 26364c6
Author: randgalt <ra...@apache.org>
Authored: Sat Sep 26 13:59:35 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Sep 26 13:59:35 2015 -0500

----------------------------------------------------------------------
 curator-framework/src/site/confluence/index.confluence | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/56d9ba66/curator-framework/src/site/confluence/index.confluence
----------------------------------------------------------------------
diff --git a/curator-framework/src/site/confluence/index.confluence b/curator-framework/src/site/confluence/index.confluence
index b79ece4..1f5f329 100644
--- a/curator-framework/src/site/confluence/index.confluence
+++ b/curator-framework/src/site/confluence/index.confluence
@@ -6,6 +6,7 @@ ZooKeeper and handles the complexity of managing connections to the ZooKeeper cl
 * Automatic connection management:
 ** There are potential error cases that require ZooKeeper clients to recreate a connection and/or retry operations. Curator
  automatically and transparently (mostly) handles these cases.
+** Watches for NodeDataChanged events and calls updateServerList() as needed.
 * Cleaner API:
 ** simplifies the raw ZooKeeper methods, events, etc.
 ** provides a modern, fluent interface


[3/7] curator git commit: ExhibitorEnsembleProvider is not compatible with updateServerList()

Posted by ra...@apache.org.
ExhibitorEnsembleProvider is not compatible with updateServerList()


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/6e56e8ae
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/6e56e8ae
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/6e56e8ae

Branch: refs/heads/CURATOR-3.0
Commit: 6e56e8ae9f04ffdd76505858dbbe5b1ff04dbd49
Parents: 56d9ba6
Author: randgalt <ra...@apache.org>
Authored: Sat Sep 26 18:03:06 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Sep 26 18:03:06 2015 -0500

----------------------------------------------------------------------
 .../org/apache/curator/ConnectionState.java     |  9 +++++-
 .../curator/ensemble/EnsembleProvider.java      | 12 ++++++++
 .../exhibitor/ExhibitorEnsembleProvider.java    | 30 ++++++++++++--------
 .../ensemble/fixed/FixedEnsembleProvider.java   |  6 ++++
 4 files changed, 44 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/6e56e8ae/curator-client/src/main/java/org/apache/curator/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ConnectionState.java b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
index 4c1e6ad..0b21643 100644
--- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
+++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
@@ -326,7 +326,14 @@ class ConnectionState implements Watcher, Closeable
             }
             else
             {
-                zooKeeper.updateServerList(newConnectionString);
+                if ( ensembleProvider.updateServerListEnabled() )
+                {
+                    zooKeeper.updateServerList(newConnectionString);
+                }
+                else
+                {
+                    reset();
+                }
             }
         }
         catch ( Exception e )

http://git-wip-us.apache.org/repos/asf/curator/blob/6e56e8ae/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleProvider.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleProvider.java b/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleProvider.java
index c03726f..4db8348 100644
--- a/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleProvider.java
+++ b/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleProvider.java
@@ -52,5 +52,17 @@ public interface EnsembleProvider extends Closeable
      */
     public void         close() throws IOException;
 
+    /**
+     * A new connection string event was received
+     *
+     * @param connectionString the new connection string
+     */
     public void setConnectionString(String connectionString);
+
+    /**
+     * Return true if this ensemble provider supports {@link ZooKeeper#updateServerList(String)}
+     *
+     * @return true/false
+     */
+    public boolean updateServerListEnabled();
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/6e56e8ae/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorEnsembleProvider.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorEnsembleProvider.java b/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorEnsembleProvider.java
index 4cbf5ee..4573724 100644
--- a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorEnsembleProvider.java
+++ b/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorEnsembleProvider.java
@@ -116,19 +116,19 @@ public class ExhibitorEnsembleProvider implements EnsembleProvider
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
 
         service.scheduleWithFixedDelay
-        (
-            new Runnable()
-            {
-                @Override
-                public void run()
+            (
+                new Runnable()
                 {
-                    poll();
-                }
-            },
-            pollingMs,
-            pollingMs,
-            TimeUnit.MILLISECONDS
-        );
+                    @Override
+                    public void run()
+                    {
+                        poll();
+                    }
+                },
+                pollingMs,
+                pollingMs,
+                TimeUnit.MILLISECONDS
+            );
     }
 
     @Override
@@ -151,6 +151,12 @@ public class ExhibitorEnsembleProvider implements EnsembleProvider
         log.info("setConnectionString received. Ignoring. " + connectionString);
     }
 
+    @Override
+    public boolean updateServerListEnabled()
+    {
+        return false;
+    }
+
     @VisibleForTesting
     protected void poll()
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/6e56e8ae/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java b/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java
index 159497d..28ad1b6 100644
--- a/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java
+++ b/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java
@@ -63,4 +63,10 @@ public class FixedEnsembleProvider implements EnsembleProvider
     {
         return connectionString.get();
     }
+
+    @Override
+    public boolean updateServerListEnabled()
+    {
+        return true;
+    }
 }


[7/7] curator git commit: Merge branch 'CURATOR-3.0' into CURATOR-266

Posted by ra...@apache.org.
Merge branch 'CURATOR-3.0' into CURATOR-266

Conflicts:
	curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceFacade.java


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/779ff5ea
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/779ff5ea
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/779ff5ea

Branch: refs/heads/CURATOR-3.0
Commit: 779ff5ea459ce8d60c92db1c3fde2966d1cc3e3a
Parents: b89091e 0fe4d96
Author: randgalt <ra...@apache.org>
Authored: Thu Oct 8 19:11:56 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu Oct 8 19:11:56 2015 -0500

----------------------------------------------------------------------
 .../apache/curator/CuratorZookeeperClient.java  |   2 +-
 .../curator/framework/api/CreateBuilder.java    |  65 +----------
 .../framework/api/CreateBuilderMain.java        |  86 ++++++++++++++
 .../curator/framework/api/DeleteBuilder.java    |   4 +-
 .../framework/api/DeleteBuilderMain.java        |  23 ++++
 .../framework/imps/CreateBuilderImpl.java       |  61 ++++++++++
 .../framework/imps/DeleteBuilderImpl.java       |  21 ++++
 .../curator/framework/imps/NamespaceFacade.java |  13 ---
 .../curator/framework/imps/TestFramework.java   | 117 +++++++++++++++++++
 9 files changed, 314 insertions(+), 78 deletions(-)
----------------------------------------------------------------------



[5/7] curator git commit: continued work on tests, etc.

Posted by ra...@apache.org.
continued work on tests, etc.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/cb34e6f6
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/cb34e6f6
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/cb34e6f6

Branch: refs/heads/CURATOR-3.0
Commit: cb34e6f6a41b08c9d4e6179d9f893b0e48e7860c
Parents: 2827ba8
Author: randgalt <ra...@apache.org>
Authored: Sun Sep 27 13:31:32 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun Sep 27 13:31:32 2015 -0500

----------------------------------------------------------------------
 .../ensemble/fixed/FixedEnsembleProvider.java   | 16 ++++++-
 .../framework/imps/CuratorFrameworkImpl.java    |  5 +++
 .../curator/framework/imps/EnsembleTracker.java | 45 ++++++++++++--------
 .../src/site/confluence/index.confluence        |  1 +
 .../framework/imps/TestFrameworkBackground.java |  9 ++--
 .../framework/imps/TestReconfiguration.java     |  9 +++-
 .../recipes/nodes/PersistentEphemeralNode.java  | 27 ++++++------
 .../curator/framework/imps/TestCleanState.java  |  9 ++++
 .../locks/TestInterProcessSemaphoreCluster.java |  6 +++
 .../nodes/TestPersistentEphemeralNode.java      | 16 ++++---
 src/site/confluence/utilities.confluence        |  7 +--
 11 files changed, 104 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java b/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java
index 28ad1b6..5f486f4 100644
--- a/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java
+++ b/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java
@@ -20,6 +20,7 @@ package org.apache.curator.ensemble.fixed;
 
 import com.google.common.base.Preconditions;
 import org.apache.curator.ensemble.EnsembleProvider;
+import org.apache.zookeeper.ZooKeeper;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -29,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
 public class FixedEnsembleProvider implements EnsembleProvider
 {
     private final AtomicReference<String> connectionString = new AtomicReference<>();
+    private final boolean updateServerListEnabled;
 
     /**
      * The connection string to use
@@ -37,6 +39,18 @@ public class FixedEnsembleProvider implements EnsembleProvider
      */
     public FixedEnsembleProvider(String connectionString)
     {
+        this(connectionString, true);
+    }
+
+    /**
+     * The connection string to use
+     *
+     * @param connectionString connection string
+     * @param updateServerListEnabled if true, allow Curator to call {@link ZooKeeper#updateServerList(String)}
+     */
+    public FixedEnsembleProvider(String connectionString, boolean updateServerListEnabled)
+    {
+        this.updateServerListEnabled = updateServerListEnabled;
         this.connectionString.set(Preconditions.checkNotNull(connectionString, "connectionString cannot be null"));
     }
 
@@ -67,6 +81,6 @@ public class FixedEnsembleProvider implements EnsembleProvider
     @Override
     public boolean updateServerListEnabled()
     {
-        return true;
+        return updateServerListEnabled;
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index f2f578c..c3215ad 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -770,6 +770,11 @@ public class CuratorFrameworkImpl implements CuratorFramework
         connectionStateManager.addStateChange(newConnectionState);
     }
 
+    EnsembleTracker getEnsembleTracker()
+    {
+        return ensembleTracker;
+    }
+
     @SuppressWarnings({"ThrowableResultOfMethodCallIgnored"})
     private <DATA_TYPE> boolean checkBackgroundRetry(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
index d8092fe..acd01ee 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
@@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.curator.ensemble.EnsembleProvider;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorEventType;
@@ -43,10 +44,10 @@ import java.util.Properties;
 import java.util.concurrent.atomic.AtomicReference;
 
 @VisibleForTesting
-public class EnsembleTracker implements Closeable
+public class EnsembleTracker implements Closeable, CuratorWatcher
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
-    private final CuratorFramework client;
+    private final WatcherRemoveCuratorFramework client;
     private final EnsembleProvider ensembleProvider;
     private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
     private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
@@ -68,18 +69,6 @@ public class EnsembleTracker implements Closeable
         }
     };
 
-    private final CuratorWatcher watcher = new CuratorWatcher()
-    {
-        @Override
-        public void process(WatchedEvent event) throws Exception
-        {
-            if ( event.getType() == Watcher.Event.EventType.NodeDataChanged )
-            {
-                reset();
-            }
-        }
-    };
-
     private enum State
     {
         LATENT,
@@ -89,7 +78,7 @@ public class EnsembleTracker implements Closeable
 
     EnsembleTracker(CuratorFramework client, EnsembleProvider ensembleProvider)
     {
-        this.client = client;
+        this.client = client.newWatcherRemoveCuratorFramework();
         this.ensembleProvider = ensembleProvider;
     }
 
@@ -103,7 +92,20 @@ public class EnsembleTracker implements Closeable
     @Override
     public void close()
     {
-        client.getConnectionStateListenable().removeListener(connectionStateListener);
+        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            client.removeWatchers();
+            client.getConnectionStateListenable().removeListener(connectionStateListener);
+        }
+    }
+
+    @Override
+    public void process(WatchedEvent event) throws Exception
+    {
+        if ( event.getType() == Watcher.Event.EventType.NodeDataChanged )
+        {
+            reset();
+        }
     }
 
     private void reset() throws Exception
@@ -119,7 +121,7 @@ public class EnsembleTracker implements Closeable
                 }
             }
         };
-        client.getConfig().usingWatcher(watcher).inBackground(backgroundCallback).forEnsemble();
+        client.getConfig().usingWatcher(this).inBackground(backgroundCallback).forEnsemble();
     }
 
     @VisibleForTesting
@@ -145,6 +147,13 @@ public class EnsembleTracker implements Closeable
     {
         log.info("New config event received: " + Arrays.toString(data));
         String connectionString = configToConnectionString(data);
-        ensembleProvider.setConnectionString(connectionString);
+        if ( connectionString.trim().length() > 0 )
+        {
+            ensembleProvider.setConnectionString(connectionString);
+        }
+        else
+        {
+            log.debug("Ignoring new config as it is empty");
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-framework/src/site/confluence/index.confluence
----------------------------------------------------------------------
diff --git a/curator-framework/src/site/confluence/index.confluence b/curator-framework/src/site/confluence/index.confluence
index 1f5f329..13df0de 100644
--- a/curator-framework/src/site/confluence/index.confluence
+++ b/curator-framework/src/site/confluence/index.confluence
@@ -7,6 +7,7 @@ ZooKeeper and handles the complexity of managing connections to the ZooKeeper cl
 ** There are potential error cases that require ZooKeeper clients to recreate a connection and/or retry operations. Curator
  automatically and transparently (mostly) handles these cases.
 ** Watches for NodeDataChanged events and calls updateServerList() as needed.
+** Watches are automatically removed by Curator recipes
 * Cleaner API:
 ** simplifies the raw ZooKeeper methods, events, etc.
 ** provides a modern, fluent interface

http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
index 6575018..83dab6b 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
@@ -34,6 +34,8 @@ import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.zookeeper.KeeperException.Code;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.List;
@@ -46,6 +48,8 @@ import java.util.concurrent.atomic.AtomicReference;
 
 public class TestFrameworkBackground extends BaseClassForTests
 {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
     @Test
     public void testListenerConnectedAtStart() throws Exception
     {
@@ -160,11 +164,10 @@ public class TestFrameworkBackground extends BaseClassForTests
                 }
             };
             client.create().inBackground(callback).forPath("/one");
-            client.create().inBackground(callback).forPath("/one/two");
-            client.create().inBackground(callback).forPath("/one/two/three");
-
             Assert.assertEquals(paths.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), "/one");
+            client.create().inBackground(callback).forPath("/one/two");
             Assert.assertEquals(paths.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), "/one/two");
+            client.create().inBackground(callback).forPath("/one/two/three");
             Assert.assertEquals(paths.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), "/one/two/three");
         }
         finally

http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
index 0ec796b..e399a4d 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
@@ -45,6 +45,10 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
@@ -380,11 +384,12 @@ public class TestReconfiguration extends BaseClassForTests
         }
     }
 
-    private List<String> toReconfigSpec(Collection<InstanceSpec> instances)
+    private List<String> toReconfigSpec(Collection<InstanceSpec> instances) throws Exception
     {
+        String localhost = new InetSocketAddress((InetAddress)null, 0).getAddress().getHostAddress();
         List<String> specs = Lists.newArrayList();
         for ( InstanceSpec instance : instances ) {
-            specs.add("server." + instance.getServerId() + "=localhost:" + instance.getElectionPort() + ":" + instance.getQuorumPort() + ";" + instance.getPort());
+            specs.add("server." + instance.getServerId() + "=" + localhost + ":" + instance.getElectionPort() + ":" + instance.getQuorumPort() + ";" + instance.getPort());
         }
         return specs;
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
index 38c632a..f7a4ff4 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
@@ -114,25 +114,13 @@ public class PersistentEphemeralNode implements Closeable
         {
             if ( newState == ConnectionState.RECONNECTED )
             {
-                if ( debugReconnectLatch != null )
-                {
-                    try
-                    {
-                        debugReconnectLatch.await();
-                    }
-                    catch ( InterruptedException e )
-                    {
-                        Thread.currentThread().interrupt();
-                        e.printStackTrace();
-                    }
-                }
                 createNode();
             }
         }
     };
 
     @VisibleForTesting
-    volatile CountDownLatch debugReconnectLatch = null;
+    volatile CountDownLatch debugCreateNodeLatch = null;
 
     private enum State
     {
@@ -401,6 +389,19 @@ public class PersistentEphemeralNode implements Closeable
             return;
         }
 
+        if ( debugCreateNodeLatch != null )
+        {
+            try
+            {
+                debugCreateNodeLatch.await();
+            }
+            catch ( InterruptedException e )
+            {
+                Thread.currentThread().interrupt();
+                e.printStackTrace();
+            }
+        }
+
         try
         {
             String existingPath = nodePath.get();

http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java b/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
index 82de1fc..f90f463 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
@@ -35,6 +35,11 @@ public class TestCleanState
         try
         {
             CuratorFrameworkImpl internalClient = (CuratorFrameworkImpl)client;
+            EnsembleTracker ensembleTracker = internalClient.getEnsembleTracker();
+            if ( ensembleTracker != null )
+            {
+                ensembleTracker.close();
+            }
             ZooKeeper zooKeeper = internalClient.getZooKeeper();
             if ( zooKeeper != null )
             {
@@ -52,6 +57,10 @@ public class TestCleanState
                 }
             }
         }
+        catch ( IllegalStateException ignore )
+        {
+            // client already closed
+        }
         catch ( Exception e )
         {
             e.printStackTrace();    // not sure what to do here

http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
index ee49288..c06d042 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
@@ -70,6 +70,12 @@ public class TestInterProcessSemaphoreCluster
                 }
 
                 @Override
+                public boolean updateServerListEnabled()
+                {
+                    return false;
+                }
+
+                @Override
                 public void start() throws Exception
                 {
                 }

http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
index 4162886..0ee6dec 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
@@ -306,7 +306,6 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
         CuratorFramework observer = newCurator();
 
         PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
-        node.debugReconnectLatch = new CountDownLatch(1);
         node.start();
         try
         {
@@ -317,11 +316,12 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
             Trigger deletedTrigger = Trigger.deleted();
             observer.checkExists().usingWatcher(deletedTrigger).forPath(node.getActualPath());
 
+            node.debugCreateNodeLatch = new CountDownLatch(1);
             KillSession.kill(curator.getZookeeperClient().getZooKeeper());
 
             // Make sure the node got deleted
             assertTrue(deletedTrigger.firedWithin(timing.forSessionSleep().seconds(), TimeUnit.SECONDS));
-            node.debugReconnectLatch.countDown();
+            node.debugCreateNodeLatch.countDown();
         }
         finally
         {
@@ -336,7 +336,6 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
         CuratorFramework observer = newCurator();
 
         PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
-        node.debugReconnectLatch = new CountDownLatch(1);
         node.start();
         try
         {
@@ -346,11 +345,12 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
             Trigger deletedTrigger = Trigger.deleted();
             observer.checkExists().usingWatcher(deletedTrigger).forPath(node.getActualPath());
 
+            node.debugCreateNodeLatch = new CountDownLatch(1);
             KillSession.kill(curator.getZookeeperClient().getZooKeeper());
 
             // Make sure the node got deleted...
             assertTrue(deletedTrigger.firedWithin(timing.forSessionSleep().seconds(), TimeUnit.SECONDS));
-            node.debugReconnectLatch.countDown();
+            node.debugCreateNodeLatch.countDown();
 
             // Check for it to be recreated...
             Trigger createdTrigger = Trigger.created();
@@ -380,16 +380,16 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
             // We should be able to disconnect multiple times and each time the node should be recreated.
             for ( int i = 0; i < 5; i++ )
             {
-                node.debugReconnectLatch = new CountDownLatch(1);
                 Trigger deletionTrigger = Trigger.deleted();
                 observer.checkExists().usingWatcher(deletionTrigger).forPath(path);
 
+                node.debugCreateNodeLatch = new CountDownLatch(1);
                 // Kill the session, thus cleaning up the node...
                 KillSession.kill(curator.getZookeeperClient().getZooKeeper());
 
                 // Make sure the node ended up getting deleted...
                 assertTrue(deletionTrigger.firedWithin(timing.multiple(1.5).forSessionSleep().seconds(), TimeUnit.SECONDS));
-                node.debugReconnectLatch.countDown();
+                node.debugCreateNodeLatch.countDown();
 
                 // Now put a watch in the background looking to see if it gets created...
                 Trigger creationTrigger = Trigger.created();
@@ -706,6 +706,10 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
             {
                 latch.countDown();
             }
+            else if ( type != EventType.None )
+            {
+                Assert.fail("Unexpected watcher event: " + event);
+            }
         }
 
         public boolean firedWithin(long duration, TimeUnit unit)

http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/src/site/confluence/utilities.confluence
----------------------------------------------------------------------
diff --git a/src/site/confluence/utilities.confluence b/src/site/confluence/utilities.confluence
index efacb3c..3a62fa5 100644
--- a/src/site/confluence/utilities.confluence
+++ b/src/site/confluence/utilities.confluence
@@ -38,7 +38,8 @@ Due to limitations in ZooKeeper's transport layer, a single queue will break if
 provides a facade over multiple distributed queues. It monitors the queues and if any one of them goes over a threshold, a new
 queue is added. Puts are distributed amongst the queues.
 
-h2. EnsembleTracker
+h2. WatcherRemoveCuratorFramework
 
-Utility to listen for ensemble/configuration changes via registered EnsembleListeners. Allocate a EnsembleTracker, add one or more listeners
-and start it.
+Curator has a utility that makes it easy to set watchers and remove them at a later date. It is used for all Curator recipes.
+From your CuratorFramework instance, call newWatcherRemoveCuratorFramework(). When using this proxy instance any watchers that are
+set are recorded. You can then call removeWatchers() to remove those watchers. See the Curator source code for usage details.


[6/7] curator git commit: Support getting at the cached config from the ensemble tracker

Posted by ra...@apache.org.
Support getting at the cached config from the ensemble tracker


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/b89091e9
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/b89091e9
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/b89091e9

Branch: refs/heads/CURATOR-3.0
Commit: b89091e9363e760aa34028bcfb57baf6ca921957
Parents: cb34e6f
Author: randgalt <ra...@apache.org>
Authored: Fri Oct 2 09:37:13 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Oct 2 09:37:13 2015 -0500

----------------------------------------------------------------------
 .../curator/framework/CuratorFramework.java     |  8 ++++++
 .../framework/imps/CuratorFrameworkImpl.java    |  7 +++++
 .../curator/framework/imps/EnsembleTracker.java | 27 +++++++++++++++-----
 .../curator/framework/imps/NamespaceFacade.java |  7 +++++
 .../framework/imps/WatcherRemovalFacade.java    |  7 +++++
 .../framework/imps/TestReconfiguration.java     | 12 ++++-----
 6 files changed, 55 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/b89091e9/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 3d197a0..29c5f06 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -31,6 +31,7 @@ import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 
 import java.io.Closeable;
 import java.util.concurrent.TimeUnit;
@@ -305,4 +306,11 @@ public interface CuratorFramework extends Closeable
      * @return error policy
      */
     public ConnectionStateErrorPolicy getConnectionStateErrorPolicy();
+
+    /**
+     * Current maintains a cached view of the Zookeeper quorum config.
+     *
+     * @return the current config
+     */
+    public QuorumVerifier getCurrentConfig();
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/b89091e9/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index c3215ad..db18594 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -50,6 +50,7 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.Arrays;
@@ -171,6 +172,12 @@ public class CuratorFrameworkImpl implements CuratorFramework
         return new WatcherRemovalFacade(this);
     }
 
+    @Override
+    public QuorumVerifier getCurrentConfig()
+    {
+        return (ensembleTracker != null) ? ensembleTracker.getCurrentConfig() : null;
+    }
+
     private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory)
     {
         return new ZookeeperFactory()

http://git-wip-us.apache.org/repos/asf/curator/blob/b89091e9/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
index acd01ee..a46fed1 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
@@ -21,6 +21,7 @@ package org.apache.curator.framework.imps;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import org.apache.curator.ensemble.EnsembleProvider;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.WatcherRemoveCuratorFramework;
@@ -50,6 +51,7 @@ public class EnsembleTracker implements Closeable, CuratorWatcher
     private final WatcherRemoveCuratorFramework client;
     private final EnsembleProvider ensembleProvider;
     private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+    private final AtomicReference<QuorumMaj> currentConfig = new AtomicReference<>(new QuorumMaj(Maps.<Long, QuorumPeer.QuorumServer>newHashMap()));
     private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
     {
         @Override
@@ -108,6 +110,16 @@ public class EnsembleTracker implements Closeable, CuratorWatcher
         }
     }
 
+    /**
+     * Return the current quorum config
+     *
+     * @return config
+     */
+    public QuorumVerifier getCurrentConfig()
+    {
+        return currentConfig.get();
+    }
+
     private void reset() throws Exception
     {
         BackgroundCallback backgroundCallback = new BackgroundCallback()
@@ -125,13 +137,10 @@ public class EnsembleTracker implements Closeable, CuratorWatcher
     }
 
     @VisibleForTesting
-    public static String configToConnectionString(byte[] data) throws Exception
+    public static String configToConnectionString(QuorumVerifier data) throws Exception
     {
-        Properties properties = new Properties();
-        properties.load(new ByteArrayInputStream(data));
-        QuorumVerifier qv = new QuorumMaj(properties);
         StringBuilder sb = new StringBuilder();
-        for ( QuorumPeer.QuorumServer server : qv.getAllMembers().values() )
+        for ( QuorumPeer.QuorumServer server : data.getAllMembers().values() )
         {
             if ( sb.length() != 0 )
             {
@@ -146,7 +155,13 @@ public class EnsembleTracker implements Closeable, CuratorWatcher
     private void processConfigData(byte[] data) throws Exception
     {
         log.info("New config event received: " + Arrays.toString(data));
-        String connectionString = configToConnectionString(data);
+
+        Properties properties = new Properties();
+        properties.load(new ByteArrayInputStream(data));
+        QuorumMaj newConfig = new QuorumMaj(properties);
+        currentConfig.set(newConfig);
+
+        String connectionString = configToConnectionString(newConfig);
         if ( connectionString.trim().length() > 0 )
         {
             ensembleProvider.setConnectionString(connectionString);

http://git-wip-us.apache.org/repos/asf/curator/blob/b89091e9/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceFacade.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceFacade.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceFacade.java
index 60ef647..9935670 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceFacade.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceFacade.java
@@ -28,6 +28,7 @@ import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 
 class NamespaceFacade extends CuratorFrameworkImpl
 {
@@ -49,6 +50,12 @@ class NamespaceFacade extends CuratorFrameworkImpl
     }
 
     @Override
+    public QuorumVerifier getCurrentConfig()
+    {
+        return client.getCurrentConfig();
+    }
+
+    @Override
     public CuratorFramework nonNamespaceView()
     {
         return usingNamespace(null);

http://git-wip-us.apache.org/repos/asf/curator/blob/b89091e9/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
index 47c2104..371fc63 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
@@ -30,6 +30,7 @@ import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 
 class WatcherRemovalFacade extends CuratorFrameworkImpl implements WatcherRemoveCuratorFramework
 {
@@ -55,6 +56,12 @@ class WatcherRemovalFacade extends CuratorFrameworkImpl implements WatcherRemove
     }
 
     @Override
+    public QuorumVerifier getCurrentConfig()
+    {
+        return client.getCurrentConfig();
+    }
+
+    @Override
     public void removeWatchers()
     {
         removalManager.removeWatchers();

http://git-wip-us.apache.org/repos/asf/curator/blob/b89091e9/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
index e399a4d..7565590 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
@@ -45,10 +45,8 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
@@ -164,7 +162,7 @@ public class TestReconfiguration extends BaseClassForTests
             QuorumVerifier quorumVerifier = toQuorumVerifier(configData);
             System.out.println(quorumVerifier);
             assertConfig(quorumVerifier, cluster.getInstances());
-            Assert.assertEquals(EnsembleTracker.configToConnectionString(configData), ensembleProvider.getConnectionString());
+            Assert.assertEquals(EnsembleTracker.configToConnectionString(quorumVerifier), ensembleProvider.getConnectionString());
         }
     }
 
@@ -192,7 +190,7 @@ public class TestReconfiguration extends BaseClassForTests
                 List<InstanceSpec> newInstances = Lists.newArrayList(cluster.getInstances());
                 newInstances.addAll(newCluster.getInstances());
                 assertConfig(newConfig, newInstances);
-                Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfigData), ensembleProvider.getConnectionString());
+                Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString());
             }
         }
     }
@@ -234,7 +232,7 @@ public class TestReconfiguration extends BaseClassForTests
                 List<InstanceSpec> newInstances = Lists.newArrayList(cluster.getInstances());
                 newInstances.addAll(newCluster.getInstances());
                 assertConfig(newConfig, newInstances);
-                Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfigData), ensembleProvider.getConnectionString());
+                Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString());
             }
         }
     }
@@ -275,7 +273,7 @@ public class TestReconfiguration extends BaseClassForTests
                 newInstances.addAll(instances);
                 newInstances.remove(removeSpec);
                 assertConfig(newConfig, newInstances);
-                Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfigData), ensembleProvider.getConnectionString());
+                Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString());
             }
         }
     }
@@ -311,7 +309,7 @@ public class TestReconfiguration extends BaseClassForTests
             QuorumVerifier newConfig = toQuorumVerifier(newConfigData);
             Assert.assertEquals(newConfig.getAllMembers().size(), 3);
             assertConfig(newConfig, smallCluster);
-            Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfigData), ensembleProvider.getConnectionString());
+            Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString());
         }
     }