You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by dr...@apache.org on 2015/08/17 19:02:20 UTC

[12/41] curator git commit: [CURATOR-223] Add executorService methods to ServiceCacheBuilder

[CURATOR-223] Add executorService methods to ServiceCacheBuilder

Add executorService methods to ServiceCacheBuilder to allow the caller to specify
an ExecutorService or a CloseableExecutorService to be used by the PathChildrenCache
embedded in ServiceCacheImpl.

Extracts ExecuteCalledWatchingExecutorService (and DelegatingExecutorService) into
the curator-test module for use by TestServiceCache.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/6ca77776
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/6ca77776
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/6ca77776

Branch: refs/heads/CURATOR-3.0
Commit: 6ca77776d3d2c71b1e541c0edd60d2c17efe9c66
Parents: 20e92a5
Author: Tom Dyas <td...@foursquare.com>
Authored: Tue Jun 16 17:38:18 2015 -0400
Committer: Tom Dyas <td...@foursquare.com>
Committed: Wed Jun 17 13:03:17 2015 -0400

----------------------------------------------------------------------
 .../recipes/cache/TestPathChildrenCache.java    | 124 +------------------
 .../curator/test/DelegatingExecutorService.java | 119 ++++++++++++++++++
 .../ExecuteCalledWatchingExecutorService.java   |  48 +++++++
 .../x/discovery/ServiceCacheBuilder.java        |  24 +++-
 .../details/ServiceCacheBuilderImpl.java        |  39 +++++-
 .../x/discovery/details/ServiceCacheImpl.java   |  17 ++-
 .../curator/x/discovery/TestServiceCache.java   |  53 ++++++++
 7 files changed, 297 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
index b904bdc..216660f 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
@@ -31,6 +31,7 @@ import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.imps.CuratorFrameworkImpl;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
 import org.apache.curator.test.KillSession;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
@@ -1039,127 +1040,4 @@ public class TestPathChildrenCache extends BaseClassForTests
             CloseableUtils.closeQuietly(client);
         }
     }
-
-    public static class ExecuteCalledWatchingExecutorService extends DelegatingExecutorService
-    {
-        boolean executeCalled = false;
-
-        public ExecuteCalledWatchingExecutorService(ExecutorService delegate)
-        {
-            super(delegate);
-        }
-
-        @Override
-        public synchronized void execute(Runnable command)
-        {
-            executeCalled = true;
-            super.execute(command);
-        }
-
-        public synchronized boolean isExecuteCalled()
-        {
-            return executeCalled;
-        }
-
-        public synchronized void setExecuteCalled(boolean executeCalled)
-        {
-            this.executeCalled = executeCalled;
-        }
-    }
-
-    public static class DelegatingExecutorService implements ExecutorService
-    {
-        private final ExecutorService delegate;
-
-        public DelegatingExecutorService(
-                ExecutorService delegate
-        )
-        {
-            this.delegate = delegate;
-        }
-
-
-        @Override
-        public void shutdown()
-        {
-            delegate.shutdown();
-        }
-
-        @Override
-        public List<Runnable> shutdownNow()
-        {
-            return delegate.shutdownNow();
-        }
-
-        @Override
-        public boolean isShutdown()
-        {
-            return delegate.isShutdown();
-        }
-
-        @Override
-        public boolean isTerminated()
-        {
-            return delegate.isTerminated();
-        }
-
-        @Override
-        public boolean awaitTermination(long timeout, TimeUnit unit)
-                throws InterruptedException
-        {
-            return delegate.awaitTermination(timeout, unit);
-        }
-
-        @Override
-        public <T> Future<T> submit(Callable<T> task)
-        {
-            return delegate.submit(task);
-        }
-
-        @Override
-        public <T> Future<T> submit(Runnable task, T result)
-        {
-            return delegate.submit(task, result);
-        }
-
-        @Override
-        public Future<?> submit(Runnable task)
-        {
-            return delegate.submit(task);
-        }
-
-        @Override
-        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
-                throws InterruptedException
-        {
-            return delegate.invokeAll(tasks);
-        }
-
-        @Override
-        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
-                throws InterruptedException
-        {
-            return delegate.invokeAll(tasks, timeout, unit);
-        }
-
-        @Override
-        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
-                throws InterruptedException, ExecutionException
-        {
-            return delegate.invokeAny(tasks);
-        }
-
-        @Override
-        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
-                throws InterruptedException, ExecutionException, TimeoutException
-        {
-            return delegate.invokeAny(tasks, timeout, unit);
-        }
-
-        @Override
-        public void execute(Runnable command)
-        {
-            delegate.execute(command);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-test/src/main/java/org/apache/curator/test/DelegatingExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-test/src/main/java/org/apache/curator/test/DelegatingExecutorService.java b/curator-test/src/main/java/org/apache/curator/test/DelegatingExecutorService.java
new file mode 100644
index 0000000..eff34dd
--- /dev/null
+++ b/curator-test/src/main/java/org/apache/curator/test/DelegatingExecutorService.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.test;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.*;
+
+public class DelegatingExecutorService implements ExecutorService
+{
+    private final ExecutorService delegate;
+
+    public DelegatingExecutorService(
+            ExecutorService delegate
+    )
+    {
+        this.delegate = delegate;
+    }
+
+
+    @Override
+    public void shutdown()
+    {
+        delegate.shutdown();
+    }
+
+    @Override
+    public List<Runnable> shutdownNow()
+    {
+        return delegate.shutdownNow();
+    }
+
+    @Override
+    public boolean isShutdown()
+    {
+        return delegate.isShutdown();
+    }
+
+    @Override
+    public boolean isTerminated()
+    {
+        return delegate.isTerminated();
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit)
+            throws InterruptedException
+    {
+        return delegate.awaitTermination(timeout, unit);
+    }
+
+    @Override
+    public <T> Future<T> submit(Callable<T> task)
+    {
+        return delegate.submit(task);
+    }
+
+    @Override
+    public <T> Future<T> submit(Runnable task, T result)
+    {
+        return delegate.submit(task, result);
+    }
+
+    @Override
+    public Future<?> submit(Runnable task)
+    {
+        return delegate.submit(task);
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+            throws InterruptedException
+    {
+        return delegate.invokeAll(tasks);
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+            throws InterruptedException
+    {
+        return delegate.invokeAll(tasks, timeout, unit);
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+            throws InterruptedException, ExecutionException
+    {
+        return delegate.invokeAny(tasks);
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException
+    {
+        return delegate.invokeAny(tasks, timeout, unit);
+    }
+
+    @Override
+    public void execute(Runnable command)
+    {
+        delegate.execute(command);
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-test/src/main/java/org/apache/curator/test/ExecuteCalledWatchingExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-test/src/main/java/org/apache/curator/test/ExecuteCalledWatchingExecutorService.java b/curator-test/src/main/java/org/apache/curator/test/ExecuteCalledWatchingExecutorService.java
new file mode 100644
index 0000000..da7bc66
--- /dev/null
+++ b/curator-test/src/main/java/org/apache/curator/test/ExecuteCalledWatchingExecutorService.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.test;
+
+import java.util.concurrent.ExecutorService;
+
+public class ExecuteCalledWatchingExecutorService extends DelegatingExecutorService
+{
+    boolean executeCalled = false;
+
+    public ExecuteCalledWatchingExecutorService(ExecutorService delegate)
+    {
+        super(delegate);
+    }
+
+    @Override
+    public synchronized void execute(Runnable command)
+    {
+        executeCalled = true;
+        super.execute(command);
+    }
+
+    public synchronized boolean isExecuteCalled()
+    {
+        return executeCalled;
+    }
+
+    public synchronized void setExecuteCalled(boolean executeCalled)
+    {
+        this.executeCalled = executeCalled;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
index 10ce305..290d9b1 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
@@ -18,6 +18,8 @@
  */
 package org.apache.curator.x.discovery;
 
+import org.apache.curator.utils.CloseableExecutorService;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
 
 public interface ServiceCacheBuilder<T>
@@ -38,10 +40,30 @@ public interface ServiceCacheBuilder<T>
     public ServiceCacheBuilder<T> name(String name);
 
     /**
-     * Optional thread factory to use for the cache's internal thread
+     * Optional thread factory to use for the cache's internal thread. The specified ExecutorService
+     * overrides any prior ThreadFactory or ExecutorService set on the ServiceCacheBuilder.
      *
      * @param threadFactory factory
      * @return this
      */
     public ServiceCacheBuilder<T> threadFactory(ThreadFactory threadFactory);
+
+    /**
+     * Optional ExecutorService to use for the cache's background thread. The specified ExecutorService
+     * will be wrapped in a CloseableExecutorService and overrides any prior ThreadFactory or ExecutorService
+     * set on the ServiceCacheBuilder.
+     *
+     * @param executorService executor service
+     * @return this
+     */
+    public ServiceCacheBuilder<T> executorService(ExecutorService executorService);
+
+    /**
+     * Optional CloseableExecutorService to use for the cache's background thread. The specified ExecutorService
+     * overrides any prior ThreadFactory or ExecutorService set on the ServiceCacheBuilder.
+     *
+     * @param executorService an instance of CloseableExecutorService
+     * @return this
+     */
+    public ServiceCacheBuilder<T> executorService(CloseableExecutorService executorService);
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
index c4104f4..8922233 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
@@ -18,8 +18,10 @@
  */
 package org.apache.curator.x.discovery.details;
 
+import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.x.discovery.ServiceCache;
 import org.apache.curator.x.discovery.ServiceCacheBuilder;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
 
 /**
@@ -30,6 +32,7 @@ class ServiceCacheBuilderImpl<T> implements ServiceCacheBuilder<T>
     private ServiceDiscoveryImpl<T> discovery;
     private String name;
     private ThreadFactory threadFactory;
+    private CloseableExecutorService executorService;
 
     ServiceCacheBuilderImpl(ServiceDiscoveryImpl<T> discovery)
     {
@@ -44,7 +47,14 @@ class ServiceCacheBuilderImpl<T> implements ServiceCacheBuilder<T>
     @Override
     public ServiceCache<T> build()
     {
-        return new ServiceCacheImpl<T>(discovery, name, threadFactory);
+        if (executorService != null)
+        {
+            return new ServiceCacheImpl<T>(discovery, name, executorService);
+        }
+        else
+        {
+            return new ServiceCacheImpl<T>(discovery, name, threadFactory);
+        }
     }
 
     /**
@@ -70,6 +80,33 @@ class ServiceCacheBuilderImpl<T> implements ServiceCacheBuilder<T>
     public ServiceCacheBuilder<T> threadFactory(ThreadFactory threadFactory)
     {
         this.threadFactory = threadFactory;
+        this.executorService = null;
+        return this;
+    }
+
+    /**
+     * Optional executor service to use for the cache's background thread
+     *
+     * @param executorService executor service
+     * @return this
+     */
+    @Override
+    public ServiceCacheBuilder<T> executorService(ExecutorService executorService) {
+        this.executorService = new CloseableExecutorService(executorService);
+        this.threadFactory = null;
+        return this;
+    }
+
+    /**
+     * Optional CloseableExecutorService to use for the cache's background thread
+     *
+     * @param executorService an instance of CloseableExecutorService
+     * @return this
+     */
+    @Override
+    public ServiceCacheBuilder<T> executorService(CloseableExecutorService executorService) {
+        this.executorService = executorService;
+        this.threadFactory = null;
         return this;
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
index 0269d24..b8f39d5 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
@@ -22,6 +22,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.ListenerContainer;
@@ -36,6 +37,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -54,15 +56,26 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
         STOPPED
     }
 
+    private static CloseableExecutorService convertThreadFactory(ThreadFactory threadFactory)
+    {
+        Preconditions.checkNotNull(threadFactory, "threadFactory cannot be null");
+        return new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory));
+    }
+
     ServiceCacheImpl(ServiceDiscoveryImpl<T> discovery, String name, ThreadFactory threadFactory)
     {
+        this(discovery, name, convertThreadFactory(threadFactory));
+    }
+
+    ServiceCacheImpl(ServiceDiscoveryImpl<T> discovery, String name, CloseableExecutorService executorService)
+    {
         Preconditions.checkNotNull(discovery, "discovery cannot be null");
         Preconditions.checkNotNull(name, "name cannot be null");
-        Preconditions.checkNotNull(threadFactory, "threadFactory cannot be null");
+        Preconditions.checkNotNull(executorService, "executorService cannot be null");
 
         this.discovery = discovery;
 
-        cache = new PathChildrenCache(discovery.getClient(), discovery.pathForName(name), true, threadFactory);
+        cache = new PathChildrenCache(discovery.getClient(), discovery.pathForName(name), true, false, executorService);
         cache.getListenable().addListener(this);
     }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
index be114d4..5850961 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
@@ -20,6 +20,7 @@ package org.apache.curator.x.discovery;
 
 import com.google.common.collect.Lists;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
@@ -35,6 +36,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
@@ -255,4 +257,55 @@ public class TestServiceCache extends BaseClassForTests
             }
         }
     }
+
+    @Test
+    public void testExecutorServiceIsInvoked() throws Exception {
+        List<Closeable> closeables = Lists.newArrayList();
+        try {
+            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+            closeables.add(client);
+            client.start();
+
+            ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/discovery").client(client).build();
+            closeables.add(discovery);
+            discovery.start();
+
+            ExecuteCalledWatchingExecutorService exec = new ExecuteCalledWatchingExecutorService(Executors.newSingleThreadExecutor());
+            Assert.assertFalse(exec.isExecuteCalled());
+
+            ServiceCache<String> cache = discovery.serviceCacheBuilder().name("test").executorService(exec).build();
+            closeables.add(cache);
+            cache.start();
+
+            final Semaphore semaphore = new Semaphore(0);
+            ServiceCacheListener listener = new ServiceCacheListener()
+            {
+                @Override
+                public void cacheChanged()
+                {
+                    semaphore.release();
+                }
+
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                }
+            };
+            cache.addListener(listener);
+
+            ServiceInstance<String>     instance1 = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+            discovery.registerService(instance1);
+            Assert.assertTrue(semaphore.tryAcquire(10, TimeUnit.SECONDS));
+
+            Assert.assertTrue(exec.isExecuteCalled());
+        }
+        finally
+        {
+            Collections.reverse(closeables);
+            for ( Closeable c : closeables )
+            {
+                CloseableUtils.closeQuietly(c);
+            }
+        }
+    }
 }