You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by dr...@apache.org on 2015/08/17 18:54:46 UTC

[16/27] curator git commit: Moved EnsembleTracker and did some refactoring

Moved EnsembleTracker and did some refactoring


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/d86f51f8
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/d86f51f8
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/d86f51f8

Branch: refs/heads/CURATOR-215
Commit: d86f51f8c2c473715cddfcd32f087e2510332b4f
Parents: 299a202
Author: randgalt <ra...@apache.org>
Authored: Sat May 9 09:47:04 2015 -0500
Committer: Scott Blum <dr...@apache.org>
Committed: Wed Aug 12 17:28:42 2015 -0400

----------------------------------------------------------------------
 .../framework/ensemble/EnsembleTracker.java     | 191 ++++++++++++++++++
 .../curator/framework/imps/EnsembleTracker.java | 196 -------------------
 .../framework/imps/TestReconfiguration.java     |   1 +
 3 files changed, 192 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/d86f51f8/curator-framework/src/main/java/org/apache/curator/framework/ensemble/EnsembleTracker.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/ensemble/EnsembleTracker.java b/curator-framework/src/main/java/org/apache/curator/framework/ensemble/EnsembleTracker.java
new file mode 100644
index 0000000..375e1f0
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/ensemble/EnsembleTracker.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.ensemble;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import org.apache.curator.ensemble.EnsembleListener;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tracks changes to the ensemble and notifies registered {@link org.apache.curator.ensemble.EnsembleListener} instances.
+ */
+public class EnsembleTracker implements Closeable
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final CuratorFramework client;
+    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+    private final ListenerContainer<EnsembleListener> listeners = new ListenerContainer<>();
+    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+    {
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState newState)
+        {
+            if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) )
+            {
+                try
+                {
+                    reset();
+                }
+                catch ( Exception e )
+                {
+                    log.error("Trying to reset after reconnection", e);
+                }
+            }
+        }
+    };
+
+    private final CuratorWatcher watcher = new CuratorWatcher()
+    {
+        @Override
+        public void process(WatchedEvent event) throws Exception
+        {
+            if ( event.getType() == Watcher.Event.EventType.NodeDataChanged )
+            {
+                reset();
+            }
+        }
+    };
+
+    private enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
+
+    private final BackgroundCallback backgroundCallback = new BackgroundCallback()
+    {
+        @Override
+        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+        {
+            processBackgroundResult(event);
+        }
+    };
+
+    public EnsembleTracker(CuratorFramework client)
+    {
+        this.client = client;
+    }
+
+    public void start() throws Exception
+    {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
+        client.getConnectionStateListenable().addListener(connectionStateListener);
+        reset();
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            listeners.clear();
+        }
+        client.getConnectionStateListenable().removeListener(connectionStateListener);
+    }
+
+    /**
+     * Return the ensemble listenable
+     *
+     * @return listenable
+     */
+    public ListenerContainer<EnsembleListener> getListenable()
+    {
+        Preconditions.checkState(state.get() != State.CLOSED, "Closed");
+
+        return listeners;
+    }
+
+    private void reset() throws Exception
+    {
+        client.getConfig().usingWatcher(watcher).inBackground(backgroundCallback).forEnsemble();
+    }
+
+    private void processBackgroundResult(CuratorEvent event) throws Exception
+    {
+        switch ( event.getType() )
+        {
+            case GET_CONFIG:
+            {
+                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+                {
+                    processConfigData(event.getData());
+                }
+            }
+        }
+    }
+
+    private void processConfigData(byte[] data) throws Exception
+    {
+        Properties properties = new Properties();
+        properties.load(new ByteArrayInputStream(data));
+        QuorumVerifier qv = new QuorumMaj(properties);
+        StringBuilder sb = new StringBuilder();
+        for ( QuorumPeer.QuorumServer server : qv.getAllMembers().values() )
+        {
+            if ( sb.length() != 0 )
+            {
+                sb.append(",");
+            }
+            sb.append(server.clientAddr.getAddress().getHostAddress()).append(":").append(server.clientAddr.getPort());
+        }
+
+        final String connectionString = sb.toString();
+        listeners.forEach
+            (
+                new Function<EnsembleListener, Void>()
+                {
+                    @Override
+                    public Void apply(EnsembleListener listener)
+                    {
+                        try
+                        {
+                            listener.connectionStringUpdated(connectionString);
+                        }
+                        catch ( Exception e )
+                        {
+                            log.error("Calling listener", e);
+                        }
+                        return null;
+                    }
+                }
+            );
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d86f51f8/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
deleted file mode 100644
index 6688848..0000000
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.curator.framework.imps;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import org.apache.curator.ensemble.EnsembleListener;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.curator.framework.listen.ListenerContainer;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.server.quorum.QuorumPeer;
-import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
-import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.ByteArrayInputStream;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Tracks changes to the ensemble and notifies registered {@link org.apache.curator.ensemble.EnsembleListener} instances.
- */
-public class EnsembleTracker implements Closeable
-{
-    private final Logger log = LoggerFactory.getLogger(getClass());
-    private final CuratorFramework client;
-    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
-    private final ListenerContainer<EnsembleListener> listeners = new ListenerContainer<EnsembleListener>();
-    private final AtomicBoolean isConnected = new AtomicBoolean(true);
-    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
-    {
-        @Override
-        public void stateChanged(CuratorFramework client, ConnectionState newState)
-        {
-            if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) )
-            {
-                if ( isConnected.compareAndSet(false, true) )
-                {
-                    try
-                    {
-                        reset();
-                    }
-                    catch ( Exception e )
-                    {
-                        log.error("Trying to reset after reconnection", e);
-                    }
-                }
-            }
-            else
-            {
-                isConnected.set(false);
-            }
-        }
-    };
-
-    private final CuratorWatcher watcher = new CuratorWatcher()
-    {
-        @Override
-        public void process(WatchedEvent event) throws Exception
-        {
-            reset();
-        }
-    };
-
-    private enum State
-    {
-        LATENT,
-        STARTED,
-        CLOSED
-    }
-
-    private final BackgroundCallback backgroundCallback = new BackgroundCallback()
-    {
-        @Override
-        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
-        {
-            processBackgroundResult(event);
-        }
-    };
-
-    public EnsembleTracker(CuratorFramework client)
-    {
-        this.client = client;
-    }
-
-    public void start() throws Exception
-    {
-        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
-        client.getConnectionStateListenable().addListener(connectionStateListener);
-        reset();
-    }
-
-    @Override
-    public void close() throws IOException
-    {
-        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
-        {
-            listeners.clear();
-        }
-        client.getConnectionStateListenable().removeListener(connectionStateListener);
-    }
-
-    /**
-     * Return the ensemble listenable
-     *
-     * @return listenable
-     */
-    public ListenerContainer<EnsembleListener> getListenable()
-    {
-        Preconditions.checkState(state.get() != State.CLOSED, "Closed");
-
-        return listeners;
-    }
-
-    private void reset() throws Exception
-    {
-        client.getConfig().usingWatcher(watcher).inBackground(backgroundCallback).forEnsemble();
-    }
-
-    private void processBackgroundResult(CuratorEvent event) throws Exception
-    {
-        switch ( event.getType() )
-        {
-        case GET_CONFIG:
-        {
-            if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
-            {
-                processConfigData(event.getData());
-            }
-        }
-        }
-    }
-
-    private void processConfigData(byte[] data) throws Exception
-    {
-        Properties properties = new Properties();
-        properties.load(new ByteArrayInputStream(data));
-        QuorumVerifier qv = new QuorumMaj(properties);
-        StringBuilder sb = new StringBuilder();
-        for ( QuorumPeer.QuorumServer server : qv.getAllMembers().values() )
-        {
-            if ( sb.length() != 0 )
-            {
-                sb.append(",");
-            }
-            sb.append(server.clientAddr.getAddress().getHostAddress()).append(":").append(server.clientAddr.getPort());
-        }
-
-        final String connectionString = sb.toString();
-        listeners.forEach
-            (
-                new Function<EnsembleListener, Void>()
-                {
-                    @Override
-                    public Void apply(EnsembleListener listener)
-                    {
-                        try
-                        {
-                            listener.connectionStringUpdated(connectionString);
-                        }
-                        catch ( Exception e )
-                        {
-                            log.error("Calling listener", e);
-                        }
-                        return null;
-                    }
-                }
-            );
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/d86f51f8/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
index 297cf9b..133e690 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
@@ -25,6 +25,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.ensemble.EnsembleTracker;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.InstanceSpec;
 import org.apache.curator.test.TestingCluster;