You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2013/09/07 04:40:38 UTC

git commit: Taking a custom Executor and ThreadFactory wasn't thought through. It's actually useless. This constructor is now deprecated in favor of passing in an ExecutorService. The old version is wrapped internally so that it works as it used to. But,

Updated Branches:
  refs/heads/CURATOR-51 [created] 50e8fbb8c


Taking a custom Executor and ThreadFactory wasn't thought through. It's actually useless. This constructor is now
deprecated in favor of passing in an ExecutorService. The old version is wrapped internally so that it works as it
used to. But, it will be removed in the near future.


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/50e8fbb8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/50e8fbb8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/50e8fbb8

Branch: refs/heads/CURATOR-51
Commit: 50e8fbb8c5b77c27bb5276b044c855f5c0d2614b
Parents: fd4cd43
Author: randgalt <ra...@apache.org>
Authored: Fri Sep 6 19:39:01 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Fri Sep 6 19:39:01 2013 -0700

----------------------------------------------------------------------
 .../recipes/leader/LeaderSelector.java          | 229 +++++++++++--------
 .../leader/TestLeaderSelectorWithExecutor.java  |  99 ++++++++
 2 files changed, 238 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/50e8fbb8/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
index ab8a802..afb07bc 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
@@ -16,14 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.recipes.leader;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -31,42 +32,43 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.UnsupportedEncodingException;
 import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * <p>
- *     Abstraction to select a "leader" amongst multiple contenders in a group of JMVs connected
- *     to a Zookeeper cluster. If a group of N thread/processes contends for leadership, one will
- *     be assigned leader until it releases leadership at which time another one from the group will
- *     be chosen.
+ * Abstraction to select a "leader" amongst multiple contenders in a group of JMVs connected
+ * to a Zookeeper cluster. If a group of N thread/processes contends for leadership, one will
+ * be assigned leader until it releases leadership at which time another one from the group will
+ * be chosen.
  * </p>
- *
+ * <p/>
  * <p>
- *     Note that this class uses an underlying {@link InterProcessMutex} and as a result leader
- *     election is "fair" - each user will become leader in the order originally requested
- *     (from ZK's point of view).
+ * Note that this class uses an underlying {@link InterProcessMutex} and as a result leader
+ * election is "fair" - each user will become leader in the order originally requested
+ * (from ZK's point of view).
  * </p>
- *
  */
 public class LeaderSelector implements Closeable
 {
-    private final Logger                    log = LoggerFactory.getLogger(getClass());
-    private final CuratorFramework          client;
-    private final LeaderSelectorListener    listener;
-    private final ExecutorService           executorService;
-    private final Executor                  executor;
-    private final InterProcessMutex         mutex;
-    private final AtomicReference<State>    state = new AtomicReference<State>(State.LATENT);
-    private final AtomicBoolean             autoRequeue = new AtomicBoolean(false);
-
-    private volatile boolean                hasLeadership;
-    private volatile String                 id = "";
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final CuratorFramework client;
+    private final LeaderSelectorListener listener;
+    private final CloseableExecutorService executorService;
+    private final InterProcessMutex mutex;
+    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+    private final AtomicBoolean autoRequeue = new AtomicBoolean(false);
+
+    private volatile boolean hasLeadership;
+    private volatile String id = "";
 
     private enum State
     {
@@ -76,28 +78,41 @@ public class LeaderSelector implements Closeable
     }
 
     // guarded by synchronization
-    private boolean                isQueued = false;
+    private boolean isQueued = false;
 
     private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("LeaderSelector");
 
     /**
-     * @param client the client
+     * @param client     the client
      * @param leaderPath the path for this leadership group
-     * @param listener listener
+     * @param listener   listener
      */
     public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener)
     {
-        this(client, leaderPath, defaultThreadFactory, MoreExecutors.sameThreadExecutor(), listener);
+        this(client, leaderPath, Executors.newSingleThreadExecutor(defaultThreadFactory), listener);
+    }
+
+    /**
+     * @param client        the client
+     * @param leaderPath    the path for this leadership group
+     * @param threadFactory factory to use for making internal threads
+     * @param executor      the executor to run in
+     * @param listener      listener
+     * @deprecated This constructor was poorly thought out. Custom executor is useless. Use this version instead: {@link #LeaderSelector(CuratorFramework, String, ExecutorService, LeaderSelectorListener)}
+     */
+    @Deprecated
+    public LeaderSelector(CuratorFramework client, String leaderPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)
+    {
+        this(client, leaderPath, wrapExecutor(threadFactory, executor), listener);
     }
 
     /**
      * @param client the client
      * @param leaderPath the path for this leadership group
-     * @param threadFactory factory to use for making internal threads
-     * @param executor the executor to run in
+     * @param executorService thread pool to use
      * @param listener listener
      */
-    public LeaderSelector(CuratorFramework client, String leaderPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)
+    public LeaderSelector(CuratorFramework client, String leaderPath, ExecutorService executorService, LeaderSelectorListener listener)
     {
         Preconditions.checkNotNull(client, "client cannot be null");
         Preconditions.checkNotNull(leaderPath, "leaderPath cannot be null");
@@ -105,10 +120,9 @@ public class LeaderSelector implements Closeable
 
         this.client = client;
         this.listener = listener;
-        this.executor = executor;
         hasLeadership = false;
 
-        executorService = Executors.newFixedThreadPool(1, threadFactory);
+        this.executorService = new CloseableExecutorService(executorService);
         mutex = new InterProcessMutex(client, leaderPath)
         {
             @Override
@@ -148,10 +162,10 @@ public class LeaderSelector implements Closeable
      *
      * @param id ID
      */
-    public void     setId(String id)
+    public void setId(String id)
     {
         Preconditions.checkNotNull(id, "id cannot be null");
-        
+
         this.id = id;
     }
 
@@ -171,7 +185,7 @@ public class LeaderSelector implements Closeable
      * <b>IMPORTANT: </b> previous versions allowed this method to be called multiple times. This
      * is no longer supported. Use {@link #requeue()} for this purpose.
      */
-    public void     start()
+    public void start()
     {
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
 
@@ -189,7 +203,7 @@ public class LeaderSelector implements Closeable
      *
      * @return true if re-queue is successful
      */
-    public synchronized boolean     requeue()
+    public synchronized boolean requeue()
     {
         Preconditions.checkState(state.get() == State.STARTED, "close() has already been called");
 
@@ -197,17 +211,17 @@ public class LeaderSelector implements Closeable
         {
             isQueued = true;
             executorService.submit
-            (
-                new Callable<Object>()
-                {
-                    @Override
-                    public Object call() throws Exception
+                (
+                    new Callable<Object>()
                     {
-                        doWorkLoop();
-                        return null;
+                        @Override
+                        public Object call() throws Exception
+                        {
+                            doWorkLoop();
+                            return null;
+                        }
                     }
-                }
-            );
+                );
 
             return true;
         }
@@ -217,29 +231,29 @@ public class LeaderSelector implements Closeable
     /**
      * Shutdown this selector and remove yourself from the leadership group
      */
-    public void     close()
+    public void close()
     {
         Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
 
         client.getConnectionStateListenable().removeListener(listener);
-        executorService.shutdownNow();
+        executorService.close();
     }
 
     /**
      * <p>
-     *     Returns the set of current participants in the leader selection
+     * Returns the set of current participants in the leader selection
      * </p>
-     *
+     * <p/>
      * <p>
-     *     <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
-     *     return a value that does not match {@link #hasLeadership()} as hasLeadership
-     *     uses a local field of the class.
+     * <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
+     * return a value that does not match {@link #hasLeadership()} as hasLeadership
+     * uses a local field of the class.
      * </p>
      *
      * @return participants
      * @throws Exception ZK errors, interruptions, etc.
      */
-    public Collection<Participant>  getParticipants() throws Exception
+    public Collection<Participant> getParticipants() throws Exception
     {
         Collection<String> participantNodes = mutex.getParticipantNodes();
 
@@ -250,12 +264,12 @@ public class LeaderSelector implements Closeable
     {
         ImmutableList.Builder<Participant> builder = ImmutableList.builder();
 
-        boolean         isLeader = true;
+        boolean isLeader = true;
         for ( String path : participantNodes )
         {
             try
             {
-                Participant     participant = participantForPath(client, path, isLeader);
+                Participant participant = participantForPath(client, path, isLeader);
                 builder.add(participant);
             }
             catch ( KeeperException.NoNodeException ignore )
@@ -271,22 +285,22 @@ public class LeaderSelector implements Closeable
 
     /**
      * <p>
-     *     Return the id for the current leader. If for some reason there is no
-     *     current leader, a dummy participant is returned.
+     * Return the id for the current leader. If for some reason there is no
+     * current leader, a dummy participant is returned.
      * </p>
-     *
+     * <p/>
      * <p>
-     *     <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
-     *     return a value that does not match {@link #hasLeadership()} as hasLeadership
-     *     uses a local field of the class.
+     * <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
+     * return a value that does not match {@link #hasLeadership()} as hasLeadership
+     * uses a local field of the class.
      * </p>
      *
      * @return leader
      * @throws Exception ZK errors, interruptions, etc.
      */
-    public Participant      getLeader() throws Exception
+    public Participant getLeader() throws Exception
     {
-        Collection<String>      participantNodes = mutex.getParticipantNodes();
+        Collection<String> participantNodes = mutex.getParticipantNodes();
         return getLeader(client, participantNodes);
     }
 
@@ -311,8 +325,8 @@ public class LeaderSelector implements Closeable
 
     private static Participant participantForPath(CuratorFramework client, String path, boolean markAsLeader) throws Exception
     {
-        byte[]      bytes = client.getData().forPath(path);
-        String      thisId = new String(bytes, "UTF-8");
+        byte[] bytes = client.getData().forPath(path);
+        String thisId = new String(bytes, "UTF-8");
         return new Participant(thisId, markAsLeader);
     }
 
@@ -325,32 +339,22 @@ public class LeaderSelector implements Closeable
             mutex.acquire();
 
             hasLeadership = true;
-            executor.execute
-            (
-                new Runnable()
-                {
-                    @Override
-                    public void run()
-                    {
-                        try
-                        {
-                            listener.takeLeadership(client);
-                        }
-                        catch ( InterruptedException e )
-                        {
-                            Thread.currentThread().interrupt();
-                        }
-                        catch ( Throwable e )
-                        {
-                            log.error("The leader threw an exception", e);
-                        }
-                        finally
-                        {
-                            clearIsQueued();
-                        }
-                    }
-                }
-            );
+            try
+            {
+                listener.takeLeadership(client);
+            }
+            catch ( InterruptedException e )
+            {
+                Thread.currentThread().interrupt();
+            }
+            catch ( Throwable e )
+            {
+                log.error("The leader threw an exception", e);
+            }
+            finally
+            {
+                clearIsQueued();
+            }
         }
         catch ( InterruptedException e )
         {
@@ -401,11 +405,56 @@ public class LeaderSelector implements Closeable
             {
                 throw exception;
             }
-        } while ( autoRequeue.get() && (state.get() == State.STARTED) && !Thread.currentThread().isInterrupted() );
+        }
+        while ( autoRequeue.get() && (state.get() == State.STARTED) && !Thread.currentThread().isInterrupted() );
     }
 
     private synchronized void clearIsQueued()
     {
         isQueued = false;
     }
+
+    // temporary wrapper for deprecated constructor
+    private static ExecutorService wrapExecutor(ThreadFactory threadFactory, final Executor executor)
+    {
+        final ExecutorService service = Executors.newSingleThreadExecutor(threadFactory);
+        return new AbstractExecutorService()
+        {
+            @Override
+            public void shutdown()
+            {
+                service.shutdown();
+            }
+
+            @Override
+            public List<Runnable> shutdownNow()
+            {
+                return service.shutdownNow();
+            }
+
+            @Override
+            public boolean isShutdown()
+            {
+                return service.isShutdown();
+            }
+
+            @Override
+            public boolean isTerminated()
+            {
+                return service.isTerminated();
+            }
+
+            @Override
+            public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
+            {
+                return service.awaitTermination(timeout, unit);
+            }
+
+            @Override
+            public void execute(Runnable command)
+            {
+                executor.execute(command);
+            }
+        };
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/50e8fbb8/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorWithExecutor.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorWithExecutor.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorWithExecutor.java
new file mode 100644
index 0000000..bbfc24f
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorWithExecutor.java
@@ -0,0 +1,99 @@
+package org.apache.curator.framework.recipes.leader;
+
+import com.google.common.io.Closeables;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.BaseClassForTests;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.ThreadUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TestLeaderSelectorWithExecutor extends BaseClassForTests
+{
+    private static final ThreadFactory threadFactory = ThreadUtils.newThreadFactory("FeedGenerator");
+
+    @Test
+    public void test() throws Exception
+    {
+        Timing timing = new Timing();
+        LeaderSelector leaderSelector = null;
+        CuratorFramework client = CuratorFrameworkFactory.builder()
+            .retryPolicy(new ExponentialBackoffRetry(100, 3))
+            .connectString(server.getConnectString())
+            .sessionTimeoutMs(timing.session())
+            .connectionTimeoutMs(timing.connection())
+            .build();
+        try
+        {
+            client.start();
+
+            MyLeaderSelectorListener listener = new MyLeaderSelectorListener();
+            ExecutorService executorPool = Executors.newFixedThreadPool(20);
+            leaderSelector = new LeaderSelector(client, "/test", threadFactory, executorPool, listener);
+
+            leaderSelector.autoRequeue();
+            leaderSelector.start();
+
+            timing.sleepABit();
+
+            Assert.assertEquals(listener.getLeaderCount(), 1);
+        }
+        finally
+        {
+            Closeables.closeQuietly(leaderSelector);
+            Closeables.closeQuietly(client);
+        }
+    }
+
+    private class MyLeaderSelectorListener implements LeaderSelectorListener
+    {
+        private volatile Thread ourThread;
+        private final AtomicInteger leaderCount = new AtomicInteger(0);
+
+        public int getLeaderCount()
+        {
+            return leaderCount.get();
+        }
+
+        @Override
+        public void takeLeadership(CuratorFramework curatorFramework) throws Exception
+        {
+            ourThread = Thread.currentThread();
+            try
+            {
+                leaderCount.incrementAndGet();
+                while ( !Thread.currentThread().isInterrupted() )
+                {
+                    Thread.sleep(1000);
+                }
+            }
+            catch ( InterruptedException e )
+            {
+                Thread.currentThread().interrupt();
+            }
+            finally
+            {
+                leaderCount.decrementAndGet();
+            }
+        }
+
+        @Override
+        public void stateChanged(CuratorFramework curatorFramework, ConnectionState newState)
+        {
+            if ( (newState == ConnectionState.LOST) || (newState == ConnectionState.SUSPENDED) )
+            {
+                if ( ourThread != null )
+                {
+                    ourThread.interrupt();
+                }
+            }
+        }
+    }
+}