You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by dr...@apache.org on 2015/08/17 19:02:20 UTC
[12/41] curator git commit: [CURATOR-223] Add executorService methods
to ServiceCacheBuilder
[CURATOR-223] Add executorService methods to ServiceCacheBuilder
Add executorService methods to ServiceCacheBuilder to allow the caller to specify
an ExecutorService or a CloseableExecutorService to be used by the PathChildrenCache
embedded in ServiceCacheImpl.
Extracts ExecuteCalledWatchingExecutorService (and DelegatingExecutorService) into
the curator-test module for use by TestServiceCache.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/6ca77776
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/6ca77776
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/6ca77776
Branch: refs/heads/CURATOR-3.0
Commit: 6ca77776d3d2c71b1e541c0edd60d2c17efe9c66
Parents: 20e92a5
Author: Tom Dyas <td...@foursquare.com>
Authored: Tue Jun 16 17:38:18 2015 -0400
Committer: Tom Dyas <td...@foursquare.com>
Committed: Wed Jun 17 13:03:17 2015 -0400
----------------------------------------------------------------------
.../recipes/cache/TestPathChildrenCache.java | 124 +------------------
.../curator/test/DelegatingExecutorService.java | 119 ++++++++++++++++++
.../ExecuteCalledWatchingExecutorService.java | 48 +++++++
.../x/discovery/ServiceCacheBuilder.java | 24 +++-
.../details/ServiceCacheBuilderImpl.java | 39 +++++-
.../x/discovery/details/ServiceCacheImpl.java | 17 ++-
.../curator/x/discovery/TestServiceCache.java | 53 ++++++++
7 files changed, 297 insertions(+), 127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
index b904bdc..216660f 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
@@ -31,6 +31,7 @@ import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.imps.CuratorFrameworkImpl;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
import org.apache.curator.test.KillSession;
import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
@@ -1039,127 +1040,4 @@ public class TestPathChildrenCache extends BaseClassForTests
CloseableUtils.closeQuietly(client);
}
}
-
- public static class ExecuteCalledWatchingExecutorService extends DelegatingExecutorService
- {
- boolean executeCalled = false;
-
- public ExecuteCalledWatchingExecutorService(ExecutorService delegate)
- {
- super(delegate);
- }
-
- @Override
- public synchronized void execute(Runnable command)
- {
- executeCalled = true;
- super.execute(command);
- }
-
- public synchronized boolean isExecuteCalled()
- {
- return executeCalled;
- }
-
- public synchronized void setExecuteCalled(boolean executeCalled)
- {
- this.executeCalled = executeCalled;
- }
- }
-
- public static class DelegatingExecutorService implements ExecutorService
- {
- private final ExecutorService delegate;
-
- public DelegatingExecutorService(
- ExecutorService delegate
- )
- {
- this.delegate = delegate;
- }
-
-
- @Override
- public void shutdown()
- {
- delegate.shutdown();
- }
-
- @Override
- public List<Runnable> shutdownNow()
- {
- return delegate.shutdownNow();
- }
-
- @Override
- public boolean isShutdown()
- {
- return delegate.isShutdown();
- }
-
- @Override
- public boolean isTerminated()
- {
- return delegate.isTerminated();
- }
-
- @Override
- public boolean awaitTermination(long timeout, TimeUnit unit)
- throws InterruptedException
- {
- return delegate.awaitTermination(timeout, unit);
- }
-
- @Override
- public <T> Future<T> submit(Callable<T> task)
- {
- return delegate.submit(task);
- }
-
- @Override
- public <T> Future<T> submit(Runnable task, T result)
- {
- return delegate.submit(task, result);
- }
-
- @Override
- public Future<?> submit(Runnable task)
- {
- return delegate.submit(task);
- }
-
- @Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
- throws InterruptedException
- {
- return delegate.invokeAll(tasks);
- }
-
- @Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
- throws InterruptedException
- {
- return delegate.invokeAll(tasks, timeout, unit);
- }
-
- @Override
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
- throws InterruptedException, ExecutionException
- {
- return delegate.invokeAny(tasks);
- }
-
- @Override
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException
- {
- return delegate.invokeAny(tasks, timeout, unit);
- }
-
- @Override
- public void execute(Runnable command)
- {
- delegate.execute(command);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-test/src/main/java/org/apache/curator/test/DelegatingExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-test/src/main/java/org/apache/curator/test/DelegatingExecutorService.java b/curator-test/src/main/java/org/apache/curator/test/DelegatingExecutorService.java
new file mode 100644
index 0000000..eff34dd
--- /dev/null
+++ b/curator-test/src/main/java/org/apache/curator/test/DelegatingExecutorService.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.test;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.*;
+
+public class DelegatingExecutorService implements ExecutorService
+{
+ private final ExecutorService delegate;
+
+ public DelegatingExecutorService(
+ ExecutorService delegate
+ )
+ {
+ this.delegate = delegate;
+ }
+
+
+ @Override
+ public void shutdown()
+ {
+ delegate.shutdown();
+ }
+
+ @Override
+ public List<Runnable> shutdownNow()
+ {
+ return delegate.shutdownNow();
+ }
+
+ @Override
+ public boolean isShutdown()
+ {
+ return delegate.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated()
+ {
+ return delegate.isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit)
+ throws InterruptedException
+ {
+ return delegate.awaitTermination(timeout, unit);
+ }
+
+ @Override
+ public <T> Future<T> submit(Callable<T> task)
+ {
+ return delegate.submit(task);
+ }
+
+ @Override
+ public <T> Future<T> submit(Runnable task, T result)
+ {
+ return delegate.submit(task, result);
+ }
+
+ @Override
+ public Future<?> submit(Runnable task)
+ {
+ return delegate.submit(task);
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException
+ {
+ return delegate.invokeAll(tasks);
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException
+ {
+ return delegate.invokeAll(tasks, timeout, unit);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException, ExecutionException
+ {
+ return delegate.invokeAny(tasks);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException
+ {
+ return delegate.invokeAny(tasks, timeout, unit);
+ }
+
+ @Override
+ public void execute(Runnable command)
+ {
+ delegate.execute(command);
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-test/src/main/java/org/apache/curator/test/ExecuteCalledWatchingExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-test/src/main/java/org/apache/curator/test/ExecuteCalledWatchingExecutorService.java b/curator-test/src/main/java/org/apache/curator/test/ExecuteCalledWatchingExecutorService.java
new file mode 100644
index 0000000..da7bc66
--- /dev/null
+++ b/curator-test/src/main/java/org/apache/curator/test/ExecuteCalledWatchingExecutorService.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.test;
+
+import java.util.concurrent.ExecutorService;
+
+public class ExecuteCalledWatchingExecutorService extends DelegatingExecutorService
+{
+ boolean executeCalled = false;
+
+ public ExecuteCalledWatchingExecutorService(ExecutorService delegate)
+ {
+ super(delegate);
+ }
+
+ @Override
+ public synchronized void execute(Runnable command)
+ {
+ executeCalled = true;
+ super.execute(command);
+ }
+
+ public synchronized boolean isExecuteCalled()
+ {
+ return executeCalled;
+ }
+
+ public synchronized void setExecuteCalled(boolean executeCalled)
+ {
+ this.executeCalled = executeCalled;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
index 10ce305..290d9b1 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
@@ -18,6 +18,8 @@
*/
package org.apache.curator.x.discovery;
+import org.apache.curator.utils.CloseableExecutorService;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
public interface ServiceCacheBuilder<T>
@@ -38,10 +40,30 @@ public interface ServiceCacheBuilder<T>
public ServiceCacheBuilder<T> name(String name);
/**
- * Optional thread factory to use for the cache's internal thread
+ * Optional thread factory to use for the cache's internal thread. The specified ThreadFactory
+ * overrides any prior ThreadFactory or ExecutorService set on the ServiceCacheBuilder.
*
* @param threadFactory factory
* @return this
*/
public ServiceCacheBuilder<T> threadFactory(ThreadFactory threadFactory);
+
+ /**
+ * Optional ExecutorService to use for the cache's background thread. The specified ExecutorService
+ * will be wrapped in a CloseableExecutorService and overrides any prior ThreadFactory or ExecutorService
+ * set on the ServiceCacheBuilder.
+ *
+ * @param executorService executor service
+ * @return this
+ */
+ public ServiceCacheBuilder<T> executorService(ExecutorService executorService);
+
+ /**
+ * Optional CloseableExecutorService to use for the cache's background thread. The specified CloseableExecutorService
+ * overrides any prior ThreadFactory or ExecutorService set on the ServiceCacheBuilder.
+ *
+ * @param executorService an instance of CloseableExecutorService
+ * @return this
+ */
+ public ServiceCacheBuilder<T> executorService(CloseableExecutorService executorService);
}
http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
index c4104f4..8922233 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
@@ -18,8 +18,10 @@
*/
package org.apache.curator.x.discovery.details;
+import org.apache.curator.utils.CloseableExecutorService;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceCacheBuilder;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
/**
@@ -30,6 +32,7 @@ class ServiceCacheBuilderImpl<T> implements ServiceCacheBuilder<T>
private ServiceDiscoveryImpl<T> discovery;
private String name;
private ThreadFactory threadFactory;
+ private CloseableExecutorService executorService;
ServiceCacheBuilderImpl(ServiceDiscoveryImpl<T> discovery)
{
@@ -44,7 +47,14 @@ class ServiceCacheBuilderImpl<T> implements ServiceCacheBuilder<T>
@Override
public ServiceCache<T> build()
{
- return new ServiceCacheImpl<T>(discovery, name, threadFactory);
+ if (executorService != null)
+ {
+ return new ServiceCacheImpl<T>(discovery, name, executorService);
+ }
+ else
+ {
+ return new ServiceCacheImpl<T>(discovery, name, threadFactory);
+ }
}
/**
@@ -70,6 +80,33 @@ class ServiceCacheBuilderImpl<T> implements ServiceCacheBuilder<T>
public ServiceCacheBuilder<T> threadFactory(ThreadFactory threadFactory)
{
this.threadFactory = threadFactory;
+ this.executorService = null;
+ return this;
+ }
+
+ /**
+ * Optional executor service to use for the cache's background thread
+ *
+ * @param executorService executor service
+ * @return this
+ */
+ @Override
+ public ServiceCacheBuilder<T> executorService(ExecutorService executorService) {
+ this.executorService = new CloseableExecutorService(executorService);
+ this.threadFactory = null;
+ return this;
+ }
+
+ /**
+ * Optional CloseableExecutorService to use for the cache's background thread
+ *
+ * @param executorService an instance of CloseableExecutorService
+ * @return this
+ */
+ @Override
+ public ServiceCacheBuilder<T> executorService(CloseableExecutorService executorService) {
+ this.executorService = executorService;
+ this.threadFactory = null;
return this;
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
index 0269d24..b8f39d5 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
@@ -22,6 +22,7 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.curator.utils.CloseableExecutorService;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.listen.ListenerContainer;
@@ -36,6 +37,7 @@ import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
@@ -54,15 +56,26 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
STOPPED
}
+ private static CloseableExecutorService convertThreadFactory(ThreadFactory threadFactory)
+ {
+ Preconditions.checkNotNull(threadFactory, "threadFactory cannot be null");
+ return new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory));
+ }
+
ServiceCacheImpl(ServiceDiscoveryImpl<T> discovery, String name, ThreadFactory threadFactory)
{
+ this(discovery, name, convertThreadFactory(threadFactory));
+ }
+
+ ServiceCacheImpl(ServiceDiscoveryImpl<T> discovery, String name, CloseableExecutorService executorService)
+ {
Preconditions.checkNotNull(discovery, "discovery cannot be null");
Preconditions.checkNotNull(name, "name cannot be null");
- Preconditions.checkNotNull(threadFactory, "threadFactory cannot be null");
+ Preconditions.checkNotNull(executorService, "executorService cannot be null");
this.discovery = discovery;
- cache = new PathChildrenCache(discovery.getClient(), discovery.pathForName(name), true, threadFactory);
+ cache = new PathChildrenCache(discovery.getClient(), discovery.pathForName(name), true, false, executorService);
cache.getListenable().addListener(this);
}
http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
index be114d4..5850961 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
@@ -20,6 +20,7 @@ package org.apache.curator.x.discovery;
import com.google.common.collect.Lists;
import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
@@ -35,6 +36,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -255,4 +257,55 @@ public class TestServiceCache extends BaseClassForTests
}
}
}
+
+ @Test
+ public void testExecutorServiceIsInvoked() throws Exception {
+ List<Closeable> closeables = Lists.newArrayList();
+ try {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ closeables.add(client);
+ client.start();
+
+ ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/discovery").client(client).build();
+ closeables.add(discovery);
+ discovery.start();
+
+ ExecuteCalledWatchingExecutorService exec = new ExecuteCalledWatchingExecutorService(Executors.newSingleThreadExecutor());
+ Assert.assertFalse(exec.isExecuteCalled());
+
+ ServiceCache<String> cache = discovery.serviceCacheBuilder().name("test").executorService(exec).build();
+ closeables.add(cache);
+ cache.start();
+
+ final Semaphore semaphore = new Semaphore(0);
+ ServiceCacheListener listener = new ServiceCacheListener()
+ {
+ @Override
+ public void cacheChanged()
+ {
+ semaphore.release();
+ }
+
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ }
+ };
+ cache.addListener(listener);
+
+ ServiceInstance<String> instance1 = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+ discovery.registerService(instance1);
+ Assert.assertTrue(semaphore.tryAcquire(10, TimeUnit.SECONDS));
+
+ Assert.assertTrue(exec.isExecuteCalled());
+ }
+ finally
+ {
+ Collections.reverse(closeables);
+ for ( Closeable c : closeables )
+ {
+ CloseableUtils.closeQuietly(c);
+ }
+ }
+ }
}