You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2015/10/09 02:12:23 UTC
[6/7] curator git commit: Support getting at the cached config from
the ensemble tracker
Support getting at the cached config from the ensemble tracker
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/b89091e9
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/b89091e9
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/b89091e9
Branch: refs/heads/CURATOR-3.0
Commit: b89091e9363e760aa34028bcfb57baf6ca921957
Parents: cb34e6f
Author: randgalt <ra...@apache.org>
Authored: Fri Oct 2 09:37:13 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Oct 2 09:37:13 2015 -0500
----------------------------------------------------------------------
.../curator/framework/CuratorFramework.java | 8 ++++++
.../framework/imps/CuratorFrameworkImpl.java | 7 +++++
.../curator/framework/imps/EnsembleTracker.java | 27 +++++++++++++++-----
.../curator/framework/imps/NamespaceFacade.java | 7 +++++
.../framework/imps/WatcherRemovalFacade.java | 7 +++++
.../framework/imps/TestReconfiguration.java | 12 ++++-----
6 files changed, 55 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/b89091e9/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 3d197a0..29c5f06 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -31,6 +31,7 @@ import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
import org.apache.curator.utils.EnsurePath;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import java.io.Closeable;
import java.util.concurrent.TimeUnit;
@@ -305,4 +306,11 @@ public interface CuratorFramework extends Closeable
* @return error policy
*/
public ConnectionStateErrorPolicy getConnectionStateErrorPolicy();
+
+ /**
+ * Current maintains a cached view of the Zookeeper quorum config.
+ *
+ * @return the current config
+ */
+ public QuorumVerifier getCurrentConfig();
}
http://git-wip-us.apache.org/repos/asf/curator/blob/b89091e9/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index c3215ad..db18594 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -50,6 +50,7 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
@@ -171,6 +172,12 @@ public class CuratorFrameworkImpl implements CuratorFramework
return new WatcherRemovalFacade(this);
}
+ @Override
+ public QuorumVerifier getCurrentConfig()
+ {
+ return (ensembleTracker != null) ? ensembleTracker.getCurrentConfig() : null;
+ }
+
private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory)
{
return new ZookeeperFactory()
http://git-wip-us.apache.org/repos/asf/curator/blob/b89091e9/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
index acd01ee..a46fed1 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
@@ -21,6 +21,7 @@ package org.apache.curator.framework.imps;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
import org.apache.curator.ensemble.EnsembleProvider;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.WatcherRemoveCuratorFramework;
@@ -50,6 +51,7 @@ public class EnsembleTracker implements Closeable, CuratorWatcher
private final WatcherRemoveCuratorFramework client;
private final EnsembleProvider ensembleProvider;
private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+ private final AtomicReference<QuorumMaj> currentConfig = new AtomicReference<>(new QuorumMaj(Maps.<Long, QuorumPeer.QuorumServer>newHashMap()));
private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
@Override
@@ -108,6 +110,16 @@ public class EnsembleTracker implements Closeable, CuratorWatcher
}
}
+ /**
+ * Return the current quorum config
+ *
+ * @return config
+ */
+ public QuorumVerifier getCurrentConfig()
+ {
+ return currentConfig.get();
+ }
+
private void reset() throws Exception
{
BackgroundCallback backgroundCallback = new BackgroundCallback()
@@ -125,13 +137,10 @@ public class EnsembleTracker implements Closeable, CuratorWatcher
}
@VisibleForTesting
- public static String configToConnectionString(byte[] data) throws Exception
+ public static String configToConnectionString(QuorumVerifier data) throws Exception
{
- Properties properties = new Properties();
- properties.load(new ByteArrayInputStream(data));
- QuorumVerifier qv = new QuorumMaj(properties);
StringBuilder sb = new StringBuilder();
- for ( QuorumPeer.QuorumServer server : qv.getAllMembers().values() )
+ for ( QuorumPeer.QuorumServer server : data.getAllMembers().values() )
{
if ( sb.length() != 0 )
{
@@ -146,7 +155,13 @@ public class EnsembleTracker implements Closeable, CuratorWatcher
private void processConfigData(byte[] data) throws Exception
{
log.info("New config event received: " + Arrays.toString(data));
- String connectionString = configToConnectionString(data);
+
+ Properties properties = new Properties();
+ properties.load(new ByteArrayInputStream(data));
+ QuorumMaj newConfig = new QuorumMaj(properties);
+ currentConfig.set(newConfig);
+
+ String connectionString = configToConnectionString(newConfig);
if ( connectionString.trim().length() > 0 )
{
ensembleProvider.setConnectionString(connectionString);
http://git-wip-us.apache.org/repos/asf/curator/blob/b89091e9/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceFacade.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceFacade.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceFacade.java
index 60ef647..9935670 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceFacade.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceFacade.java
@@ -28,6 +28,7 @@ import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.EnsurePath;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
class NamespaceFacade extends CuratorFrameworkImpl
{
@@ -49,6 +50,12 @@ class NamespaceFacade extends CuratorFrameworkImpl
}
@Override
+ public QuorumVerifier getCurrentConfig()
+ {
+ return client.getCurrentConfig();
+ }
+
+ @Override
public CuratorFramework nonNamespaceView()
{
return usingNamespace(null);
http://git-wip-us.apache.org/repos/asf/curator/blob/b89091e9/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
index 47c2104..371fc63 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
@@ -30,6 +30,7 @@ import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.EnsurePath;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
class WatcherRemovalFacade extends CuratorFrameworkImpl implements WatcherRemoveCuratorFramework
{
@@ -55,6 +56,12 @@ class WatcherRemovalFacade extends CuratorFrameworkImpl implements WatcherRemove
}
@Override
+ public QuorumVerifier getCurrentConfig()
+ {
+ return client.getCurrentConfig();
+ }
+
+ @Override
public void removeWatchers()
{
removalManager.removeWatchers();
http://git-wip-us.apache.org/repos/asf/curator/blob/b89091e9/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
index e399a4d..7565590 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
@@ -45,10 +45,8 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.io.ByteArrayInputStream;
import java.io.IOException;
-import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
@@ -164,7 +162,7 @@ public class TestReconfiguration extends BaseClassForTests
QuorumVerifier quorumVerifier = toQuorumVerifier(configData);
System.out.println(quorumVerifier);
assertConfig(quorumVerifier, cluster.getInstances());
- Assert.assertEquals(EnsembleTracker.configToConnectionString(configData), ensembleProvider.getConnectionString());
+ Assert.assertEquals(EnsembleTracker.configToConnectionString(quorumVerifier), ensembleProvider.getConnectionString());
}
}
@@ -192,7 +190,7 @@ public class TestReconfiguration extends BaseClassForTests
List<InstanceSpec> newInstances = Lists.newArrayList(cluster.getInstances());
newInstances.addAll(newCluster.getInstances());
assertConfig(newConfig, newInstances);
- Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfigData), ensembleProvider.getConnectionString());
+ Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString());
}
}
}
@@ -234,7 +232,7 @@ public class TestReconfiguration extends BaseClassForTests
List<InstanceSpec> newInstances = Lists.newArrayList(cluster.getInstances());
newInstances.addAll(newCluster.getInstances());
assertConfig(newConfig, newInstances);
- Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfigData), ensembleProvider.getConnectionString());
+ Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString());
}
}
}
@@ -275,7 +273,7 @@ public class TestReconfiguration extends BaseClassForTests
newInstances.addAll(instances);
newInstances.remove(removeSpec);
assertConfig(newConfig, newInstances);
- Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfigData), ensembleProvider.getConnectionString());
+ Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString());
}
}
}
@@ -311,7 +309,7 @@ public class TestReconfiguration extends BaseClassForTests
QuorumVerifier newConfig = toQuorumVerifier(newConfigData);
Assert.assertEquals(newConfig.getAllMembers().size(), 3);
assertConfig(newConfig, smallCluster);
- Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfigData), ensembleProvider.getConnectionString());
+ Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString());
}
}