You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by dr...@apache.org on 2015/08/17 18:54:37 UTC
[07/27] curator git commit: [CURATOR-160] Add EnsembleListener and
EnsembleTracker. Implement a DynamicEnsembleProvider. TestReconfiguration now
also tests the DynamicEnsembleProvider.
[CURATOR-160] Add EnsembleListener and EnsembleTracker. Implement a DynamicEnsembleProvider. TestReconfiguration now also tests the DynamicEnsembleProvider.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/4ec5ffe3
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/4ec5ffe3
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/4ec5ffe3
Branch: refs/heads/CURATOR-215
Commit: 4ec5ffe3a9d1c14a0a5acbe1ebb5552a284a8908
Parents: ec4083f
Author: Ioannis Canellos <io...@gmail.com>
Authored: Tue Nov 11 16:35:57 2014 +0200
Committer: Scott Blum <dr...@apache.org>
Committed: Wed Aug 12 17:08:33 2015 -0400
----------------------------------------------------------------------
.../curator/ensemble/EnsembleListener.java | 24 +
.../dynamic/DynamicEnsembleProvider.java | 61 +++
.../curator/framework/imps/EnsembleTracker.java | 167 +++++++
.../framework/imps/TestReconfiguration.java | 489 +++++++++++--------
4 files changed, 538 insertions(+), 203 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/4ec5ffe3/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleListener.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleListener.java b/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleListener.java
new file mode 100644
index 0000000..8f963cd
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleListener.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.ensemble;
+
+/**
+ * Listener that is handed a connection string whenever one is published to it.
+ */
+public interface EnsembleListener {
+
+ /**
+ * Receives a connection string.
+ *
+ * @param connectionString the connection string supplied by the caller of this listener
+ */
+ void connectionStringUpdated(String connectionString);
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/4ec5ffe3/curator-client/src/main/java/org/apache/curator/ensemble/dynamic/DynamicEnsembleProvider.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/dynamic/DynamicEnsembleProvider.java b/curator-client/src/main/java/org/apache/curator/ensemble/dynamic/DynamicEnsembleProvider.java
new file mode 100644
index 0000000..70b755f
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/ensemble/dynamic/DynamicEnsembleProvider.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.ensemble.dynamic;
+
+import com.google.common.base.Preconditions;
+import org.apache.curator.ensemble.EnsembleListener;
+import org.apache.curator.ensemble.EnsembleProvider;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * An {@link EnsembleProvider} whose connection string can be replaced at runtime.
+ * <p>
+ * The class is also an {@link EnsembleListener}: every value passed to
+ * {@link #connectionStringUpdated(String)} becomes the value returned by
+ * {@link #getConnectionString()}.
+ */
+public class DynamicEnsembleProvider implements EnsembleProvider, EnsembleListener {
+
+    // Current connection string. Never null: both the constructor and
+    // connectionStringUpdated() reject null values.
+    private final AtomicReference<String> connectionString = new AtomicReference<String>();
+
+    /**
+     * Creates a provider that initially returns the given connection string.
+     *
+     * @param connectionString initial connection string, must not be null
+     * @throws NullPointerException if {@code connectionString} is null
+     */
+    public DynamicEnsembleProvider(String connectionString)
+    {
+        this.connectionString.set(Preconditions.checkNotNull(connectionString, "connectionString cannot be null"));
+    }
+
+    /**
+     * Does nothing; this provider has no resources to start.
+     */
+    @Override
+    public void start() throws Exception {
+        // NOP
+    }
+
+    /**
+     * Returns the most recently set connection string.
+     *
+     * @return the current connection string, never null
+     */
+    @Override
+    public String getConnectionString() {
+        return connectionString.get();
+    }
+
+    /**
+     * Does nothing; this provider has no resources to release.
+     */
+    @Override
+    public void close() throws IOException {
+        // NOP
+    }
+
+    /**
+     * Replaces the connection string returned by {@link #getConnectionString()}.
+     *
+     * @param connectionString new connection string, must not be null
+     * @throws NullPointerException if {@code connectionString} is null
+     */
+    @Override
+    public void connectionStringUpdated(String connectionString) {
+        // Same check as the constructor, so getConnectionString() can never start returning null.
+        this.connectionString.set(Preconditions.checkNotNull(connectionString, "connectionString cannot be null"));
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/4ec5ffe3/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
new file mode 100644
index 0000000..a789e42
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import org.apache.curator.ensemble.EnsembleListener;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tracks changes to the ensemble and notifies registered {@link org.apache.curator.ensemble.EnsembleListener} instances.
+ * <p>
+ * After {@link #start()} the tracker reads the ensemble configuration in the background with a watcher set.
+ * Each successful read is converted into a comma separated {@code host:port} connection string and passed to
+ * every listener registered through {@link #getListenable()}. The configuration is read again whenever the
+ * watcher fires and whenever the connection is re-established after having been lost.
+ */
+public class EnsembleTracker implements Closeable {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final CuratorFramework client;
+    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+    private final ListenerContainer<EnsembleListener> listeners = new ListenerContainer<EnsembleListener>();
+    // Starts as true: a re-read is only triggered by a (re)connection that follows a non-connected state.
+    private final AtomicBoolean isConnected = new AtomicBoolean(true);
+    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() {
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState newState) {
+            if ((newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED)) {
+                if (isConnected.compareAndSet(false, true)) {
+                    try {
+                        reset();
+                    } catch (Exception e) {
+                        log.error("Trying to reset after reconnection", e);
+                    }
+                }
+            } else {
+                isConnected.set(false);
+            }
+        }
+    };
+
+    // Fired when the watched configuration changes; re-reads it and sets the watcher again.
+    private final CuratorWatcher watcher = new CuratorWatcher() {
+        @Override
+        public void process(WatchedEvent event) throws Exception {
+            reset();
+        }
+    };
+
+
+    private enum State {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
+
+    private final BackgroundCallback backgroundCallback = new BackgroundCallback() {
+        @Override
+        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+            processBackgroundResult(event);
+        }
+    };
+
+
+    /**
+     * @param client the client used to read and watch the ensemble configuration
+     */
+    public EnsembleTracker(CuratorFramework client) {
+        this.client = client;
+    }
+
+    /**
+     * Starts tracking: registers for connection state changes and issues the first configuration read.
+     *
+     * @throws IllegalStateException if the tracker has already been started
+     * @throws Exception if the configuration read cannot be issued
+     */
+    public void start() throws Exception {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
+        client.getConnectionStateListenable().addListener(connectionStateListener);
+        reset();
+    }
+
+    /**
+     * Stops tracking: clears the registered listeners (if the tracker was started) and unregisters from
+     * connection state changes.
+     */
+    @Override
+    public void close() throws IOException {
+        if (state.compareAndSet(State.STARTED, State.CLOSED)) {
+            listeners.clear();
+        }
+        client.getConnectionStateListenable().removeListener(connectionStateListener);
+    }
+
+    /**
+     * Return the ensemble listenable
+     *
+     * @return listenable
+     * @throws IllegalStateException if the tracker has been closed
+     */
+    public ListenerContainer<EnsembleListener> getListenable()
+    {
+        Preconditions.checkState(state.get() != State.CLOSED, "Closed");
+
+        return listeners;
+    }
+
+    /**
+     * Reads the ensemble configuration in the background and (re)sets the watcher on it.
+     * Does nothing unless the tracker is currently started.
+     */
+    private void reset() throws Exception {
+        // A watcher or connection event may still arrive after close(); do not re-register the watch then.
+        if (state.get() != State.STARTED) {
+            return;
+        }
+        client.getConfig().usingWatcher(watcher).inBackground(backgroundCallback).forEnsemble();
+    }
+
+    /**
+     * Handles the result of a background operation; only successful configuration reads are processed.
+     */
+    private void processBackgroundResult(CuratorEvent event) throws Exception {
+        switch (event.getType()) {
+            case GET_CONFIG: {
+                if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
+                    processConfigData(event.getData());
+                }
+                break;
+            }
+            default:
+                // Other event types are not relevant to ensemble tracking.
+                break;
+        }
+    }
+
+    /**
+     * Converts raw configuration data into a connection string and passes it to all listeners.
+     * Listeners are not notified when the data is missing or yields no usable client address.
+     *
+     * @param data configuration bytes as returned by the configuration read, may be null
+     */
+    private void processConfigData(byte[] data) throws Exception {
+        if ((data == null) || (data.length == 0)) {
+            log.debug("Ignoring ensemble config event without data");
+            return;
+        }
+
+        Properties properties = new Properties();
+        properties.load(new ByteArrayInputStream(data));
+        QuorumVerifier qv = new QuorumMaj(properties);
+        StringBuilder sb = new StringBuilder();
+        for (QuorumPeer.QuorumServer server : qv.getAllMembers().values()) {
+            // A member may be configured without a client port, or its address may be unresolved;
+            // such a member cannot contribute to a client connection string.
+            if ((server.clientAddr == null) || (server.clientAddr.getAddress() == null)) {
+                log.debug("Skipping ensemble member without a usable client address: {}", server);
+                continue;
+            }
+            if (sb.length() != 0) {
+                sb.append(",");
+            }
+            sb.append(server.clientAddr.getAddress().getHostAddress()).append(":").append(server.clientAddr.getPort());
+        }
+
+        if (sb.length() == 0) {
+            // Never hand listeners an empty connection string.
+            log.debug("Ensemble config contains no usable client addresses; listeners not notified");
+            return;
+        }
+
+        final String connectionString = sb.toString();
+        listeners.forEach
+        (
+            new Function<EnsembleListener, Void>() {
+                @Override
+                public Void apply(EnsembleListener listener) {
+                    try {
+                        listener.connectionStringUpdated(connectionString);
+                    } catch (Exception e) {
+                        // One failing listener must not prevent the others from being notified.
+                        log.error("Calling listener", e);
+                    }
+                    return null;
+                }
+            }
+        );
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/4ec5ffe3/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
index e8896ae..faec551 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
@@ -18,6 +18,8 @@
*/
package org.apache.curator.framework.imps;
+import org.apache.curator.ensemble.EnsembleListener;
+import org.apache.curator.ensemble.dynamic.DynamicEnsembleProvider;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
@@ -35,246 +37,287 @@ import org.testng.annotations.Test;
import java.io.IOException;
import java.io.StringReader;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
public class TestReconfiguration {
TestingCluster cluster;
+ DynamicEnsembleProvider dynamicEnsembleProvider;
+ WaitOnDelegateListener waitOnDelegateListener;
+ EnsembleTracker ensembleTracker;
+ CuratorFramework client;
+
+ String connectionString1to5;
+ String connectionString2to5;
+ String connectionString3to5;
@BeforeMethod
public void setup() throws Exception {
cluster = new TestingCluster(5);
cluster.start();
+
+ connectionString1to5 = cluster.getConnectString();
+ connectionString2to5 = getConnectionString(cluster, 2,3,4,5);
+ connectionString3to5 = getConnectionString(cluster, 3,4,5);
+
+ dynamicEnsembleProvider = new DynamicEnsembleProvider(connectionString1to5);
+ client = CuratorFrameworkFactory.builder()
+ .ensembleProvider(dynamicEnsembleProvider)
+ .retryPolicy(new RetryOneTime(1))
+ .build();
+ client.start();
+ client.blockUntilConnected();
+
+ //Wrap around the dynamic ensemble provider, so that we can wait until it has received the event.
+ waitOnDelegateListener = new WaitOnDelegateListener(dynamicEnsembleProvider);
+ ensembleTracker = new EnsembleTracker(client);
+ ensembleTracker.getListenable().addListener(waitOnDelegateListener);
+ ensembleTracker.start();
+ //Wait for the initial event.
+ waitOnDelegateListener.waitForEvent();
}
@AfterMethod
public void tearDown() throws IOException {
+ ensembleTracker.close();
+ client.close();
cluster.close();
}
@Test
public void testSyncIncremental() throws Exception {
- CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1));
- client.start();
- client.blockUntilConnected();
- try {
- Stat stat = new Stat();
- byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble();
- Assert.assertNotNull(bytes);
- QuorumVerifier qv = getQuorumVerifier(bytes);
- Assert.assertEquals(qv.getAllMembers().size(), 5);
- String server1 = getServerString(qv, cluster, 1L);
- String server2 = getServerString(qv, cluster, 2L);
-
- //Remove Servers
- bytes = client.reconfig().leaving("1").storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
- qv = getQuorumVerifier(bytes);
- Assert.assertEquals(qv.getAllMembers().size(), 4);
- bytes = client.reconfig().leaving("2").storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
- qv = getQuorumVerifier(bytes);
- Assert.assertEquals(qv.getAllMembers().size(), 3);
-
- //Add Servers
- bytes = client.reconfig().joining("server.1=" + server1).storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
- qv = getQuorumVerifier(bytes);
- Assert.assertEquals(qv.getAllMembers().size(), 4);
- bytes = client.reconfig().joining("server.2=" + server2).storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
- qv = getQuorumVerifier(bytes);
- Assert.assertEquals(qv.getAllMembers().size(), 5);
- } finally {
- client.close();
- }
+ Stat stat = new Stat();
+ byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble();
+ Assert.assertNotNull(bytes);
+ QuorumVerifier qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(qv.getAllMembers().size(), 5);
+ String server1 = getServerString(qv, cluster, 1L);
+ String server2 = getServerString(qv, cluster, 2L);
+
+ //Remove Servers
+ bytes = client.reconfig().leaving("1").storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
+ qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(qv.getAllMembers().size(), 4);
+
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
+
+ bytes = client.reconfig().leaving("2").storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
+ qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(qv.getAllMembers().size(), 3);
+
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5);
+
+ //Add Servers
+ bytes = client.reconfig().joining("server.2=" + server2).storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
+ qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(qv.getAllMembers().size(), 4);
+
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
+
+ bytes = client.reconfig().joining("server.1=" + server1).storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
+ qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(qv.getAllMembers().size(), 5);
+
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5);
}
@Test
public void testAsyncIncremental() throws Exception {
- CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1));
- client.start();
- client.blockUntilConnected();
- try {
- final AtomicReference<byte[]> bytes = new AtomicReference<byte[]>();
- final BackgroundCallback callback = new BackgroundCallback() {
- @Override
- public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
- bytes.set(event.getData());
- ((CountDownLatch)event.getContext()).countDown();
+ final AtomicReference<byte[]> bytes = new AtomicReference<byte[]>();
+ final BackgroundCallback callback = new BackgroundCallback() {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+ bytes.set(event.getData());
+ //We only need the latch on getConfig.
+ if (event.getContext() != null) {
+ ((CountDownLatch) event.getContext()).countDown();
}
+ }
- };
-
- CountDownLatch latch = new CountDownLatch(1);
- client.getConfig().inBackground(callback, latch).forEnsemble();
- latch.await(5, TimeUnit.SECONDS);
- Assert.assertNotNull(bytes.get());
- QuorumVerifier qv = getQuorumVerifier(bytes.get());
- Assert.assertEquals(qv.getAllMembers().size(), 5);
- String server1 = getServerString(qv, cluster, 1L);
- String server2 = getServerString(qv, cluster, 2L);
-
-
- //Remove Servers
- latch = new CountDownLatch(1);
- client.reconfig().leaving("1").inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
- latch.await(5, TimeUnit.SECONDS);
- qv = getQuorumVerifier(bytes.get());
- Assert.assertEquals(qv.getAllMembers().size(), 4);
- latch = new CountDownLatch(1);
- client.reconfig().leaving("2").inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
- latch.await(5, TimeUnit.SECONDS);
- qv = getQuorumVerifier(bytes.get());
- Assert.assertEquals(qv.getAllMembers().size(), 3);
-
- //Add Servers
- latch = new CountDownLatch(1);
- client.reconfig().joining("server.1=" + server1).inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
- latch.await(5, TimeUnit.SECONDS);
- qv = getQuorumVerifier(bytes.get());
- Assert.assertEquals(qv.getAllMembers().size(), 4);
- latch = new CountDownLatch(1);
- client.reconfig().joining("server.2=" + server2).inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
- latch.await(5, TimeUnit.SECONDS);
- qv = getQuorumVerifier(bytes.get());
- Assert.assertEquals(qv.getAllMembers().size(), 5);
- } finally {
- client.close();
- }
+ };
+
+ CountDownLatch latch = new CountDownLatch(1);
+ client.getConfig().inBackground(callback, latch).forEnsemble();
+ latch.await(5, TimeUnit.SECONDS);
+ Assert.assertNotNull(bytes.get());
+ QuorumVerifier qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(qv.getAllMembers().size(), 5);
+ String server1 = getServerString(qv, cluster, 1L);
+ String server2 = getServerString(qv, cluster, 2L);
+
+
+ //Remove Servers
+ client.reconfig().leaving("1").inBackground(callback).fromConfig(qv.getVersion()).forEnsemble();
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
+ qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(qv.getAllMembers().size(), 4);
+
+ client.reconfig().leaving("2").inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5);
+ qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(qv.getAllMembers().size(), 3);
+
+ //Add Servers
+ client.reconfig().joining("server.2=" + server2).inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
+ qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(qv.getAllMembers().size(), 4);
+
+
+ client.reconfig().joining("server.1=" + server1).inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5);
+ qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(qv.getAllMembers().size(), 5);
}
@Test
public void testSyncNonIncremental() throws Exception {
- CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1));
- client.start();
- client.blockUntilConnected();
- try {
- Stat stat = new Stat();
- byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble();
- Assert.assertNotNull(bytes);
- QuorumVerifier qv = getQuorumVerifier(bytes);
- Assert.assertEquals(qv.getAllMembers().size(), 5);
- String server1 = getServerString(qv, cluster, 1L);
- String server2 = getServerString(qv, cluster, 2L);
- String server3 = getServerString(qv, cluster, 3L);
- String server4 = getServerString(qv, cluster, 4L);
- String server5 = getServerString(qv, cluster, 5L);
-
- //Remove Servers
- bytes = client.reconfig()
- .withMembers("server.2=" + server2,
- "server.3=" + server3,
- "server.4=" + server4,
- "server.5=" + server5)
- .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
- qv = getQuorumVerifier(bytes);
- Assert.assertEquals(qv.getAllMembers().size(), 4);
- bytes = client.reconfig()
- .withMembers("server.3=" + server3,
- "server.4=" + server4,
- "server.5=" + server5)
- .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
-
- qv = getQuorumVerifier(bytes);
- Assert.assertEquals(qv.getAllMembers().size(), 3);
-
- //Add Servers
- bytes = client.reconfig()
- .withMembers("server.1=" + server1,
- "server.3=" + server3,
- "server.4=" + server4,
- "server.5=" + server5)
- .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
- qv = getQuorumVerifier(bytes);
- Assert.assertEquals(qv.getAllMembers().size(), 4);
- bytes = client.reconfig()
- .withMembers("server.1=" + server1,
- "server.2=" + server2,
- "server.3=" + server3,
- "server.4=" + server4,
- "server.5=" + server5)
- .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
- qv = getQuorumVerifier(bytes);
- Assert.assertEquals(qv.getAllMembers().size(), 5);
- } finally {
- client.close();
- }
+ Stat stat = new Stat();
+ byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble();
+ Assert.assertNotNull(bytes);
+ QuorumVerifier qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(qv.getAllMembers().size(), 5);
+ String server1 = getServerString(qv, cluster, 1L);
+ String server2 = getServerString(qv, cluster, 2L);
+ String server3 = getServerString(qv, cluster, 3L);
+ String server4 = getServerString(qv, cluster, 4L);
+ String server5 = getServerString(qv, cluster, 5L);
+
+ //Remove Servers
+ bytes = client.reconfig()
+ .withMembers("server.2=" + server2,
+ "server.3=" + server3,
+ "server.4=" + server4,
+ "server.5=" + server5)
+ .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
+ qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(qv.getAllMembers().size(), 4);
+
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
+
+ bytes = client.reconfig()
+ .withMembers("server.3=" + server3,
+ "server.4=" + server4,
+ "server.5=" + server5)
+ .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
+
+ qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(qv.getAllMembers().size(), 3);
+
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5);
+
+ //Add Servers
+ bytes = client.reconfig()
+ .withMembers("server.2=" + server2,
+ "server.3=" + server3,
+ "server.4=" + server4,
+ "server.5=" + server5)
+ .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
+ qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(qv.getAllMembers().size(), 4);
+
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
+
+ bytes = client.reconfig()
+ .withMembers("server.1=" + server1,
+ "server.2=" + server2,
+ "server.3=" + server3,
+ "server.4=" + server4,
+ "server.5=" + server5)
+ .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
+ qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(qv.getAllMembers().size(), 5);
+
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5);
}
@Test
public void testAsyncNonIncremental() throws Exception {
- CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1));
- client.start();
- client.blockUntilConnected();
- try {
- final AtomicReference<byte[]> bytes = new AtomicReference<byte[]>();
- final BackgroundCallback callback = new BackgroundCallback() {
- @Override
- public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
- bytes.set(event.getData());
- ((CountDownLatch)event.getContext()).countDown();
- }
+ final AtomicReference<byte[]> bytes = new AtomicReference<byte[]>();
+ final BackgroundCallback callback = new BackgroundCallback() {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+ bytes.set(event.getData());
+ ((CountDownLatch) event.getContext()).countDown();
+ }
- };
-
- CountDownLatch latch = new CountDownLatch(1);
- client.getConfig().inBackground(callback, latch).forEnsemble();
- latch.await(5, TimeUnit.SECONDS);
- Assert.assertNotNull(bytes.get());
- QuorumVerifier qv = getQuorumVerifier(bytes.get());
- Assert.assertEquals(qv.getAllMembers().size(), 5);
- String server1 = getServerString(qv, cluster, 1L);
- String server2 = getServerString(qv, cluster, 2L);
- String server3 = getServerString(qv, cluster, 3L);
- String server4 = getServerString(qv, cluster, 4L);
- String server5 = getServerString(qv, cluster, 5L);
-
- //Remove Servers
- latch = new CountDownLatch(1);
- client.reconfig()
- .withMembers("server.2=" + server2,
- "server.3=" + server3,
- "server.4=" + server4,
- "server.5=" + server5)
- .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
- latch.await(5, TimeUnit.SECONDS);
- qv = getQuorumVerifier(bytes.get());
- Assert.assertEquals(qv.getAllMembers().size(), 4);
- latch = new CountDownLatch(1);
- client.reconfig()
- .withMembers("server.3=" + server3,
- "server.4=" + server4,
- "server.5=" + server5)
- .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
- latch.await(5, TimeUnit.SECONDS);
- qv = getQuorumVerifier(bytes.get());
- Assert.assertEquals(qv.getAllMembers().size(), 3);
-
- //Add Servers
- latch = new CountDownLatch(1);
- client.reconfig()
- .withMembers("server.1=" + server1,
- "server.3=" + server3,
- "server.4=" + server4,
- "server.5=" + server5)
- .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
- latch.await(5, TimeUnit.SECONDS);
- qv = getQuorumVerifier(bytes.get());
- Assert.assertEquals(qv.getAllMembers().size(), 4);
- latch = new CountDownLatch(1);
- client.reconfig()
- .withMembers("server.1=" + server1,
- "server.2=" + server2,
- "server.3=" + server3,
- "server.4=" + server4,
- "server.5=" + server5)
- .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
- latch.await(5, TimeUnit.SECONDS);
- qv = getQuorumVerifier(bytes.get());
- Assert.assertEquals(qv.getAllMembers().size(), 5);
- } finally {
- client.close();
- }
+ };
+
+ CountDownLatch latch = new CountDownLatch(1);
+ client.getConfig().inBackground(callback, latch).forEnsemble();
+ latch.await(5, TimeUnit.SECONDS);
+ Assert.assertNotNull(bytes.get());
+ QuorumVerifier qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(qv.getAllMembers().size(), 5);
+ String server1 = getServerString(qv, cluster, 1L);
+ String server2 = getServerString(qv, cluster, 2L);
+ String server3 = getServerString(qv, cluster, 3L);
+ String server4 = getServerString(qv, cluster, 4L);
+ String server5 = getServerString(qv, cluster, 5L);
+
+ //Remove Servers
+ client.reconfig()
+ .withMembers("server.2=" + server2,
+ "server.3=" + server3,
+ "server.4=" + server4,
+ "server.5=" + server5)
+ .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
+ qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(qv.getAllMembers().size(), 4);
+
+ client.reconfig()
+ .withMembers("server.3=" + server3,
+ "server.4=" + server4,
+ "server.5=" + server5)
+ .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5);
+ qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(qv.getAllMembers().size(), 3);
+
+ //Add Servers
+ client.reconfig()
+ .withMembers("server.2=" + server2,
+ "server.3=" + server3,
+ "server.4=" + server4,
+ "server.5=" + server5)
+ .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
+ qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(qv.getAllMembers().size(), 4);
+
+ client.reconfig()
+ .withMembers("server.1=" + server1,
+ "server.2=" + server2,
+ "server.3=" + server3,
+ "server.4=" + server4,
+ "server.5=" + server5)
+ .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5);
+ qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(qv.getAllMembers().size(), 5);
}
@@ -302,4 +345,44 @@ public class TestReconfiguration {
return str + ";" + getInstance(cluster, (int) id).getConnectString();
}
}
+
+    /**
+     * Builds a comma separated connection string from the instances of the cluster that have the given
+     * server ids, in the order the ids are given.
+     *
+     * @param cluster the cluster whose instances are looked up
+     * @param ids     server ids of the instances to include
+     * @return the connect strings of the selected instances joined with ","
+     * @throws IllegalArgumentException if an id does not match any instance of the cluster
+     */
+    static String getConnectionString(TestingCluster cluster, long... ids) throws Exception {
+        Map<Long, InstanceSpec> specs = new HashMap<Long, InstanceSpec>();
+        for (InstanceSpec spec : cluster.getInstances()) {
+            // Long.valueOf instead of the deprecated boxing constructor.
+            specs.put(Long.valueOf(spec.getServerId()), spec);
+        }
+
+        StringBuilder sb = new StringBuilder();
+        for (long id : ids) {
+            InstanceSpec spec = specs.get(id);
+            if (spec == null) {
+                throw new IllegalArgumentException("No instance with server id " + id + " in the cluster");
+            }
+            if (sb.length() != 0) {
+                sb.append(",");
+            }
+            sb.append(spec.getConnectString());
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Simple EnsembleListener that forwards each update to a delegate and lets a test block until the
+     * delegate has handled an event.
+     */
+    private static class WaitOnDelegateListener implements EnsembleListener {
+        // volatile: the waiting thread replaces the latch in waitForEvent() while the thread delivering
+        // events reads it in connectionStringUpdated(); without it the delivering thread could keep
+        // counting down a stale latch.
+        private volatile CountDownLatch latch = new CountDownLatch(1);
+
+        private final EnsembleListener delegate;
+
+        private WaitOnDelegateListener(EnsembleListener delegate) {
+            this.delegate = delegate;
+        }
+
+        /**
+         * Forwards the connection string to the delegate, then releases a thread blocked in
+         * {@link #waitForEvent()}.
+         */
+        @Override
+        public void connectionStringUpdated(String connectionString) {
+            // Delegate first, so a released waiter observes the delegate's updated state.
+            delegate.connectionStringUpdated(connectionString);
+            latch.countDown();
+        }
+
+        /**
+         * Blocks for up to 5 seconds until an event has been handled by the delegate, then re-arms the
+         * listener for the next event.
+         *
+         * @throws InterruptedException if the waiting thread is interrupted
+         * @throws TimeoutException     if no event arrives within 5 seconds
+         */
+        public void waitForEvent() throws InterruptedException, TimeoutException {
+            if (latch.await(5, TimeUnit.SECONDS)) {
+                latch = new CountDownLatch(1);
+            } else {
+                throw new TimeoutException("Failed to receive event in time.");
+            }
+        }
+    }
}
\ No newline at end of file