You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2015/04/21 23:47:18 UTC

[2/2] curator git commit: sync on holder for safety during multi-step operations

sync on holder for safety during multi-step operations


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c62b1137
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c62b1137
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c62b1137

Branch: refs/heads/CURATOR-164
Commit: c62b1137fa25104f2e24d65e467d0cfc769bd6e2
Parents: fa0c9da
Author: randgalt <ra...@apache.org>
Authored: Tue Apr 21 16:47:10 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Apr 21 16:47:10 2015 -0500

----------------------------------------------------------------------
 .../discovery/details/ServiceDiscoveryImpl.java | 86 ++++++++++++--------
 1 file changed, 53 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/c62b1137/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index 80b012e..ba18e42 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -58,6 +58,7 @@ import java.util.concurrent.atomic.AtomicReference;
 /**
  * A mechanism to register and query service instances using ZooKeeper
  */
+@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
 public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
@@ -207,16 +208,19 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     {
         clean();
 
-        Holder<T> holder = getOrMakeHolder(service, null);
-        if ( holder.state.get() == State.UNREGISTERED )
+        final Holder<T> holder = getOrMakeHolder(service, null);
+        synchronized(holder)
         {
-            throw new Exception("Service has been unregistered: " + service);
-        }
+            if ( holder.state.get() == State.UNREGISTERED )
+            {
+                throw new Exception("Service has been unregistered: " + service);
+            }
 
-        holder.service.set(service);
-        byte[] bytes = serializer.serialize(service);
-        String path = pathForInstance(service.getName(), service.getId());
-        client.setData().forPath(path, bytes);
+            holder.service.set(service);
+            byte[] bytes = serializer.serialize(service);
+            String path = pathForInstance(service.getName(), service.getId());
+            client.setData().forPath(path, bytes);
+        }
     }
 
     @VisibleForTesting
@@ -455,17 +459,27 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     @VisibleForTesting
     ServiceInstance<T> getRegisteredService(String id)
     {
-        Holder<T> holder = services.get(id);
-        return ((holder != null) && (holder.state.get() == State.REGISTERED)) ? holder.service.get() : null;
+        final Holder<T> holder = services.get(id);
+        if ( holder != null )
+        {
+            synchronized(holder)
+            {
+                return (holder.state.get() == State.REGISTERED) ? holder.service.get() : null;
+            }
+        }
+        return null;
     }
 
     private void reRegisterServices() throws Exception
     {
-        for ( Holder<T> service : services.values() )
+        for ( final Holder<T> holder : services.values() )
         {
-            if ( service.state.get() == State.REGISTERED )
+            synchronized(holder)
             {
-                internalRegisterService(service.service.get());
+                if ( holder.state.get() == State.REGISTERED )
+                {
+                    internalRegisterService(holder.service.get());
+                }
             }
         }
     }
@@ -529,39 +543,45 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
             Iterator<Holder<T>> iterator = services.values().iterator();
             while ( iterator.hasNext() )
             {
-                Holder<T> holder = iterator.next();
-                if ( holder.state.get() == State.UNREGISTERED )
+                final Holder<T> holder = iterator.next();
+                synchronized(holder)
                 {
-                    long elapsed = System.currentTimeMillis() - holder.stateChangeMs.get();
-                    if ( elapsed >= CLEAN_THRESHOLD_MS )
+                    if ( holder.state.get() == State.UNREGISTERED )
                     {
-                        iterator.remove();
+                        long elapsed = System.currentTimeMillis() - holder.stateChangeMs.get();
+                        if ( elapsed >= CLEAN_THRESHOLD_MS )
+                        {
+                            iterator.remove();
+                        }
                     }
                 }
             }
         }
     }
 
-    private void internalUnregisterService(Holder<T> holder) throws Exception
+    private void internalUnregisterService(final Holder<T> holder) throws Exception
     {
         if ( holder != null )
         {
-            holder.setState(State.UNREGISTERED);
-            NodeCache cache = holder.cache.getAndSet(null);
-            if ( cache != null )
+            synchronized(holder)
             {
-                CloseableUtils.closeQuietly(cache);
-            }
+                holder.setState(State.UNREGISTERED);
+                NodeCache cache = holder.cache.getAndSet(null);
+                if ( cache != null )
+                {
+                    CloseableUtils.closeQuietly(cache);
+                }
 
-            ServiceInstance<T> service = holder.service.get();
-            String path = pathForInstance(service.getName(), service.getId());
-            try
-            {
-                client.delete().guaranteed().forPath(path);
-            }
-            catch ( KeeperException.NoNodeException ignore )
-            {
-                // ignore
+                ServiceInstance<T> service = holder.service.get();
+                String path = pathForInstance(service.getName(), service.getId());
+                try
+                {
+                    client.delete().guaranteed().forPath(path);
+                }
+                catch ( KeeperException.NoNodeException ignore )
+                {
+                    // ignore
+                }
             }
         }
     }