You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2015/04/22 00:44:57 UTC
curator git commit: moved holder into separate class so that it's
easier to reason about and lock
Repository: curator
Updated Branches:
refs/heads/CURATOR-164 c62b1137f -> d6a51f4ae
moved holder into separate class so that it's easier to reason about and lock
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/d6a51f4a
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/d6a51f4a
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/d6a51f4a
Branch: refs/heads/CURATOR-164
Commit: d6a51f4ae9a0365fba1ae77b9780d9bb43a79c72
Parents: c62b113
Author: randgalt <ra...@apache.org>
Authored: Tue Apr 21 17:44:53 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Apr 21 17:44:53 2015 -0500
----------------------------------------------------------------------
.../curator/x/discovery/details/Holder.java | 173 +++++++++++++++++++
.../discovery/details/ServiceDiscoveryImpl.java | 120 ++++++-------
.../discovery/details/TestServiceDiscovery.java | 3 -
3 files changed, 227 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/d6a51f4a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java
new file mode 100644
index 0000000..69c7667
--- /dev/null
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java
@@ -0,0 +1,173 @@
+package org.apache.curator.x.discovery.details;
+
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.x.discovery.ServiceInstance;
+import java.util.concurrent.locks.ReentrantLock;
+
+class Holder<T> // Bundles one ServiceInstance with its NodeCache and registration state; every accessor below runs while holding the same ReentrantLock
+{
+ enum State // registration state tracked for the held instance
+ {
+ NEW, // initial state assigned by the constructor
+ REGISTERED,
+ UNREGISTERED
+ }
+
+ private ServiceInstance<T> service; // read and written only under lock
+ private NodeCache cache; // read and written only under lock; null until setCache() and again after getAndClearCache()
+ private State state; // read and written only under lock
+ private long stateChangeMs; // System.currentTimeMillis() captured by the most recent setState()
+ private final ReentrantLock lock = new ReentrantLock(); // single lock guarding all mutable fields above
+
+ Holder(ServiceInstance<T> service) // creates a holder for the given instance, starting in State.NEW
+ {
+ this.service = service;
+ setState(State.NEW); // goes through setState() so stateChangeMs is stamped at construction
+ }
+
+ ServiceInstance<T> getService() // returns the current instance regardless of state
+ {
+ lock.lock();
+ try
+ {
+ return service;
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ ServiceInstance<T> getServiceIfRegistered() // returns the instance only while state is REGISTERED, otherwise null
+ {
+ lock.lock();
+ try
+ {
+ return (state == State.REGISTERED) ? service : null; // state check and read happen under one lock hold
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ void setService(ServiceInstance<T> service) // replaces the held instance; does not touch state or stateChangeMs
+ {
+ lock.lock();
+ try
+ {
+ this.service = service;
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ NodeCache getCache() // returns the current cache reference, which may be null
+ {
+ lock.lock();
+ try
+ {
+ return cache;
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ NodeCache getAndClearCache() // returns the current cache (possibly null) and nulls the field within the same lock hold
+ {
+ lock.lock();
+ try
+ {
+ NodeCache localCache = cache; // keep the old reference so it can be returned after the field is cleared
+ cache = null;
+ return localCache;
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ void setCache(NodeCache cache) // stores the given cache reference, overwriting any previous one without closing it
+ {
+ lock.lock();
+ try
+ {
+ this.cache = cache;
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ State getState() // returns the current state
+ {
+ lock.lock();
+ try
+ {
+ return state;
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ boolean isRegistered() // true only while state is REGISTERED
+ {
+ lock.lock();
+ try
+ {
+ return state == State.REGISTERED;
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ boolean isLapsedUnregistered(int cleanThresholdMs) // true when state is UNREGISTERED and at least cleanThresholdMs milliseconds have passed since the last setState()
+ {
+ lock.lock();
+ try
+ {
+ if ( state == State.UNREGISTERED )
+ {
+ long elapsed = System.currentTimeMillis() - stateChangeMs; // wall-clock difference from the last state change
+ if ( elapsed >= cleanThresholdMs )
+ {
+ return true;
+ }
+ }
+ return false; // not UNREGISTERED, or the threshold has not been reached yet
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ void setState(State state) // updates the state and stamps stateChangeMs in the same lock hold
+ {
+ lock.lock();
+ try
+ {
+ this.state = state;
+ stateChangeMs = System.currentTimeMillis(); // stamped on every call, even if the new state equals the old one
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ ReentrantLock getLock() // exposes the lock so a caller can hold it across several Holder calls; the accessors re-acquire it, which ReentrantLock permits for the owning thread
+ {
+ return lock;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/d6a51f4a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index ba18e42..ec049fd 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -21,7 +21,9 @@ package org.apache.curator.x.discovery.details;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -53,12 +55,10 @@ import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
/**
* A mechanism to register and query service instances using ZooKeeper
*/
-@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
{
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -92,33 +92,6 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
private final int CLEAN_THRESHOLD_MS = Integer.getInteger("curator-discovery-clean-threshold-ms", (int)TimeUnit.MINUTES.toMillis(5));
- private enum State
- {
- NEW,
- REGISTERED,
- UNREGISTERED
- }
-
- private static class Holder<T>
- {
- private final AtomicReference<ServiceInstance<T>> service = new AtomicReference<ServiceInstance<T>>();
- private final AtomicReference<NodeCache> cache = new AtomicReference<NodeCache>();
- private final AtomicReference<State> state = new AtomicReference<State>();
- private final AtomicLong stateChangeMs = new AtomicLong();
-
- public Holder(ServiceInstance<T> instance)
- {
- service.set(instance);
- setState(State.NEW);
- }
-
- public void setState(State state)
- {
- this.state.set(state);
- stateChangeMs.set(System.currentTimeMillis());
- }
- }
-
/**
* @param client the client
* @param basePath base path to store data
@@ -181,7 +154,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
}
catch ( Exception e )
{
- log.error("Could not unregister instance: " + holder.service.get().getName(), e);
+ log.error("Could not unregister instance: " + holder.getService().getName(), e);
}
}
@@ -204,23 +177,28 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
}
@Override
- public void updateService(ServiceInstance<T> service) throws Exception
+ public void updateService(final ServiceInstance<T> service) throws Exception
{
clean();
- final Holder<T> holder = getOrMakeHolder(service, null);
- synchronized(holder)
+ Holder<T> holder = getOrMakeHolder(service, null);
+ holder.getLock().lock();
+ try
{
- if ( holder.state.get() == State.UNREGISTERED )
+ if ( holder.getState() == Holder.State.UNREGISTERED )
{
throw new Exception("Service has been unregistered: " + service);
}
- holder.service.set(service);
+ holder.setService(service);
byte[] bytes = serializer.serialize(service);
String path = pathForInstance(service.getName(), service.getId());
client.setData().forPath(path, bytes);
}
+ finally
+ {
+ holder.getLock().unlock();
+ }
}
@VisibleForTesting
@@ -418,7 +396,21 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@VisibleForTesting
int debugServicesQty()
{
- return services.size();
+ return Iterables.size
+ (
+ Iterables.filter
+ (
+ services.values(),
+ new Predicate<Holder<T>>()
+ {
+ @Override
+ public boolean apply(Holder<T> holder)
+ {
+ return holder.isRegistered();
+ }
+ }
+ )
+ );
}
private List<String> getChildrenWatched(String path, Watcher watcher, boolean recurse) throws Exception
@@ -459,28 +451,26 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@VisibleForTesting
ServiceInstance<T> getRegisteredService(String id)
{
- final Holder<T> holder = services.get(id);
- if ( holder != null )
- {
- synchronized(holder)
- {
- return (holder.state.get() == State.REGISTERED) ? holder.service.get() : null;
- }
- }
- return null;
+ Holder<T> holder = services.get(id);
+ return (holder != null) ? holder.getServiceIfRegistered() : null;
}
private void reRegisterServices() throws Exception
{
for ( final Holder<T> holder : services.values() )
{
- synchronized(holder)
+ holder.getLock().lock();
+ try
{
- if ( holder.state.get() == State.REGISTERED )
+ if ( holder.isRegistered() )
{
- internalRegisterService(holder.service.get());
+ internalRegisterService(holder.getService());
}
}
+ finally
+ {
+ holder.getLock().unlock();
+ }
}
}
@@ -488,7 +478,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
{
final NodeCache nodeCache = watchInstances ? new NodeCache(client, pathForInstance(instance.getName(), instance.getId())) : null;
Holder<T> holder = getOrMakeHolder(instance, nodeCache);
- holder.setState(State.REGISTERED);
+ holder.setState(Holder.State.REGISTERED);
if ( nodeCache != null )
{
@@ -511,7 +501,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
Holder<T> holder = services.get(newInstance.getId());
if ( holder != null )
{
- holder.service.set(newInstance);
+ holder.setService(newInstance);
}
}
else
@@ -529,7 +519,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
Holder<T> newHolder = new Holder<T>(instance);
Holder<T> oldHolder = services.putIfAbsent(instance.getId(), newHolder);
Holder<T> useHolder = (oldHolder != null) ? oldHolder : newHolder;
- useHolder.cache.set(nodeCache);
+ useHolder.setCache(nodeCache);
return useHolder;
}
@@ -540,20 +530,13 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
long elpased = now - localLastCleanMs;
if ( (elpased >= CLEAN_THRESHOLD_MS) && lastCleanMs.compareAndSet(localLastCleanMs, now + 1) )
{
- Iterator<Holder<T>> iterator = services.values().iterator();
+ final Iterator<Holder<T>> iterator = services.values().iterator();
while ( iterator.hasNext() )
{
- final Holder<T> holder = iterator.next();
- synchronized(holder)
+ Holder<T> holder = iterator.next();
+ if ( holder.isLapsedUnregistered(CLEAN_THRESHOLD_MS) )
{
- if ( holder.state.get() == State.UNREGISTERED )
- {
- long elapsed = System.currentTimeMillis() - holder.stateChangeMs.get();
- if ( elapsed >= CLEAN_THRESHOLD_MS )
- {
- iterator.remove();
- }
- }
+ iterator.remove();
}
}
}
@@ -563,16 +546,17 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
{
if ( holder != null )
{
- synchronized(holder)
+ holder.getLock().lock();
+ try
{
- holder.setState(State.UNREGISTERED);
- NodeCache cache = holder.cache.getAndSet(null);
+ holder.setState(Holder.State.UNREGISTERED);
+ NodeCache cache = holder.getAndClearCache();
if ( cache != null )
{
CloseableUtils.closeQuietly(cache);
}
- ServiceInstance<T> service = holder.service.get();
+ ServiceInstance<T> service = holder.getService();
String path = pathForInstance(service.getName(), service.getId());
try
{
@@ -583,6 +567,10 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
// ignore
}
}
+ finally
+ {
+ holder.getLock().unlock();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/d6a51f4a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
index 2808c5c..f60773f 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
@@ -31,9 +31,6 @@ import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
-import org.apache.curator.x.discovery.details.InstanceSerializer;
-import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
-import org.apache.curator.x.discovery.details.ServiceDiscoveryImpl;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.io.Closeable;