You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2015/10/09 02:12:23 UTC

[6/7] curator git commit: Support getting at the cached config from the ensemble tracker

Support getting at the cached config from the ensemble tracker


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/b89091e9
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/b89091e9
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/b89091e9

Branch: refs/heads/CURATOR-3.0
Commit: b89091e9363e760aa34028bcfb57baf6ca921957
Parents: cb34e6f
Author: randgalt <ra...@apache.org>
Authored: Fri Oct 2 09:37:13 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Oct 2 09:37:13 2015 -0500

----------------------------------------------------------------------
 .../curator/framework/CuratorFramework.java     |  8 ++++++
 .../framework/imps/CuratorFrameworkImpl.java    |  7 +++++
 .../curator/framework/imps/EnsembleTracker.java | 27 +++++++++++++++-----
 .../curator/framework/imps/NamespaceFacade.java |  7 +++++
 .../framework/imps/WatcherRemovalFacade.java    |  7 +++++
 .../framework/imps/TestReconfiguration.java     | 12 ++++-----
 6 files changed, 55 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/b89091e9/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 3d197a0..29c5f06 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -31,6 +31,7 @@ import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 
 import java.io.Closeable;
 import java.util.concurrent.TimeUnit;
@@ -305,4 +306,11 @@ public interface CuratorFramework extends Closeable
      * @return error policy
      */
     public ConnectionStateErrorPolicy getConnectionStateErrorPolicy();
+
+    /**
+     * Curator maintains a cached view of the ZooKeeper quorum config.
+     *
+     * @return the current config
+     */
+    public QuorumVerifier getCurrentConfig();
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/b89091e9/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index c3215ad..db18594 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -50,6 +50,7 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.Arrays;
@@ -171,6 +172,12 @@ public class CuratorFrameworkImpl implements CuratorFramework
         return new WatcherRemovalFacade(this);
     }
 
+    @Override
+    public QuorumVerifier getCurrentConfig()
+    {
+        return (ensembleTracker != null) ? ensembleTracker.getCurrentConfig() : null;
+    }
+
     private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory)
     {
         return new ZookeeperFactory()

http://git-wip-us.apache.org/repos/asf/curator/blob/b89091e9/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
index acd01ee..a46fed1 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
@@ -21,6 +21,7 @@ package org.apache.curator.framework.imps;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import org.apache.curator.ensemble.EnsembleProvider;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.WatcherRemoveCuratorFramework;
@@ -50,6 +51,7 @@ public class EnsembleTracker implements Closeable, CuratorWatcher
     private final WatcherRemoveCuratorFramework client;
     private final EnsembleProvider ensembleProvider;
     private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+    private final AtomicReference<QuorumMaj> currentConfig = new AtomicReference<>(new QuorumMaj(Maps.<Long, QuorumPeer.QuorumServer>newHashMap()));
     private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
     {
         @Override
@@ -108,6 +110,16 @@ public class EnsembleTracker implements Closeable, CuratorWatcher
         }
     }
 
+    /**
+     * Return the current quorum config
+     *
+     * @return config
+     */
+    public QuorumVerifier getCurrentConfig()
+    {
+        return currentConfig.get();
+    }
+
     private void reset() throws Exception
     {
         BackgroundCallback backgroundCallback = new BackgroundCallback()
@@ -125,13 +137,10 @@ public class EnsembleTracker implements Closeable, CuratorWatcher
     }
 
     @VisibleForTesting
-    public static String configToConnectionString(byte[] data) throws Exception
+    public static String configToConnectionString(QuorumVerifier data) throws Exception
     {
-        Properties properties = new Properties();
-        properties.load(new ByteArrayInputStream(data));
-        QuorumVerifier qv = new QuorumMaj(properties);
         StringBuilder sb = new StringBuilder();
-        for ( QuorumPeer.QuorumServer server : qv.getAllMembers().values() )
+        for ( QuorumPeer.QuorumServer server : data.getAllMembers().values() )
         {
             if ( sb.length() != 0 )
             {
@@ -146,7 +155,13 @@ public class EnsembleTracker implements Closeable, CuratorWatcher
     private void processConfigData(byte[] data) throws Exception
     {
         log.info("New config event received: " + Arrays.toString(data));
-        String connectionString = configToConnectionString(data);
+
+        Properties properties = new Properties();
+        properties.load(new ByteArrayInputStream(data));
+        QuorumMaj newConfig = new QuorumMaj(properties);
+        currentConfig.set(newConfig);
+
+        String connectionString = configToConnectionString(newConfig);
         if ( connectionString.trim().length() > 0 )
         {
             ensembleProvider.setConnectionString(connectionString);

http://git-wip-us.apache.org/repos/asf/curator/blob/b89091e9/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceFacade.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceFacade.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceFacade.java
index 60ef647..9935670 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceFacade.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceFacade.java
@@ -28,6 +28,7 @@ import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 
 class NamespaceFacade extends CuratorFrameworkImpl
 {
@@ -49,6 +50,12 @@ class NamespaceFacade extends CuratorFrameworkImpl
     }
 
     @Override
+    public QuorumVerifier getCurrentConfig()
+    {
+        return client.getCurrentConfig();
+    }
+
+    @Override
     public CuratorFramework nonNamespaceView()
     {
         return usingNamespace(null);

http://git-wip-us.apache.org/repos/asf/curator/blob/b89091e9/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
index 47c2104..371fc63 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
@@ -30,6 +30,7 @@ import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 
 class WatcherRemovalFacade extends CuratorFrameworkImpl implements WatcherRemoveCuratorFramework
 {
@@ -55,6 +56,12 @@ class WatcherRemovalFacade extends CuratorFrameworkImpl implements WatcherRemove
     }
 
     @Override
+    public QuorumVerifier getCurrentConfig()
+    {
+        return client.getCurrentConfig();
+    }
+
+    @Override
     public void removeWatchers()
     {
         removalManager.removeWatchers();

http://git-wip-us.apache.org/repos/asf/curator/blob/b89091e9/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
index e399a4d..7565590 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
@@ -45,10 +45,8 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
@@ -164,7 +162,7 @@ public class TestReconfiguration extends BaseClassForTests
             QuorumVerifier quorumVerifier = toQuorumVerifier(configData);
             System.out.println(quorumVerifier);
             assertConfig(quorumVerifier, cluster.getInstances());
-            Assert.assertEquals(EnsembleTracker.configToConnectionString(configData), ensembleProvider.getConnectionString());
+            Assert.assertEquals(EnsembleTracker.configToConnectionString(quorumVerifier), ensembleProvider.getConnectionString());
         }
     }
 
@@ -192,7 +190,7 @@ public class TestReconfiguration extends BaseClassForTests
                 List<InstanceSpec> newInstances = Lists.newArrayList(cluster.getInstances());
                 newInstances.addAll(newCluster.getInstances());
                 assertConfig(newConfig, newInstances);
-                Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfigData), ensembleProvider.getConnectionString());
+                Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString());
             }
         }
     }
@@ -234,7 +232,7 @@ public class TestReconfiguration extends BaseClassForTests
                 List<InstanceSpec> newInstances = Lists.newArrayList(cluster.getInstances());
                 newInstances.addAll(newCluster.getInstances());
                 assertConfig(newConfig, newInstances);
-                Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfigData), ensembleProvider.getConnectionString());
+                Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString());
             }
         }
     }
@@ -275,7 +273,7 @@ public class TestReconfiguration extends BaseClassForTests
                 newInstances.addAll(instances);
                 newInstances.remove(removeSpec);
                 assertConfig(newConfig, newInstances);
-                Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfigData), ensembleProvider.getConnectionString());
+                Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString());
             }
         }
     }
@@ -311,7 +309,7 @@ public class TestReconfiguration extends BaseClassForTests
             QuorumVerifier newConfig = toQuorumVerifier(newConfigData);
             Assert.assertEquals(newConfig.getAllMembers().size(), 3);
             assertConfig(newConfig, smallCluster);
-            Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfigData), ensembleProvider.getConnectionString());
+            Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString());
         }
     }