You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2015/10/09 02:09:57 UTC
[03/10] curator git commit: wip - adding() API was misnamed. Also,
it's mutually exclusive with join/leave
wip - adding() API was misnamed. Also, it's mutually exclusive with join/leave
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/d42ef172
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/d42ef172
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/d42ef172
Branch: refs/heads/CURATOR-3.0
Commit: d42ef172e57af17ed42d7c2c4e2d9a7a0c520f3c
Parents: 4c3c837
Author: randgalt <ra...@apache.org>
Authored: Fri Sep 25 21:07:44 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Sep 25 21:07:44 2015 -0500
----------------------------------------------------------------------
.../api/JoinStatConfigEnsembleable.java | 2 +-
.../api/LeaveStatConfigEnsembleable.java | 2 +-
.../curator/framework/api/Membersable.java | 43 +++
.../framework/api/ReconfigBuilderMain.java | 6 +-
.../api/StatConfigureEnsembleable.java | 26 ++
.../framework/imps/ReconfigBuilderImpl.java | 370 +++----------------
.../framework/imps/TestReconfiguration.java | 205 +++++++---
.../framework/imps/TestReconfigurationX.java | 33 +-
.../org/apache/curator/test/TestingCluster.java | 40 +-
9 files changed, 337 insertions(+), 390 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/d42ef172/curator-framework/src/main/java/org/apache/curator/framework/api/JoinStatConfigEnsembleable.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/JoinStatConfigEnsembleable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/JoinStatConfigEnsembleable.java
index 7ab51e2..c20387c 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/JoinStatConfigEnsembleable.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/JoinStatConfigEnsembleable.java
@@ -25,7 +25,7 @@ package org.apache.curator.framework.api;
* mixing concepts that can't be used together.
*/
public interface JoinStatConfigEnsembleable extends
- Joinable<ConfigureEnsembleable>,
+ Joinable<LeaveStatConfigEnsembleable>,
ConfigureEnsembleable,
Statable<ConfigureEnsembleable>
{
http://git-wip-us.apache.org/repos/asf/curator/blob/d42ef172/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveStatConfigEnsembleable.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveStatConfigEnsembleable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveStatConfigEnsembleable.java
index 1464d26..b80bd00 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveStatConfigEnsembleable.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveStatConfigEnsembleable.java
@@ -25,7 +25,7 @@ package org.apache.curator.framework.api;
* mixing concepts that can't be used together.
*/
public interface LeaveStatConfigEnsembleable extends
- Leaveable<Statable<ConfigureEnsembleable>>,
+ Leaveable<JoinStatConfigEnsembleable>,
ConfigureEnsembleable,
Statable<ConfigureEnsembleable>
{
http://git-wip-us.apache.org/repos/asf/curator/blob/d42ef172/curator-framework/src/main/java/org/apache/curator/framework/api/Membersable.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Membersable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Membersable.java
new file mode 100644
index 0000000..e1f8d9e
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Membersable.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.api;
+
+import java.util.List;
+
+public interface Membersable<T>
+{
+ /**
+ * Sets one or more members that together form the complete new ensemble (not combinable with joining/leaving).
+ * The expected format of each member is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port]
+ *
+ * @param server The member(s) of the new ensemble.
+ * @return this
+ */
+ T withNewMembers(String... server);
+
+ /**
+ * Sets one or more members that together form the complete new ensemble (not combinable with joining/leaving).
+ * The expected format of each member is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port]
+ *
+ * @param servers The members of the new ensemble.
+ * @return this
+ */
+ T withNewMembers(List<String> servers);
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/d42ef172/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilderMain.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilderMain.java b/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilderMain.java
index b86af2d..b9d1be3 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilderMain.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilderMain.java
@@ -20,8 +20,8 @@
package org.apache.curator.framework.api;
public interface ReconfigBuilderMain extends
- Joinable<LeaveAddStatConfigEnsembleable>,
- Leaveable<JoinAddStatConfigEnsembleable>,
- Addable<JoinLeaveStatConfigEnsembleable>
+ Joinable<LeaveStatConfigEnsembleable>,
+ Leaveable<JoinStatConfigEnsembleable>,
+ Membersable<StatConfigureEnsembleable>
{
}
http://git-wip-us.apache.org/repos/asf/curator/blob/d42ef172/curator-framework/src/main/java/org/apache/curator/framework/api/StatConfigureEnsembleable.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/StatConfigureEnsembleable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/StatConfigureEnsembleable.java
new file mode 100644
index 0000000..8b61ab9
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/StatConfigureEnsembleable.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.api;
+
+public interface StatConfigureEnsembleable extends
+ Statable<ConfigureEnsembleable>,
+ ConfigureEnsembleable
+{
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/curator/blob/d42ef172/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
index 832272b..e786883 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
@@ -31,18 +31,14 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
-public class ReconfigBuilderImpl implements
- ReconfigBuilder,
- ReconfigBuilderMain,
- ConfigureEnsembleable,
- BackgroundOperation<Void>,Statable<Ensembleable<byte[]>>,Ensembleable<byte[]>
+public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation<Void>
{
private final CuratorFrameworkImpl client;
private Backgrounding backgrounding = new Backgrounding();
private Stat responseStat;
private long fromConfig = -1;
- private List<String> adding;
+ private List<String> newMembers;
private List<String> joining;
private List<String> leaving;
@@ -51,8 +47,7 @@ public class ReconfigBuilderImpl implements
this.client = client;
}
- @Override
- public byte[] forEnsemble() throws Exception
+ private byte[] forEnsemble() throws Exception
{
if ( backgrounding.inBackground() )
{
@@ -66,152 +61,6 @@ public class ReconfigBuilderImpl implements
}
@Override
- public Ensembleable<byte[]> storingStatIn(Stat stat)
- {
- responseStat = stat;
- return this;
- }
-
- @Override
- public Ensembleable<byte[]> fromConfig(long config) throws Exception
- {
- fromConfig = config;
- return this;
- }
-
- @Override
- public JoinLeaveStatConfigEnsembleable adding(String... server)
- {
- return adding((server != null) ? Arrays.asList(server) : null);
- }
-
- @Override
- public JoinLeaveStatConfigEnsembleable adding(List<String> servers)
- {
- this.adding = (servers != null) ? ImmutableList.copyOf(servers) : ImmutableList.<String>of();
-
- return new JoinLeaveStatConfigEnsembleable()
- {
- @Override
- public byte[] forEnsemble() throws Exception
- {
- return ReconfigBuilderImpl.this.forEnsemble();
- }
-
- @Override
- public ConfigureEnsembleable storingStatIn(Stat stat)
- {
- return new ConfigureEnsembleable()
- {
- @Override
- public Ensembleable<byte[]> fromConfig(long config) throws Exception
- {
- return ReconfigBuilderImpl.this.fromConfig(config);
- }
-
- @Override
- public byte[] forEnsemble() throws Exception
- {
- return ReconfigBuilderImpl.this.forEnsemble();
- }
- };
- }
-
- @Override
- public Ensembleable<byte[]> fromConfig(long config) throws Exception
- {
- return ReconfigBuilderImpl.this.fromConfig(config);
- }
-
- @Override
- public LeaveStatConfigEnsembleable joining(String... server)
- {
- return joining((server != null) ? Arrays.asList(server) : null);
- }
-
- @Override
- public LeaveStatConfigEnsembleable joining(List<String> servers)
- {
- return new LeaveStatConfigEnsembleable()
- {
- @Override
- public byte[] forEnsemble() throws Exception
- {
- return ReconfigBuilderImpl.this.forEnsemble();
- }
-
- @Override
- public ConfigureEnsembleable storingStatIn(Stat stat)
- {
- return new InternalConfigureEnsembleable();
- }
-
- @Override
- public Ensembleable<byte[]> fromConfig(long config) throws Exception
- {
- return ReconfigBuilderImpl.this.fromConfig(config);
- }
-
- @Override
- public Statable<ConfigureEnsembleable> leaving(List<String> servers)
- {
- return ReconfigBuilderImpl.this.leaving(servers);
- }
-
- @Override
- public Statable<ConfigureEnsembleable> leaving(String... server)
- {
- return ReconfigBuilderImpl.this.leaving(server);
- }
- };
- }
-
- @Override
- public JoinStatConfigEnsembleable leaving(String... server)
- {
- return leaving((server != null) ? Arrays.asList(server) : null);
- }
-
- @Override
- public JoinStatConfigEnsembleable leaving(List<String> servers)
- {
- return new JoinStatConfigEnsembleable()
- {
- @Override
- public byte[] forEnsemble() throws Exception
- {
- return ReconfigBuilderImpl.this.forEnsemble();
- }
-
- @Override
- public ConfigureEnsembleable storingStatIn(Stat stat)
- {
- return new InternalConfigureEnsembleable();
- }
-
- @Override
- public Ensembleable<byte[]> fromConfig(long config) throws Exception
- {
- return ReconfigBuilderImpl.this.fromConfig(config);
- }
-
- @Override
- public ConfigureEnsembleable joining(List<String> servers)
- {
- return ReconfigBuilderImpl.this.joining(servers);
- }
-
- @Override
- public ConfigureEnsembleable joining(String... server)
- {
- return ReconfigBuilderImpl.this.joining(server);
- }
- };
- }
- };
- }
-
- @Override
public ReconfigBuilderMain inBackground()
{
backgrounding = new Backgrounding(true);
@@ -254,19 +103,25 @@ public class ReconfigBuilderImpl implements
}
@Override
- public LeaveAddStatConfigEnsembleable joining(String... server)
+ public StatConfigureEnsembleable withNewMembers(String... server)
{
- return joining((server != null) ? Arrays.asList(server) : null);
+ return withNewMembers((server != null) ? Arrays.asList(server) : null);
}
@Override
- public LeaveAddStatConfigEnsembleable joining(List<String> servers)
+ public StatConfigureEnsembleable withNewMembers(List<String> servers)
{
- joining = (servers != null) ? ImmutableList.copyOf(servers) : ImmutableList.<String>of();
-
- return new LeaveAddStatConfigEnsembleable()
+ newMembers = (servers != null) ? ImmutableList.copyOf(servers) : ImmutableList.<String>of();
+ return new StatConfigureEnsembleable()
{
@Override
+ public Ensembleable<byte[]> fromConfig(long config) throws Exception
+ {
+ fromConfig = config;
+ return this;
+ }
+
+ @Override
public byte[] forEnsemble() throws Exception
{
return ReconfigBuilderImpl.this.forEnsemble();
@@ -275,115 +130,71 @@ public class ReconfigBuilderImpl implements
@Override
public ConfigureEnsembleable storingStatIn(Stat stat)
{
- return new InternalConfigureEnsembleable();
+ responseStat = stat;
+ return this;
}
+ };
+ }
+
+ @Override
+ public LeaveStatConfigEnsembleable joining(String... server)
+ {
+ return joining((server != null) ? Arrays.asList(server) : null);
+ }
+
+ @Override
+ public LeaveStatConfigEnsembleable joining(List<String> servers)
+ {
+ joining = (servers != null) ? ImmutableList.copyOf(servers) : ImmutableList.<String>of();
+ return new LeaveStatConfigEnsembleable()
+ {
@Override
- public Ensembleable<byte[]> fromConfig(long config) throws Exception
+ public byte[] forEnsemble() throws Exception
{
- return ReconfigBuilderImpl.this.fromConfig(config);
+ return ReconfigBuilderImpl.this.forEnsemble();
}
@Override
- public LeaveStatConfigEnsembleable adding(String... server)
+ public ConfigureEnsembleable storingStatIn(Stat stat)
{
- return adding((server != null) ? Arrays.asList(server) : null);
+ responseStat = stat;
+ return this;
}
@Override
- public LeaveStatConfigEnsembleable adding(List<String> servers)
+ public Ensembleable<byte[]> fromConfig(long config) throws Exception
{
- return new LeaveStatConfigEnsembleable()
- {
- @Override
- public byte[] forEnsemble() throws Exception
- {
- return ReconfigBuilderImpl.this.forEnsemble();
- }
-
- @Override
- public ConfigureEnsembleable storingStatIn(Stat stat)
- {
- return new InternalConfigureEnsembleable();
- }
-
- @Override
- public Ensembleable<byte[]> fromConfig(long config) throws Exception
- {
- return ReconfigBuilderImpl.this.fromConfig(config);
- }
-
- @Override
- public Statable<ConfigureEnsembleable> leaving(List<String> servers)
- {
- return ReconfigBuilderImpl.this.leaving(servers);
- }
-
- @Override
- public Statable<ConfigureEnsembleable> leaving(String... server)
- {
- return ReconfigBuilderImpl.this.leaving(server);
- }
- };
+ fromConfig = config;
+ return this;
}
@Override
- public AddStatConfigEnsembleable leaving(String... server)
+ public JoinStatConfigEnsembleable leaving(String... server)
{
- return leaving((server != null) ? Arrays.asList(server) : null);
+ return ReconfigBuilderImpl.this.leaving(server);
}
@Override
- public AddStatConfigEnsembleable leaving(List<String> servers)
+ public JoinStatConfigEnsembleable leaving(List<String> servers)
{
- return new AddStatConfigEnsembleable()
- {
- @Override
- public byte[] forEnsemble() throws Exception
- {
- return ReconfigBuilderImpl.this.forEnsemble();
- }
-
- @Override
- public ConfigureEnsembleable storingStatIn(Stat stat)
- {
- return new InternalConfigureEnsembleable();
- }
-
- @Override
- public ConfigureEnsembleable fromConfig(long config) throws Exception
- {
- return new InternalConfigureEnsembleable();
- }
-
- @Override
- public Statable<ConfigureEnsembleable> adding(List<String> servers)
- {
- return ReconfigBuilderImpl.this.adding(servers);
- }
-
- @Override
- public Statable<ConfigureEnsembleable> adding(String... server)
- {
- return ReconfigBuilderImpl.this.adding(server);
- }
- };
+ return ReconfigBuilderImpl.this.leaving(servers);
}
};
}
@Override
- public JoinAddStatConfigEnsembleable leaving(String... server)
+ public JoinStatConfigEnsembleable leaving(String... server)
{
return leaving((server != null) ? Arrays.asList(server) : null);
}
@Override
- public JoinAddStatConfigEnsembleable leaving(List<String> servers)
+ public JoinStatConfigEnsembleable leaving(List<String> servers)
{
leaving = (servers != null) ? ImmutableList.copyOf(servers) : ImmutableList.<String>of();
- return new JoinAddStatConfigEnsembleable()
+ return new JoinStatConfigEnsembleable()
{
@Override
public byte[] forEnsemble() throws Exception
@@ -394,81 +205,27 @@ public class ReconfigBuilderImpl implements
@Override
public ConfigureEnsembleable storingStatIn(Stat stat)
{
- return new InternalConfigureEnsembleable();
+ responseStat = stat;
+ return this;
}
@Override
public Ensembleable<byte[]> fromConfig(long config) throws Exception
{
- return ReconfigBuilderImpl.this.fromConfig(config);
+ fromConfig = config;
+ return this;
}
@Override
- public JoinStatConfigurable adding(String... server)
- {
- return adding((server != null) ? Arrays.asList(server) : null);
- }
-
- @Override
- public JoinStatConfigurable adding(List<String> servers)
- {
- return new JoinStatConfigurable()
- {
- @Override
- public ConfigureEnsembleable joining(List<String> servers)
- {
- return ReconfigBuilderImpl.this.joining(servers);
- }
-
- @Override
- public ConfigureEnsembleable joining(String... server)
- {
- return ReconfigBuilderImpl.this.joining(server);
- }
- };
- }
-
- @Override
- public AddStatConfigEnsembleable joining(String... server)
+ public LeaveStatConfigEnsembleable joining(String... server)
{
return joining((server != null) ? Arrays.asList(server) : null);
}
@Override
- public AddStatConfigEnsembleable joining(List<String> servers)
+ public LeaveStatConfigEnsembleable joining(List<String> servers)
{
- return new AddStatConfigEnsembleable()
- {
- @Override
- public byte[] forEnsemble() throws Exception
- {
- return ReconfigBuilderImpl.this.forEnsemble();
- }
-
- @Override
- public ConfigureEnsembleable storingStatIn(Stat stat)
- {
- return new InternalConfigureEnsembleable();
- }
-
- @Override
- public Ensembleable<byte[]> fromConfig(long config) throws Exception
- {
- return ReconfigBuilderImpl.this.fromConfig(config);
- }
-
- @Override
- public Statable<ConfigureEnsembleable> adding(List<String> servers)
- {
- return ReconfigBuilderImpl.this.adding(servers);
- }
-
- @Override
- public Statable<ConfigureEnsembleable> adding(String... server)
- {
- return ReconfigBuilderImpl.this.adding(server);
- }
- };
+ return ReconfigBuilderImpl.this.joining(servers);
}
};
}
@@ -491,7 +248,7 @@ public class ReconfigBuilderImpl implements
client.processBackgroundOperation(data, event);
}
};
- client.getZooKeeper().reconfig(joining, leaving, adding, fromConfig, callback, backgrounding.getContext());
+ client.getZooKeeper().reconfig(joining, leaving, newMembers, fromConfig, callback, backgrounding.getContext());
}
private byte[] ensembleInForeground() throws Exception
@@ -505,26 +262,11 @@ public class ReconfigBuilderImpl implements
@Override
public byte[] call() throws Exception
{
- return client.getZooKeeper().reconfig(joining, leaving, adding, fromConfig, responseStat);
+ return client.getZooKeeper().reconfig(joining, leaving, newMembers, fromConfig, responseStat);
}
}
);
trace.commit();
return responseData;
}
-
- private class InternalConfigureEnsembleable implements ConfigureEnsembleable
- {
- @Override
- public Ensembleable<byte[]> fromConfig(long config) throws Exception
- {
- return ReconfigBuilderImpl.this.fromConfig(config);
- }
-
- @Override
- public byte[] forEnsemble() throws Exception
- {
- return ReconfigBuilderImpl.this.forEnsemble();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/d42ef172/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
index d4c89be..99e5a2e 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
@@ -19,12 +19,17 @@
package org.apache.curator.framework.imps;
+import com.google.common.collect.Lists;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -37,7 +42,12 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
public class TestReconfiguration extends BaseClassForTests
{
@@ -84,75 +94,53 @@ public class TestReconfiguration extends BaseClassForTests
// ---------
- client.reconfig().adding().forEnsemble();
client.reconfig().leaving().forEnsemble();
client.reconfig().joining().forEnsemble();
- client.reconfig().adding().leaving().forEnsemble();
- client.reconfig().adding().joining().forEnsemble();
client.reconfig().leaving().joining().forEnsemble();
+ client.reconfig().joining().leaving().forEnsemble();
+ client.reconfig().withNewMembers().forEnsemble();
- client.reconfig().adding().fromConfig(0).forEnsemble();
client.reconfig().leaving().fromConfig(0).forEnsemble();
client.reconfig().joining().fromConfig(0).forEnsemble();
- client.reconfig().adding().leaving().fromConfig(0).forEnsemble();
- client.reconfig().adding().joining().fromConfig(0).forEnsemble();
client.reconfig().leaving().joining().fromConfig(0).forEnsemble();
+ client.reconfig().joining().leaving().fromConfig(0).forEnsemble();
+ client.reconfig().withNewMembers().fromConfig(0).forEnsemble();
- client.reconfig().adding().fromConfig(0).forEnsemble();
- client.reconfig().leaving().fromConfig(0).forEnsemble();
- client.reconfig().joining().fromConfig(0).forEnsemble();
- client.reconfig().adding().leaving().fromConfig(0).forEnsemble();
- client.reconfig().adding().joining().fromConfig(0).forEnsemble();
- client.reconfig().leaving().joining().fromConfig(0).forEnsemble();
-
- client.reconfig().adding().storingStatIn(stat).forEnsemble();
client.reconfig().leaving().storingStatIn(stat).forEnsemble();
client.reconfig().joining().storingStatIn(stat).forEnsemble();
- client.reconfig().adding().leaving().storingStatIn(stat).forEnsemble();
- client.reconfig().adding().joining().storingStatIn(stat).forEnsemble();
client.reconfig().leaving().joining().storingStatIn(stat).forEnsemble();
+ client.reconfig().joining().leaving().storingStatIn(stat).forEnsemble();
+ client.reconfig().withNewMembers().storingStatIn(stat).forEnsemble();
- client.reconfig().adding().storingStatIn(stat).fromConfig(0).forEnsemble();
client.reconfig().leaving().storingStatIn(stat).fromConfig(0).forEnsemble();
client.reconfig().joining().storingStatIn(stat).fromConfig(0).forEnsemble();
- client.reconfig().adding().leaving().storingStatIn(stat).fromConfig(0).forEnsemble();
- client.reconfig().adding().joining().storingStatIn(stat).fromConfig(0).forEnsemble();
client.reconfig().leaving().joining().storingStatIn(stat).fromConfig(0).forEnsemble();
+ client.reconfig().joining().leaving().storingStatIn(stat).fromConfig(0).forEnsemble();
+ client.reconfig().withNewMembers().storingStatIn(stat).forEnsemble();
- client.reconfig().inBackground().adding().forEnsemble();
client.reconfig().inBackground().leaving().forEnsemble();
client.reconfig().inBackground().joining().forEnsemble();
- client.reconfig().inBackground().adding().leaving().forEnsemble();
- client.reconfig().inBackground().adding().joining().forEnsemble();
client.reconfig().inBackground().leaving().joining().forEnsemble();
+ client.reconfig().inBackground().joining().leaving().forEnsemble();
+ client.reconfig().inBackground().withNewMembers().forEnsemble();
- client.reconfig().inBackground().adding().fromConfig(0).forEnsemble();
- client.reconfig().inBackground().leaving().fromConfig(0).forEnsemble();
- client.reconfig().inBackground().joining().fromConfig(0).forEnsemble();
- client.reconfig().inBackground().adding().leaving().fromConfig(0).forEnsemble();
- client.reconfig().inBackground().adding().joining().fromConfig(0).forEnsemble();
- client.reconfig().inBackground().leaving().joining().fromConfig(0).forEnsemble();
-
- client.reconfig().inBackground().adding().fromConfig(0).forEnsemble();
client.reconfig().inBackground().leaving().fromConfig(0).forEnsemble();
client.reconfig().inBackground().joining().fromConfig(0).forEnsemble();
- client.reconfig().inBackground().adding().leaving().fromConfig(0).forEnsemble();
- client.reconfig().inBackground().adding().joining().fromConfig(0).forEnsemble();
client.reconfig().inBackground().leaving().joining().fromConfig(0).forEnsemble();
+ client.reconfig().inBackground().joining().leaving().fromConfig(0).forEnsemble();
+ client.reconfig().inBackground().withNewMembers().fromConfig(0).forEnsemble();
- client.reconfig().inBackground().adding().storingStatIn(stat).forEnsemble();
client.reconfig().inBackground().leaving().storingStatIn(stat).forEnsemble();
client.reconfig().inBackground().joining().storingStatIn(stat).forEnsemble();
- client.reconfig().inBackground().adding().leaving().storingStatIn(stat).forEnsemble();
- client.reconfig().inBackground().adding().joining().storingStatIn(stat).forEnsemble();
client.reconfig().inBackground().leaving().joining().storingStatIn(stat).forEnsemble();
+ client.reconfig().inBackground().joining().leaving().storingStatIn(stat).forEnsemble();
+ client.reconfig().inBackground().withNewMembers().storingStatIn(stat).forEnsemble();
- client.reconfig().inBackground().adding().storingStatIn(stat).fromConfig(0).forEnsemble();
client.reconfig().inBackground().leaving().storingStatIn(stat).fromConfig(0).forEnsemble();
client.reconfig().inBackground().joining().storingStatIn(stat).fromConfig(0).forEnsemble();
- client.reconfig().inBackground().adding().leaving().storingStatIn(stat).fromConfig(0).forEnsemble();
- client.reconfig().inBackground().adding().joining().storingStatIn(stat).fromConfig(0).forEnsemble();
client.reconfig().inBackground().leaving().joining().storingStatIn(stat).fromConfig(0).forEnsemble();
+ client.reconfig().inBackground().joining().leaving().storingStatIn(stat).fromConfig(0).forEnsemble();
+ client.reconfig().inBackground().withNewMembers().storingStatIn(stat).forEnsemble();
}
@Test
@@ -163,33 +151,154 @@ public class TestReconfiguration extends BaseClassForTests
client.start();
QuorumVerifier quorumVerifier = toQuorumVerifier(client.getConfig().forEnsemble());
System.out.println(quorumVerifier);
+ assertConfig(quorumVerifier, cluster.getInstances());
+ }
+ }
+
+ @Test
+ public void testAdd() throws Exception
+ {
+ Timing timing = new Timing();
+ try ( CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1)) )
+ {
+ client.start();
+
+ QuorumVerifier oldConfig = toQuorumVerifier(client.getConfig().forEnsemble());
+ assertConfig(oldConfig, cluster.getInstances());
- for ( InstanceSpec instance : cluster.getInstances() )
+ CountDownLatch latch = setChangeWaiter(client);
+ try ( TestingCluster newCluster = new TestingCluster(1, false) )
{
- QuorumPeer.QuorumServer quorumServer = quorumVerifier.getAllMembers().get((long)instance.getServerId());
- Assert.assertNotNull(quorumServer);
- Assert.assertEquals(quorumServer.clientAddr.getPort(), instance.getPort());
+ newCluster.start();
+
+ client.reconfig().joining(toReconfigSpec(newCluster.getInstances())).fromConfig(oldConfig.getVersion()).forEnsemble();
+
+ Assert.assertTrue(timing.awaitLatch(latch));
+
+ QuorumVerifier newConfig = toQuorumVerifier(client.getConfig().forEnsemble());
+ List<InstanceSpec> newInstances = Lists.newArrayList(cluster.getInstances());
+ newInstances.addAll(newCluster.getInstances());
+ assertConfig(newConfig, newInstances);
}
}
}
@Test
- public void testAdd1Sync() throws Exception
+ public void testAddAsync() throws Exception
{
+ Timing timing = new Timing();
try ( CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1)) )
{
client.start();
- Watcher watcher = new Watcher()
+ QuorumVerifier oldConfig = toQuorumVerifier(client.getConfig().forEnsemble());
+ assertConfig(oldConfig, cluster.getInstances());
+
+ CountDownLatch latch = setChangeWaiter(client);
+ try ( TestingCluster newCluster = new TestingCluster(1, false) )
{
- @Override
- public void process(WatchedEvent event)
+ newCluster.start();
+
+ final CountDownLatch callbackLatch = new CountDownLatch(1);
+ BackgroundCallback callback = new BackgroundCallback()
{
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ if ( event.getType() == CuratorEventType.RECONFIG )
+ {
+ callbackLatch.countDown();
+ }
+ }
+ };
+ client.reconfig().inBackground(callback).joining(toReconfigSpec(newCluster.getInstances())).fromConfig(oldConfig.getVersion()).forEnsemble();
+
+ Assert.assertTrue(timing.awaitLatch(callbackLatch));
+ Assert.assertTrue(timing.awaitLatch(latch));
+ QuorumVerifier newConfig = toQuorumVerifier(client.getConfig().forEnsemble());
+ List<InstanceSpec> newInstances = Lists.newArrayList(cluster.getInstances());
+ newInstances.addAll(newCluster.getInstances());
+ assertConfig(newConfig, newInstances);
+ }
+ }
+ }
+
+ @Test
+ public void testAddAndRemove() throws Exception
+ {
+ Timing timing = new Timing();
+ try ( CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1)) )
+ {
+ client.start();
+
+ QuorumVerifier oldConfig = toQuorumVerifier(client.getConfig().forEnsemble());
+ assertConfig(oldConfig, cluster.getInstances());
+
+ CountDownLatch latch = setChangeWaiter(client);
+
+ try ( TestingCluster newCluster = new TestingCluster(1, false) )
+ {
+ newCluster.start();
+
+ Collection<InstanceSpec> oldInstances = cluster.getInstances();
+ InstanceSpec us = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
+ InstanceSpec removeSpec = oldInstances.iterator().next();
+ if ( us.equals(removeSpec) ) {
+ Iterator<InstanceSpec> iterator = oldInstances.iterator();
+ iterator.next();
+ removeSpec = iterator.next();
}
- };
- client.getConfig().usingWatcher(watcher).forEnsemble();
+
+ Collection<InstanceSpec> instances = newCluster.getInstances();
+ client.reconfig().leaving(Integer.toString(removeSpec.getServerId())).joining(toReconfigSpec(instances)).fromConfig(oldConfig.getVersion()).forEnsemble();
+
+ Assert.assertTrue(timing.awaitLatch(latch));
+
+ QuorumVerifier newConfig = toQuorumVerifier(client.getConfig().forEnsemble());
+ ArrayList<InstanceSpec> newInstances = Lists.newArrayList(oldInstances);
+ newInstances.addAll(instances);
+ newInstances.remove(removeSpec);
+ assertConfig(newConfig, newInstances);
+ }
+ }
+ }
+
+ private CountDownLatch setChangeWaiter(CuratorFramework client) throws Exception
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+ Watcher watcher = new Watcher()
+ {
+ @Override
+ public void process(WatchedEvent event)
+ {
+ if ( event.getType() == Event.EventType.NodeDataChanged )
+ {
+ latch.countDown();
+ }
+ }
+ };
+ client.getConfig().usingWatcher(watcher).forEnsemble();
+ return latch;
+ }
+
+ private void assertConfig(QuorumVerifier config, Collection<InstanceSpec> instances)
+ {
+ for ( InstanceSpec instance : instances )
+ {
+ QuorumPeer.QuorumServer quorumServer = config.getAllMembers().get((long)instance.getServerId());
+ Assert.assertNotNull(quorumServer, String.format("Looking for %s - found %s", instance.getServerId(), config.getAllMembers()));
+ Assert.assertEquals(quorumServer.clientAddr.getPort(), instance.getPort());
+ }
+ }
+
+ private List<String> toReconfigSpec(Collection<InstanceSpec> instances)
+ {
+ List<String> specs = Lists.newArrayList();
+ for ( InstanceSpec instance : instances ) {
+ specs.add("server." + instance.getServerId() + "=localhost:" + instance.getElectionPort() + ":" + instance.getQuorumPort() + ";" + instance.getPort());
}
+ return specs;
}
private static QuorumVerifier toQuorumVerifier(byte[] bytes) throws Exception
http://git-wip-us.apache.org/repos/asf/curator/blob/d42ef172/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfigurationX.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfigurationX.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfigurationX.java
index 2268055..7554ddd 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfigurationX.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfigurationX.java
@@ -19,6 +19,7 @@
package org.apache.curator.framework.imps;
+import com.google.common.collect.ImmutableList;
import org.apache.curator.ensemble.EnsembleListener;
import org.apache.curator.ensemble.dynamic.DynamicEnsembleProvider;
import org.apache.curator.framework.CuratorFramework;
@@ -63,7 +64,13 @@ public class TestReconfigurationX
@BeforeMethod
public void setup() throws Exception
{
- cluster = new TestingCluster(5);
+ ImmutableList.Builder<InstanceSpec> builder = ImmutableList.builder();
+ for ( int i = 1; i <= 5; ++i )
+ {
+ builder.add(new InstanceSpec(null, -1, -1, -1, true, i, -1, -1));
+ }
+
+ cluster = new TestingCluster(builder.build());
cluster.start();
connectionString1to5 = cluster.getConnectString();
@@ -208,7 +215,7 @@ public class TestReconfigurationX
//Remove Servers
bytes = client.reconfig()
- .adding("server.2=" + server2,
+ .withNewMembers("server.2=" + server2,
"server.3=" + server3,
"server.4=" + server4,
"server.5=" + server5)
@@ -220,7 +227,7 @@ public class TestReconfigurationX
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
bytes = client.reconfig()
- .adding("server.3=" + server3,
+ .withNewMembers("server.3=" + server3,
"server.4=" + server4,
"server.5=" + server5)
.storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
@@ -233,7 +240,7 @@ public class TestReconfigurationX
//Add Servers
bytes = client.reconfig()
- .adding("server.2=" + server2,
+ .withNewMembers("server.2=" + server2,
"server.3=" + server3,
"server.4=" + server4,
"server.5=" + server5)
@@ -245,7 +252,7 @@ public class TestReconfigurationX
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
bytes = client.reconfig()
- .adding("server.1=" + server1,
+ .withNewMembers("server.1=" + server1,
"server.2=" + server2,
"server.3=" + server3,
"server.4=" + server4,
@@ -287,7 +294,7 @@ public class TestReconfigurationX
//Remove Servers
client.reconfig().inBackground(callback, latch)
- .adding("server.2=" + server2,
+ .withNewMembers("server.2=" + server2,
"server.3=" + server3,
"server.4=" + server4,
"server.5=" + server5)
@@ -298,7 +305,7 @@ public class TestReconfigurationX
Assert.assertEquals(qv.getAllMembers().size(), 4);
client.reconfig().inBackground(callback, latch)
- .adding("server.3=" + server3,
+ .withNewMembers("server.3=" + server3,
"server.4=" + server4,
"server.5=" + server5)
.fromConfig(qv.getVersion()).forEnsemble();
@@ -309,7 +316,7 @@ public class TestReconfigurationX
//Add Servers
client.reconfig().inBackground(callback, latch)
- .adding("server.2=" + server2,
+ .withNewMembers("server.2=" + server2,
"server.3=" + server3,
"server.4=" + server4,
"server.5=" + server5)
@@ -320,7 +327,7 @@ public class TestReconfigurationX
Assert.assertEquals(qv.getAllMembers().size(), 4);
client.reconfig().inBackground(callback, latch)
- .adding("server.1=" + server1,
+ .withNewMembers("server.1=" + server1,
"server.2=" + server2,
"server.3=" + server3,
"server.4=" + server4,
@@ -332,14 +339,14 @@ public class TestReconfigurationX
Assert.assertEquals(qv.getAllMembers().size(), 5);
}
- static QuorumVerifier getQuorumVerifier(byte[] bytes) throws Exception
+ private static QuorumVerifier getQuorumVerifier(byte[] bytes) throws Exception
{
Properties properties = new Properties();
properties.load(new StringReader(new String(bytes)));
return new QuorumMaj(properties);
}
- static InstanceSpec getInstance(TestingCluster cluster, int id)
+ private static InstanceSpec getInstance(TestingCluster cluster, int id)
{
for ( InstanceSpec spec : cluster.getInstances() )
{
@@ -351,7 +358,7 @@ public class TestReconfigurationX
throw new IllegalStateException("InstanceSpec with id:" + id + " not found");
}
- static String getServerString(QuorumVerifier qv, TestingCluster cluster, long id) throws Exception
+ private static String getServerString(QuorumVerifier qv, TestingCluster cluster, long id) throws Exception
{
String str = qv.getAllMembers().get(id).toString();
//check if connection string is already there.
@@ -365,7 +372,7 @@ public class TestReconfigurationX
}
}
- static String getConnectionString(TestingCluster cluster, long... ids) throws Exception
+ private static String getConnectionString(TestingCluster cluster, long... ids) throws Exception
{
StringBuilder sb = new StringBuilder();
Map<Long, InstanceSpec> specs = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/curator/blob/d42ef172/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
----------------------------------------------------------------------
diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
index e2a1ae8..b8dada8 100644
--- a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
+++ b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
@@ -52,6 +52,18 @@ public class TestingCluster implements Closeable
}
/**
+ * Creates an ensemble comprised of <code>instanceQty</code> servers. Each server will use
+ * a temp directory and random ports
+ *
+ * @param instanceQty number of servers to create in the ensemble
+ * @param resetServerIds if true, server Ids are reset first
+ */
+ public TestingCluster(int instanceQty, boolean resetServerIds)
+ {
+ this(makeSpecs(instanceQty, resetServerIds));
+ }
+
+ /**
* Creates an ensemble using the given server specs
*
* @param specs the server specs
@@ -99,17 +111,17 @@ public class TestingCluster implements Closeable
public Collection<InstanceSpec> getInstances()
{
Iterable<InstanceSpec> transformed = Iterables.transform
- (
- servers,
- new Function<TestingZooKeeperServer, InstanceSpec>()
- {
- @Override
- public InstanceSpec apply(TestingZooKeeperServer server)
+ (
+ servers,
+ new Function<TestingZooKeeperServer, InstanceSpec>()
{
- return server.getInstanceSpec();
+ @Override
+ public InstanceSpec apply(TestingZooKeeperServer server)
+ {
+ return server.getInstanceSpec();
+ }
}
- }
- );
+ );
return Lists.newArrayList(transformed);
}
@@ -244,7 +256,15 @@ public class TestingCluster implements Closeable
private static Map<InstanceSpec, Collection<InstanceSpec>> makeSpecs(int instanceQty)
{
- InstanceSpec.reset();
+ return makeSpecs(instanceQty, true);
+ }
+
+ private static Map<InstanceSpec, Collection<InstanceSpec>> makeSpecs(int instanceQty, boolean resetServerIds)
+ {
+ if ( resetServerIds )
+ {
+ InstanceSpec.reset();
+ }
ImmutableList.Builder<InstanceSpec> builder = ImmutableList.builder();
for ( int i = 0; i < instanceQty; ++i )
{