You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2015/04/21 23:47:18 UTC
[2/2] curator git commit: sync on holder for safety during multi-step
operations
sync on holder for safety during multi-step operations
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c62b1137
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c62b1137
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c62b1137
Branch: refs/heads/CURATOR-164
Commit: c62b1137fa25104f2e24d65e467d0cfc769bd6e2
Parents: fa0c9da
Author: randgalt <ra...@apache.org>
Authored: Tue Apr 21 16:47:10 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Apr 21 16:47:10 2015 -0500
----------------------------------------------------------------------
.../discovery/details/ServiceDiscoveryImpl.java | 86 ++++++++++++--------
1 file changed, 53 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/c62b1137/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index 80b012e..ba18e42 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -58,6 +58,7 @@ import java.util.concurrent.atomic.AtomicReference;
/**
* A mechanism to register and query service instances using ZooKeeper
*/
+@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
{
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -207,16 +208,19 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
{
clean();
- Holder<T> holder = getOrMakeHolder(service, null);
- if ( holder.state.get() == State.UNREGISTERED )
+ final Holder<T> holder = getOrMakeHolder(service, null);
+ synchronized(holder)
{
- throw new Exception("Service has been unregistered: " + service);
- }
+ if ( holder.state.get() == State.UNREGISTERED )
+ {
+ throw new Exception("Service has been unregistered: " + service);
+ }
- holder.service.set(service);
- byte[] bytes = serializer.serialize(service);
- String path = pathForInstance(service.getName(), service.getId());
- client.setData().forPath(path, bytes);
+ holder.service.set(service);
+ byte[] bytes = serializer.serialize(service);
+ String path = pathForInstance(service.getName(), service.getId());
+ client.setData().forPath(path, bytes);
+ }
}
@VisibleForTesting
@@ -455,17 +459,27 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@VisibleForTesting
ServiceInstance<T> getRegisteredService(String id)
{
- Holder<T> holder = services.get(id);
- return ((holder != null) && (holder.state.get() == State.REGISTERED)) ? holder.service.get() : null;
+ final Holder<T> holder = services.get(id);
+ if ( holder != null )
+ {
+ synchronized(holder)
+ {
+ return (holder.state.get() == State.REGISTERED) ? holder.service.get() : null;
+ }
+ }
+ return null;
}
private void reRegisterServices() throws Exception
{
- for ( Holder<T> service : services.values() )
+ for ( final Holder<T> holder : services.values() )
{
- if ( service.state.get() == State.REGISTERED )
+ synchronized(holder)
{
- internalRegisterService(service.service.get());
+ if ( holder.state.get() == State.REGISTERED )
+ {
+ internalRegisterService(holder.service.get());
+ }
}
}
}
@@ -529,39 +543,45 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
Iterator<Holder<T>> iterator = services.values().iterator();
while ( iterator.hasNext() )
{
- Holder<T> holder = iterator.next();
- if ( holder.state.get() == State.UNREGISTERED )
+ final Holder<T> holder = iterator.next();
+ synchronized(holder)
{
- long elapsed = System.currentTimeMillis() - holder.stateChangeMs.get();
- if ( elapsed >= CLEAN_THRESHOLD_MS )
+ if ( holder.state.get() == State.UNREGISTERED )
{
- iterator.remove();
+ long elapsed = System.currentTimeMillis() - holder.stateChangeMs.get();
+ if ( elapsed >= CLEAN_THRESHOLD_MS )
+ {
+ iterator.remove();
+ }
}
}
}
}
}
- private void internalUnregisterService(Holder<T> holder) throws Exception
+ private void internalUnregisterService(final Holder<T> holder) throws Exception
{
if ( holder != null )
{
- holder.setState(State.UNREGISTERED);
- NodeCache cache = holder.cache.getAndSet(null);
- if ( cache != null )
+ synchronized(holder)
{
- CloseableUtils.closeQuietly(cache);
- }
+ holder.setState(State.UNREGISTERED);
+ NodeCache cache = holder.cache.getAndSet(null);
+ if ( cache != null )
+ {
+ CloseableUtils.closeQuietly(cache);
+ }
- ServiceInstance<T> service = holder.service.get();
- String path = pathForInstance(service.getName(), service.getId());
- try
- {
- client.delete().guaranteed().forPath(path);
- }
- catch ( KeeperException.NoNodeException ignore )
- {
- // ignore
+ ServiceInstance<T> service = holder.service.get();
+ String path = pathForInstance(service.getName(), service.getId());
+ try
+ {
+ client.delete().guaranteed().forPath(path);
+ }
+ catch ( KeeperException.NoNodeException ignore )
+ {
+ // ignore
+ }
}
}
}