You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by dr...@apache.org on 2015/08/17 18:54:46 UTC
[16/27] curator git commit: Moved EnsembleTracker and did some refactoring
Moved EnsembleTracker and did some refactoring
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/d86f51f8
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/d86f51f8
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/d86f51f8
Branch: refs/heads/CURATOR-215
Commit: d86f51f8c2c473715cddfcd32f087e2510332b4f
Parents: 299a202
Author: randgalt <ra...@apache.org>
Authored: Sat May 9 09:47:04 2015 -0500
Committer: Scott Blum <dr...@apache.org>
Committed: Wed Aug 12 17:28:42 2015 -0400
----------------------------------------------------------------------
.../framework/ensemble/EnsembleTracker.java | 191 ++++++++++++++++++
.../curator/framework/imps/EnsembleTracker.java | 196 -------------------
.../framework/imps/TestReconfiguration.java | 1 +
3 files changed, 192 insertions(+), 196 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/d86f51f8/curator-framework/src/main/java/org/apache/curator/framework/ensemble/EnsembleTracker.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/ensemble/EnsembleTracker.java b/curator-framework/src/main/java/org/apache/curator/framework/ensemble/EnsembleTracker.java
new file mode 100644
index 0000000..375e1f0
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/ensemble/EnsembleTracker.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.ensemble;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import org.apache.curator.ensemble.EnsembleListener;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tracks changes to the ensemble and notifies registered {@link org.apache.curator.ensemble.EnsembleListener} instances.
+ */
+public class EnsembleTracker implements Closeable
+{
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final CuratorFramework client;
+ private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+ private final ListenerContainer<EnsembleListener> listeners = new ListenerContainer<>();
+ private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) )
+ {
+ try
+ {
+ reset();
+ }
+ catch ( Exception e )
+ {
+ log.error("Trying to reset after reconnection", e);
+ }
+ }
+ }
+ };
+
+ private final CuratorWatcher watcher = new CuratorWatcher()
+ {
+ @Override
+ public void process(WatchedEvent event) throws Exception
+ {
+ if ( event.getType() == Watcher.Event.EventType.NodeDataChanged )
+ {
+ reset();
+ }
+ }
+ };
+
+ private enum State
+ {
+ LATENT,
+ STARTED,
+ CLOSED
+ }
+
+ private final BackgroundCallback backgroundCallback = new BackgroundCallback()
+ {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ processBackgroundResult(event);
+ }
+ };
+
+ public EnsembleTracker(CuratorFramework client)
+ {
+ this.client = client;
+ }
+
+ public void start() throws Exception
+ {
+ Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
+ client.getConnectionStateListenable().addListener(connectionStateListener);
+ reset();
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+ {
+ listeners.clear();
+ }
+ client.getConnectionStateListenable().removeListener(connectionStateListener);
+ }
+
+ /**
+ * Return the ensemble listenable
+ *
+ * @return listenable
+ */
+ public ListenerContainer<EnsembleListener> getListenable()
+ {
+ Preconditions.checkState(state.get() != State.CLOSED, "Closed");
+
+ return listeners;
+ }
+
+ private void reset() throws Exception
+ {
+ client.getConfig().usingWatcher(watcher).inBackground(backgroundCallback).forEnsemble();
+ }
+
+ private void processBackgroundResult(CuratorEvent event) throws Exception
+ {
+ switch ( event.getType() )
+ {
+ case GET_CONFIG:
+ {
+ if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+ {
+ processConfigData(event.getData());
+ }
+ }
+ }
+ }
+
+ private void processConfigData(byte[] data) throws Exception
+ {
+ Properties properties = new Properties();
+ properties.load(new ByteArrayInputStream(data));
+ QuorumVerifier qv = new QuorumMaj(properties);
+ StringBuilder sb = new StringBuilder();
+ for ( QuorumPeer.QuorumServer server : qv.getAllMembers().values() )
+ {
+ if ( sb.length() != 0 )
+ {
+ sb.append(",");
+ }
+ sb.append(server.clientAddr.getAddress().getHostAddress()).append(":").append(server.clientAddr.getPort());
+ }
+
+ final String connectionString = sb.toString();
+ listeners.forEach
+ (
+ new Function<EnsembleListener, Void>()
+ {
+ @Override
+ public Void apply(EnsembleListener listener)
+ {
+ try
+ {
+ listener.connectionStringUpdated(connectionString);
+ }
+ catch ( Exception e )
+ {
+ log.error("Calling listener", e);
+ }
+ return null;
+ }
+ }
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/d86f51f8/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
deleted file mode 100644
index 6688848..0000000
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.curator.framework.imps;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import org.apache.curator.ensemble.EnsembleListener;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.curator.framework.listen.ListenerContainer;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.server.quorum.QuorumPeer;
-import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
-import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.ByteArrayInputStream;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Tracks changes to the ensemble and notifies registered {@link org.apache.curator.ensemble.EnsembleListener} instances.
- */
-public class EnsembleTracker implements Closeable
-{
- private final Logger log = LoggerFactory.getLogger(getClass());
- private final CuratorFramework client;
- private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
- private final ListenerContainer<EnsembleListener> listeners = new ListenerContainer<EnsembleListener>();
- private final AtomicBoolean isConnected = new AtomicBoolean(true);
- private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
- {
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState)
- {
- if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) )
- {
- if ( isConnected.compareAndSet(false, true) )
- {
- try
- {
- reset();
- }
- catch ( Exception e )
- {
- log.error("Trying to reset after reconnection", e);
- }
- }
- }
- else
- {
- isConnected.set(false);
- }
- }
- };
-
- private final CuratorWatcher watcher = new CuratorWatcher()
- {
- @Override
- public void process(WatchedEvent event) throws Exception
- {
- reset();
- }
- };
-
- private enum State
- {
- LATENT,
- STARTED,
- CLOSED
- }
-
- private final BackgroundCallback backgroundCallback = new BackgroundCallback()
- {
- @Override
- public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
- {
- processBackgroundResult(event);
- }
- };
-
- public EnsembleTracker(CuratorFramework client)
- {
- this.client = client;
- }
-
- public void start() throws Exception
- {
- Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
- client.getConnectionStateListenable().addListener(connectionStateListener);
- reset();
- }
-
- @Override
- public void close() throws IOException
- {
- if ( state.compareAndSet(State.STARTED, State.CLOSED) )
- {
- listeners.clear();
- }
- client.getConnectionStateListenable().removeListener(connectionStateListener);
- }
-
- /**
- * Return the ensemble listenable
- *
- * @return listenable
- */
- public ListenerContainer<EnsembleListener> getListenable()
- {
- Preconditions.checkState(state.get() != State.CLOSED, "Closed");
-
- return listeners;
- }
-
- private void reset() throws Exception
- {
- client.getConfig().usingWatcher(watcher).inBackground(backgroundCallback).forEnsemble();
- }
-
- private void processBackgroundResult(CuratorEvent event) throws Exception
- {
- switch ( event.getType() )
- {
- case GET_CONFIG:
- {
- if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
- {
- processConfigData(event.getData());
- }
- }
- }
- }
-
- private void processConfigData(byte[] data) throws Exception
- {
- Properties properties = new Properties();
- properties.load(new ByteArrayInputStream(data));
- QuorumVerifier qv = new QuorumMaj(properties);
- StringBuilder sb = new StringBuilder();
- for ( QuorumPeer.QuorumServer server : qv.getAllMembers().values() )
- {
- if ( sb.length() != 0 )
- {
- sb.append(",");
- }
- sb.append(server.clientAddr.getAddress().getHostAddress()).append(":").append(server.clientAddr.getPort());
- }
-
- final String connectionString = sb.toString();
- listeners.forEach
- (
- new Function<EnsembleListener, Void>()
- {
- @Override
- public Void apply(EnsembleListener listener)
- {
- try
- {
- listener.connectionStringUpdated(connectionString);
- }
- catch ( Exception e )
- {
- log.error("Calling listener", e);
- }
- return null;
- }
- }
- );
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/d86f51f8/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
index 297cf9b..133e690 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
@@ -25,6 +25,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.ensemble.EnsembleTracker;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.TestingCluster;