You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by dr...@apache.org on 2015/08/17 18:54:34 UTC
[04/27] curator git commit: [CURATOR-160] Add builders and dsl for
ZooKeeper's config and reconfig methods.
[CURATOR-160] Add builders and dsl for ZooKeeper's config and reconfig methods.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2bd53fc2
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2bd53fc2
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2bd53fc2
Branch: refs/heads/CURATOR-215
Commit: 2bd53fc28fbaa70ed380af0579853db83b63a663
Parents: 143044b
Author: Ioannis Canellos <io...@gmail.com>
Authored: Thu Nov 6 17:34:47 2014 +0200
Committer: Scott Blum <dr...@apache.org>
Committed: Wed Aug 12 16:17:19 2015 -0400
----------------------------------------------------------------------
.../curator/framework/CuratorFramework.java | 12 +
.../framework/api/AsyncReconfigurable.java | 29 ++
.../curator/framework/api/DataCallbackable.java | 32 ++
.../curator/framework/api/GetConfigBuilder.java | 27 ++
.../api/IncrementalReconfigBuilder.java | 33 ++
.../apache/curator/framework/api/Joinable.java | 40 +++
.../apache/curator/framework/api/Leaveable.java | 38 +++
.../curator/framework/api/Memberable.java | 40 +++
.../api/NonIncrementalReconfigBuilder.java | 32 ++
.../curator/framework/api/ReconfigBuilder.java | 26 ++
.../framework/api/SyncReconfigurable.java | 30 ++
.../framework/imps/CuratorFrameworkImpl.java | 10 +
.../framework/imps/GetConfigBuilderImpl.java | 80 +++++
.../framework/imps/ReconfigBuilderImpl.java | 182 +++++++++++
.../framework/imps/TestReconfiguration.java | 303 +++++++++++++++++++
15 files changed, 914 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/2bd53fc2/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 9c23ddb..181e4e8 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -122,6 +122,18 @@ public interface CuratorFramework extends Closeable
public SetACLBuilder setACL();
/**
+ * Start a reconfig builder
+ * @return builder object
+ */
+ public ReconfigBuilder reconfig();
+
+ /**
+ * Start a getConfig builder
+ * @return
+ */
+ public GetConfigBuilder getConfig();
+
+ /**
* Start a transaction builder
*
* @return builder object
http://git-wip-us.apache.org/repos/asf/curator/blob/2bd53fc2/curator-framework/src/main/java/org/apache/curator/framework/api/AsyncReconfigurable.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/AsyncReconfigurable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AsyncReconfigurable.java
new file mode 100644
index 0000000..fc7fd57
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AsyncReconfigurable.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+public interface AsyncReconfigurable {
+
+ /**
+ * Sets the configuration version to use.
+ * @param config The version of the configuration.
+ * @throws Exception
+ */
+ void fromConfig(long config) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/2bd53fc2/curator-framework/src/main/java/org/apache/curator/framework/api/DataCallbackable.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/DataCallbackable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/DataCallbackable.java
new file mode 100644
index 0000000..75ded65
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/DataCallbackable.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+import org.apache.zookeeper.AsyncCallback.DataCallback;
+
+public interface DataCallbackable<T> {
+
+ /**
+ * Passes a callback and a context object to the config/reconfig command.
+ * @param callback The async callback to use.
+ * @param ctx An object that will be passed to the callback.
+ * @return this
+ */
+ T usingDataCallback(DataCallback callback, Object ctx);
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/2bd53fc2/curator-framework/src/main/java/org/apache/curator/framework/api/GetConfigBuilder.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/GetConfigBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/GetConfigBuilder.java
new file mode 100644
index 0000000..c7c013b
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/GetConfigBuilder.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+public interface GetConfigBuilder extends
+ Watchable<GetConfigBuilder>,
+ DataCallbackable<Void>,
+ Statable<byte[]> {
+}
+
+
http://git-wip-us.apache.org/repos/asf/curator/blob/2bd53fc2/curator-framework/src/main/java/org/apache/curator/framework/api/IncrementalReconfigBuilder.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/IncrementalReconfigBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/IncrementalReconfigBuilder.java
new file mode 100644
index 0000000..0ad6426
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/IncrementalReconfigBuilder.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+/**
+ * An incremental reconfiguration builder.
+ * This builder has access only to the incremental reconfiguration methods join and leave, so that we prevent
+ * mixing concepts that can't be used together.
+ * @param <T>
+ */
+public interface IncrementalReconfigBuilder<T> extends
+ Joinable<IncrementalReconfigBuilder<T>>,
+ Leaveable<IncrementalReconfigBuilder<T>>,
+ DataCallbackable<AsyncReconfigurable>,
+ Statable<SyncReconfigurable> {
+
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/2bd53fc2/curator-framework/src/main/java/org/apache/curator/framework/api/Joinable.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Joinable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Joinable.java
new file mode 100644
index 0000000..ff1b3c5
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Joinable.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+import java.util.Collection;
+
+public interface Joinable<T> {
+
+ /**
+ * Adds a server to join the ensemble.
+ * The expected format is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port]
+ * @param server The server to join.
+ * @return this.
+ */
+ T join(String server);
+
+ /**
+ * Adds a collection of servers to the ensemble.
+ * The expected format is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port]
+ * @param servers The collection of servers to join
+ * @return this
+ */
+ T join(Collection<String> servers);
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/2bd53fc2/curator-framework/src/main/java/org/apache/curator/framework/api/Leaveable.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Leaveable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Leaveable.java
new file mode 100644
index 0000000..8560d65
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Leaveable.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+import java.util.Collection;
+
+public interface Leaveable<T> {
+
+ /**
+ * Sets a server to leave the ensemble.
+ * @param server The server id.
+ * @return this
+ */
+ T leave(String server);
+
+ /**
+ * Sets a collection of servers to leave the ensemble.
+ * @param servers The collection of server ids.
+ * @return this.
+ */
+ T leave(Collection<String> servers);
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/2bd53fc2/curator-framework/src/main/java/org/apache/curator/framework/api/Memberable.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Memberable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Memberable.java
new file mode 100644
index 0000000..5b62dba
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Memberable.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+import java.util.Collection;
+
+public interface Memberable<T> {
+
+ /**
+ * Sets a member that is meant to be part of the ensemble.
+ * The expected format is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port]
+ * @param server The server to add as a member of the ensemble.
+ * @return this.
+ */
+ T withMember(String server);
+
+ /**
+ * Sets a collection of members member that is meant to be part of the ensemble.
+ * The expected format is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port]
+ * @param servers The collection of server to set as a members of the ensemble.
+ * @return this.
+ */
+ T withMembers(Collection<String> servers);
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/2bd53fc2/curator-framework/src/main/java/org/apache/curator/framework/api/NonIncrementalReconfigBuilder.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/NonIncrementalReconfigBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/NonIncrementalReconfigBuilder.java
new file mode 100644
index 0000000..2f6a9c6
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/NonIncrementalReconfigBuilder.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+/**
+ * An non-incremental reconfiguration builder.
+ * This builder has access only to the non-incremental reconfiguration methods withMembers, so that we prevent
+ * mixing concepts that can't be used together.
+ * @param <T>
+ */
+public interface NonIncrementalReconfigBuilder<T> extends
+ Memberable<NonIncrementalReconfigBuilder<T>>,
+ DataCallbackable<AsyncReconfigurable>,
+ Statable<SyncReconfigurable> {
+
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/2bd53fc2/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilder.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilder.java
new file mode 100644
index 0000000..0e420a1
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilder.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+public interface ReconfigBuilder extends
+ Joinable<IncrementalReconfigBuilder<byte[]>>,
+ Leaveable<IncrementalReconfigBuilder<byte[]>>,
+ Memberable<NonIncrementalReconfigBuilder<byte[]>> {
+
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/2bd53fc2/curator-framework/src/main/java/org/apache/curator/framework/api/SyncReconfigurable.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/SyncReconfigurable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/SyncReconfigurable.java
new file mode 100644
index 0000000..bd7b96b
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/SyncReconfigurable.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+public interface SyncReconfigurable {
+
+ /**
+ * Sets the configuration version to use.
+ * @param config The version of the configuration.
+ * @return The configuration data.
+ * @throws Exception
+ */
+ byte[] fromConfig(long config) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/2bd53fc2/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 5034ed9..b9b9c31 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -398,6 +398,16 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
@Override
+ public ReconfigBuilder reconfig() {
+ return new ReconfigBuilderImpl(this);
+ }
+
+ @Override
+ public GetConfigBuilder getConfig() {
+ return new GetConfigBuilderImpl(this);
+ }
+
+ @Override
public CuratorTransaction inTransaction()
{
Preconditions.checkState(getState() == CuratorFrameworkState.STARTED, "instance must be started before calling this method");
http://git-wip-us.apache.org/repos/asf/curator/blob/2bd53fc2/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java
new file mode 100644
index 0000000..a56894d
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.api.GetConfigBuilder;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+
+public class GetConfigBuilderImpl implements GetConfigBuilder {
+
+ private final CuratorFrameworkImpl client;
+ private boolean watched;
+ private Watcher watcher;
+
+ public GetConfigBuilderImpl(CuratorFrameworkImpl client) {
+ this.client = client;
+ }
+
+ @Override
+ public Void usingDataCallback(AsyncCallback.DataCallback callback, Object ctx) {
+ try {
+ if (watcher != null) {
+ client.getZooKeeper().getConfig(watcher, callback, ctx);
+ } else {
+ client.getZooKeeper().getConfig(watched, callback, ctx);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ }
+
+ @Override
+ public byte[] storingStatIn(Stat stat) {
+ try {
+ if (watcher != null) {
+ return client.getZooKeeper().getConfig(watcher, stat);
+ } else {
+ return client.getZooKeeper().getConfig(watched, stat);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public GetConfigBuilder watched() {
+ this.watched = true;
+ return this;
+ }
+
+ @Override
+ public GetConfigBuilder usingWatcher(Watcher watcher) {
+ this.watcher = watcher;
+ return null;
+ }
+
+ @Override
+ public GetConfigBuilder usingWatcher(final CuratorWatcher watcher) {
+ throw new UnsupportedOperationException("GetConfigBuilder doesn't support CuratorWatcher, please use Watcher instead.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/2bd53fc2/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
new file mode 100644
index 0000000..7b39be6
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.api.AsyncReconfigurable;
+import org.apache.curator.framework.api.IncrementalReconfigBuilder;
+import org.apache.curator.framework.api.NonIncrementalReconfigBuilder;
+import org.apache.curator.framework.api.ReconfigBuilder;
+import org.apache.curator.framework.api.SyncReconfigurable;
+import org.apache.zookeeper.AsyncCallback.DataCallback;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+public class ReconfigBuilderImpl implements ReconfigBuilder {
+
+ private final CuratorFrameworkImpl client;
+
+ private static class IncrementalReconfigBuilderImpl<T> implements IncrementalReconfigBuilder<T> {
+
+ private final CuratorFrameworkImpl client;
+ private List<String> joiningServers = new LinkedList<String>();
+ private List<String> leavingServers = new LinkedList<String>();
+
+ private IncrementalReconfigBuilderImpl(CuratorFrameworkImpl client) {
+ this.client = client;
+ }
+
+ @Override
+ public IncrementalReconfigBuilderImpl<T> join(String server) {
+ joiningServers.add(server);
+ return this;
+ }
+
+ @Override
+ public IncrementalReconfigBuilder<T> join(Collection<String> servers) {
+ joiningServers.addAll(servers);
+ return this;
+ }
+
+ @Override
+ public IncrementalReconfigBuilderImpl<T> leave(String server) {
+ leavingServers.add(server);
+ return this;
+ }
+
+ @Override
+ public IncrementalReconfigBuilder<T> leave(Collection<String> servers) {
+ leavingServers.addAll(servers);
+ return this;
+ }
+
+ @Override
+ public SyncReconfigurable storingStatIn(final Stat stat) {
+ return new SyncReconfigurable() {
+ @Override
+ public byte[] fromConfig(long config) throws Exception {
+ return client
+ .getZooKeeper()
+ .reconfig(joiningServers.isEmpty() ? null : joiningServers,
+ leavingServers.isEmpty() ? null : leavingServers,
+ null,
+ config, stat);
+ }
+ };
+ }
+
+ @Override
+ public AsyncReconfigurable usingDataCallback(final DataCallback callback, final Object ctx) {
+ return new AsyncReconfigurable() {
+ @Override
+ public void fromConfig(long config) throws Exception {
+ client.getZooKeeper()
+ .reconfig(joiningServers.isEmpty() ? null : joiningServers,
+ leavingServers.isEmpty() ? null : leavingServers,
+ null,
+ config, callback, ctx);
+ }
+ };
+ }
+ }
+
+ private static class NonIncrementalReconfigBuilderImpl<T> implements NonIncrementalReconfigBuilder<T> {
+
+ private final CuratorFrameworkImpl client;
+ private List<String> newMembers = new LinkedList<String>();
+
+ private NonIncrementalReconfigBuilderImpl(CuratorFrameworkImpl client) {
+ this.client = client;
+ }
+
+ private NonIncrementalReconfigBuilderImpl(CuratorFrameworkImpl client, List<String> newMembers) {
+ this.client = client;
+ this.newMembers = newMembers;
+ }
+
+ @Override
+ public NonIncrementalReconfigBuilder<T> withMember(String server) {
+ newMembers.add(server);
+ return this;
+ }
+
+ @Override
+ public NonIncrementalReconfigBuilder<T> withMembers(Collection servers) {
+ newMembers.addAll(servers);
+ return this;
+ }
+
+ @Override
+ public SyncReconfigurable storingStatIn(final Stat stat) {
+ return new SyncReconfigurable() {
+ @Override
+ public byte[] fromConfig(long config) throws Exception {
+ return client.getZooKeeper().reconfig(null, null, newMembers, config, stat);
+ }
+ };
+ }
+
+ @Override
+ public AsyncReconfigurable usingDataCallback(final DataCallback callback, final Object ctx) {
+ return new AsyncReconfigurable() {
+ @Override
+ public void fromConfig(long config) throws Exception {
+ client.getZooKeeper().reconfig(null, null, newMembers, config, callback, ctx);
+ }
+ };
+ }
+ }
+
+
+ public ReconfigBuilderImpl(CuratorFrameworkImpl client) {
+ this.client = client;
+ }
+
+ @Override
+ public IncrementalReconfigBuilder<byte[]> join(String server) {
+ return new IncrementalReconfigBuilderImpl(client).join(server);
+ }
+
+ @Override
+ public IncrementalReconfigBuilder<byte[]> join(Collection<String> servers) {
+ return new IncrementalReconfigBuilderImpl(client).join(servers);
+ }
+
+ @Override
+ public IncrementalReconfigBuilder<byte[]> leave(String server) {
+ return new IncrementalReconfigBuilderImpl(client).leave(server);
+ }
+
+ @Override
+ public IncrementalReconfigBuilder<byte[]> leave(Collection<String> servers) {
+ return new IncrementalReconfigBuilderImpl(client).leave(servers);
+ }
+
+ @Override
+ public NonIncrementalReconfigBuilder<byte[]> withMember(String server) {
+ return new NonIncrementalReconfigBuilderImpl(client).withMember(server);
+ }
+
+ @Override
+ public NonIncrementalReconfigBuilder<byte[]> withMembers(Collection<String> servers) {
+ return new NonIncrementalReconfigBuilderImpl<byte[]>(client).withMembers(servers);
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/2bd53fc2/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
new file mode 100644
index 0000000..6918825
--- /dev/null
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class TestReconfiguration {
+
+ static TestingCluster cluster;
+
+ @BeforeClass
+ public void setup() throws Exception {
+ cluster = new TestingCluster(5);
+ cluster.start();
+ }
+
+ @AfterClass
+ public void tearDown() throws IOException {
+ cluster.close();
+ }
+
+ @Test
+ public void testSyncIncremental() throws Exception {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1));
+ client.start();
+ client.blockUntilConnected();
+ try {
+ Stat stat = new Stat();
+ byte[] bytes = client.getConfig().storingStatIn(stat);
+ Assert.assertNotNull(bytes);
+ QuorumVerifier qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(5, qv.getAllMembers().size());
+ String server1 = getServerString(qv, cluster, 1L);
+ String server2 = getServerString(qv, cluster, 2L);
+
+ //Remove Servers
+ bytes = client.reconfig().leave("1").storingStatIn(stat).fromConfig(qv.getVersion());
+ qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(4, qv.getAllMembers().size());
+ bytes = client.reconfig().leave("2").storingStatIn(stat).fromConfig(qv.getVersion());
+ qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(3, qv.getAllMembers().size());
+
+ //Add Servers
+ bytes = client.reconfig().join("server.1=" + server1).storingStatIn(stat).fromConfig(qv.getVersion());
+ qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(4, qv.getAllMembers().size());
+ bytes = client.reconfig().join("server.2=" + server2).storingStatIn(stat).fromConfig(qv.getVersion());
+ qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(5, qv.getAllMembers().size());
+ } finally {
+ client.close();
+ }
+ }
+
+ @Test
+ public void testAsyncIncremental() throws Exception {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1));
+ client.start();
+ client.blockUntilConnected();
+ try {
+ final AtomicReference<byte[]> bytes = new AtomicReference<byte[]>();
+ final AsyncCallback.DataCallback callback = new AsyncCallback.DataCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+ bytes.set(data);
+ ((CountDownLatch)ctx).countDown();
+ }
+ };
+
+ CountDownLatch latch = new CountDownLatch(1);
+ client.getConfig().usingDataCallback(callback, latch);
+ latch.await(5, TimeUnit.SECONDS);
+ Assert.assertNotNull(bytes.get());
+ QuorumVerifier qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(5, qv.getAllMembers().size());
+ String server1 = getServerString(qv, cluster, 1L);
+ String server2 = getServerString(qv, cluster, 2L);
+
+
+ //Remove Servers
+ latch = new CountDownLatch(1);
+ client.reconfig().leave("1").usingDataCallback(callback, latch).fromConfig(qv.getVersion());
+ latch.await(5, TimeUnit.SECONDS);
+ qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(4, qv.getAllMembers().size());
+ latch = new CountDownLatch(1);
+ client.reconfig().leave("2").usingDataCallback(callback, latch).fromConfig(qv.getVersion());
+ latch.await(5, TimeUnit.SECONDS);
+ qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(3, qv.getAllMembers().size());
+
+ //Add Servers
+ latch = new CountDownLatch(1);
+ client.reconfig().join("server.1=" + server1).usingDataCallback(callback, latch).fromConfig(qv.getVersion());
+ latch.await(5, TimeUnit.SECONDS);
+ qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(4, qv.getAllMembers().size());
+ latch = new CountDownLatch(1);
+ client.reconfig().join("server.2=" + server2).usingDataCallback(callback, latch).fromConfig(qv.getVersion());
+ latch.await(5, TimeUnit.SECONDS);
+ qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(5, qv.getAllMembers().size());
+ } finally {
+ client.close();
+ }
+ }
+
+ @Test
+ public void testSyncNonIncremental() throws Exception {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1));
+ client.start();
+ client.blockUntilConnected();
+ try {
+ Stat stat = new Stat();
+ byte[] bytes = client.getConfig().storingStatIn(stat);
+ Assert.assertNotNull(bytes);
+ QuorumVerifier qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(5, qv.getAllMembers().size());
+ String server1 = getServerString(qv, cluster, 1L);
+ String server2 = getServerString(qv, cluster, 2L);
+ String server3 = getServerString(qv, cluster, 3L);
+ String server4 = getServerString(qv, cluster, 4L);
+ String server5 = getServerString(qv, cluster, 5L);
+
+ //Remove Servers
+ bytes = client.reconfig()
+ .withMember("server.2="+server2)
+ .withMember("server.3="+server3)
+ .withMember("server.4="+server4)
+ .withMember("server.5="+server5)
+ .storingStatIn(stat).fromConfig(qv.getVersion());
+ qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(4, qv.getAllMembers().size());
+ bytes = client.reconfig()
+ .withMember("server.3=" + server3)
+ .withMember("server.4=" + server4)
+ .withMember("server.5=" + server5)
+ .storingStatIn(stat).fromConfig(qv.getVersion());
+
+ qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(3, qv.getAllMembers().size());
+
+ //Add Servers
+ bytes = client.reconfig()
+ .withMember("server.1="+server1)
+ .withMember("server.3=" + server3)
+ .withMember("server.4="+server4)
+ .withMember("server.5="+server5)
+ .storingStatIn(stat).fromConfig(qv.getVersion());
+ qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(4, qv.getAllMembers().size());
+ bytes = client.reconfig()
+ .withMember("server.1="+server1)
+ .withMember("server.2="+server2)
+ .withMember("server.3=" + server3)
+ .withMember("server.4="+server4)
+ .withMember("server.5="+server5)
+ .storingStatIn(stat).fromConfig(qv.getVersion());
+ qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(5, qv.getAllMembers().size());
+ } finally {
+ client.close();
+ }
+ }
+
+ @Test
+ public void testAsyncNonIncremental() throws Exception {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1));
+ client.start();
+ client.blockUntilConnected();
+ try {
+ final AtomicReference<byte[]> bytes = new AtomicReference<byte[]>();
+ final AsyncCallback.DataCallback callback = new AsyncCallback.DataCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+ bytes.set(data);
+ ((CountDownLatch)ctx).countDown();
+ }
+ };
+
+ CountDownLatch latch = new CountDownLatch(1);
+ client.getConfig().usingDataCallback(callback, latch);
+ latch.await(5, TimeUnit.SECONDS);
+ Assert.assertNotNull(bytes.get());
+ QuorumVerifier qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(5, qv.getAllMembers().size());
+ String server1 = getServerString(qv, cluster, 1L);
+ String server2 = getServerString(qv, cluster, 2L);
+ String server3 = getServerString(qv, cluster, 3L);
+ String server4 = getServerString(qv, cluster, 4L);
+ String server5 = getServerString(qv, cluster, 5L);
+
+
+ //Remove Servers
+ latch = new CountDownLatch(1);
+ client.reconfig()
+ .withMember("server.2=" + server2)
+ .withMember("server.3="+server3)
+ .withMember("server.4="+server4)
+ .withMember("server.5="+server5)
+ .usingDataCallback(callback, latch).fromConfig(qv.getVersion());
+ latch.await(5, TimeUnit.SECONDS);
+ qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(4, qv.getAllMembers().size());
+ latch = new CountDownLatch(1);
+ client.reconfig()
+ .withMember("server.3="+server3)
+ .withMember("server.4=" + server4)
+ .withMember("server.5=" + server5)
+ .usingDataCallback(callback, latch).fromConfig(qv.getVersion());
+ latch.await(5, TimeUnit.SECONDS);
+ qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(3, qv.getAllMembers().size());
+
+ //Add Servers
+ latch = new CountDownLatch(1);
+ client.reconfig()
+ .withMember("server.1="+server1)
+ .withMember("server.3=" + server3)
+ .withMember("server.4=" + server4)
+ .withMember("server.5=" + server5)
+ .usingDataCallback(callback, latch).fromConfig(qv.getVersion());
+ latch.await(5, TimeUnit.SECONDS);
+ qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(4, qv.getAllMembers().size());
+ latch = new CountDownLatch(1);
+ client.reconfig()
+ .withMember("server.1="+server1)
+ .withMember("server.2="+server2)
+ .withMember("server.3="+server3)
+ .withMember("server.4=" + server4)
+ .withMember("server.5=" + server5)
+ .usingDataCallback(callback, latch).fromConfig(qv.getVersion());
+ latch.await(5, TimeUnit.SECONDS);
+ qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(5, qv.getAllMembers().size());
+ } finally {
+ client.close();
+ }
+ }
+
+
+ static QuorumVerifier getQuorumVerifier(byte[] bytes) throws Exception {
+ Properties properties = new Properties();
+ properties.load(new StringReader(new String(bytes)));
+ return new QuorumMaj(properties);
+ }
+
+ static InstanceSpec getInstance(TestingCluster cluster, int id) {
+ for (InstanceSpec spec : cluster.getInstances()) {
+ if (spec.getServerId() == id) {
+ return spec;
+ }
+ }
+ throw new IllegalStateException("InstanceSpec with id:" + id + " not found");
+ }
+
+ static String getServerString(QuorumVerifier qv, TestingCluster cluster, long id) throws Exception {
+ String str = qv.getAllMembers().get(id).toString();
+ //check if connection string is already there.
+ if (str.contains(";")) {
+ return str;
+ } else {
+ return str + ";" + getInstance(cluster, (int) id).getConnectString();
+ }
+ }
+}
\ No newline at end of file