You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2015/04/22 00:44:57 UTC

curator git commit: moved holder into separate class so that it's easier to reason about and lock

Repository: curator
Updated Branches:
  refs/heads/CURATOR-164 c62b1137f -> d6a51f4ae


moved holder into separate class so that it's easier to reason about and lock


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/d6a51f4a
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/d6a51f4a
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/d6a51f4a

Branch: refs/heads/CURATOR-164
Commit: d6a51f4ae9a0365fba1ae77b9780d9bb43a79c72
Parents: c62b113
Author: randgalt <ra...@apache.org>
Authored: Tue Apr 21 17:44:53 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Apr 21 17:44:53 2015 -0500

----------------------------------------------------------------------
 .../curator/x/discovery/details/Holder.java     | 173 +++++++++++++++++++
 .../discovery/details/ServiceDiscoveryImpl.java | 120 ++++++-------
 .../discovery/details/TestServiceDiscovery.java |   3 -
 3 files changed, 227 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/d6a51f4a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java
new file mode 100644
index 0000000..69c7667
--- /dev/null
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java
@@ -0,0 +1,173 @@
+package org.apache.curator.x.discovery.details;
+
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.x.discovery.ServiceInstance;
+import java.util.concurrent.locks.ReentrantLock;
+
+class Holder<T> // mutable record for one service instance (instance, node cache, state, state-change time); accessors take `lock`
+{
+    enum State // lifecycle marker kept in the `state` field
+    {
+        NEW, // assigned by the constructor
+        REGISTERED, // the only state for which isRegistered()/getServiceIfRegistered() report the instance
+        UNREGISTERED // the state isLapsedUnregistered() looks for
+    }
+
+    private ServiceInstance<T> service; // current instance; replaceable via setService()
+    private NodeCache cache; // may be null: unset until setCache(), cleared by getAndClearCache()
+    private State state; // written only by setState()
+    private long stateChangeMs; // System.currentTimeMillis() captured by the latest setState()
+    private final ReentrantLock lock = new ReentrantLock(); // taken by every accessor below; also handed out by getLock()
+    /** Wraps {@code service} and starts in {@link State#NEW}; setState() also stamps the state-change time. */
+    Holder(ServiceInstance<T> service)
+    {
+        this.service = service;
+        setState(State.NEW);
+    }
+    /** Returns the current service instance, read while holding the lock. */
+    ServiceInstance<T> getService()
+    {
+        lock.lock();
+        try
+        {
+            return service;
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+    /** Returns the service instance if the state is REGISTERED, otherwise {@code null}; state and instance are read under one lock acquisition. */
+    ServiceInstance<T> getServiceIfRegistered()
+    {
+        lock.lock();
+        try
+        {
+            return (state == State.REGISTERED) ? service : null;
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+    /** Replaces the held service instance; state and state-change time are left untouched. */
+    void setService(ServiceInstance<T> service)
+    {
+        lock.lock();
+        try
+        {
+            this.service = service;
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+    /** Returns the node cache currently stored, which may be {@code null}. */
+    NodeCache getCache()
+    {
+        lock.lock();
+        try
+        {
+            return cache;
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+    /** Returns the stored node cache (possibly {@code null}) and nulls the field in the same locked section; the cache is not closed here. */
+    NodeCache getAndClearCache()
+    {
+        lock.lock();
+        try
+        {
+            NodeCache localCache = cache;
+            cache = null;
+            return localCache;
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+    /** Stores {@code cache}, overwriting whatever was held before without closing it. */
+    void setCache(NodeCache cache)
+    {
+        lock.lock();
+        try
+        {
+            this.cache = cache;
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+    /** Returns the current state, read while holding the lock. */
+    State getState()
+    {
+        lock.lock();
+        try
+        {
+            return state;
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+    /** Returns {@code true} exactly when the current state is REGISTERED. */
+    boolean isRegistered()
+    {
+        lock.lock();
+        try
+        {
+            return state == State.REGISTERED;
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+    /** Returns {@code true} only when the state is UNREGISTERED and at least {@code cleanThresholdMs} ms (System.currentTimeMillis()) have passed since the last setState(). */
+    boolean isLapsedUnregistered(int cleanThresholdMs)
+    {
+        lock.lock();
+        try
+        {
+            if ( state == State.UNREGISTERED )
+            {
+                long elapsed = System.currentTimeMillis() - stateChangeMs;
+                if ( elapsed >= cleanThresholdMs )
+                {
+                    return true;
+                }
+            }
+            return false;
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+    /** Sets the state and records the current wall-clock time as the state-change time, even if the state value is unchanged. */
+    void setState(State state)
+    {
+        lock.lock();
+        try
+        {
+            this.state = state;
+            stateChangeMs = System.currentTimeMillis();
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+    /** Exposes the lock so a caller can hold it across several calls; being a ReentrantLock, the accessors above stay callable by the holding thread. */
+    ReentrantLock getLock()
+    {
+        return lock;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d6a51f4a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index ba18e42..ec049fd 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -21,7 +21,9 @@ package org.apache.curator.x.discovery.details;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -53,12 +55,10 @@ import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A mechanism to register and query service instances using ZooKeeper
  */
-@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
 public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
@@ -92,33 +92,6 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
 
     private final int CLEAN_THRESHOLD_MS = Integer.getInteger("curator-discovery-clean-threshold-ms", (int)TimeUnit.MINUTES.toMillis(5));
 
-    private enum State
-    {
-        NEW,
-        REGISTERED,
-        UNREGISTERED
-    }
-
-    private static class Holder<T>
-    {
-        private final AtomicReference<ServiceInstance<T>> service = new AtomicReference<ServiceInstance<T>>();
-        private final AtomicReference<NodeCache> cache = new AtomicReference<NodeCache>();
-        private final AtomicReference<State> state = new AtomicReference<State>();
-        private final AtomicLong stateChangeMs = new AtomicLong();
-
-        public Holder(ServiceInstance<T> instance)
-        {
-            service.set(instance);
-            setState(State.NEW);
-        }
-
-        public void setState(State state)
-        {
-            this.state.set(state);
-            stateChangeMs.set(System.currentTimeMillis());
-        }
-    }
-
     /**
      * @param client the client
      * @param basePath base path to store data
@@ -181,7 +154,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
             }
             catch ( Exception e )
             {
-                log.error("Could not unregister instance: " + holder.service.get().getName(), e);
+                log.error("Could not unregister instance: " + holder.getService().getName(), e);
             }
         }
 
@@ -204,23 +177,28 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     }
 
     @Override
-    public void updateService(ServiceInstance<T> service) throws Exception
+    public void updateService(final ServiceInstance<T> service) throws Exception
     {
         clean();
 
-        final Holder<T> holder = getOrMakeHolder(service, null);
-        synchronized(holder)
+        Holder<T> holder = getOrMakeHolder(service, null);
+        holder.getLock().lock();
+        try
         {
-            if ( holder.state.get() == State.UNREGISTERED )
+            if ( holder.getState() == Holder.State.UNREGISTERED )
             {
                 throw new Exception("Service has been unregistered: " + service);
             }
 
-            holder.service.set(service);
+            holder.setService(service);
             byte[] bytes = serializer.serialize(service);
             String path = pathForInstance(service.getName(), service.getId());
             client.setData().forPath(path, bytes);
         }
+        finally
+        {
+            holder.getLock().unlock();
+        }
     }
 
     @VisibleForTesting
@@ -418,7 +396,21 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     @VisibleForTesting
     int debugServicesQty()
     {
-        return services.size();
+        return Iterables.size
+            (
+                Iterables.filter
+                    (
+                        services.values(),
+                        new Predicate<Holder<T>>()
+                        {
+                            @Override
+                            public boolean apply(Holder<T> holder)
+                            {
+                                return holder.isRegistered();
+                            }
+                        }
+                    )
+            );
     }
 
     private List<String> getChildrenWatched(String path, Watcher watcher, boolean recurse) throws Exception
@@ -459,28 +451,26 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     @VisibleForTesting
     ServiceInstance<T> getRegisteredService(String id)
     {
-        final Holder<T> holder = services.get(id);
-        if ( holder != null )
-        {
-            synchronized(holder)
-            {
-                return (holder.state.get() == State.REGISTERED) ? holder.service.get() : null;
-            }
-        }
-        return null;
+        Holder<T> holder = services.get(id);
+        return (holder != null) ? holder.getServiceIfRegistered() : null;
     }
 
     private void reRegisterServices() throws Exception
     {
         for ( final Holder<T> holder : services.values() )
         {
-            synchronized(holder)
+            holder.getLock().lock();
+            try
             {
-                if ( holder.state.get() == State.REGISTERED )
+                if ( holder.isRegistered() )
                 {
-                    internalRegisterService(holder.service.get());
+                    internalRegisterService(holder.getService());
                 }
             }
+            finally
+            {
+                holder.getLock().unlock();
+            }
         }
     }
 
@@ -488,7 +478,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     {
         final NodeCache nodeCache = watchInstances ? new NodeCache(client, pathForInstance(instance.getName(), instance.getId())) : null;
         Holder<T> holder = getOrMakeHolder(instance, nodeCache);
-        holder.setState(State.REGISTERED);
+        holder.setState(Holder.State.REGISTERED);
 
         if ( nodeCache != null )
         {
@@ -511,7 +501,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
                         Holder<T> holder = services.get(newInstance.getId());
                         if ( holder != null )
                         {
-                            holder.service.set(newInstance);
+                            holder.setService(newInstance);
                         }
                     }
                     else
@@ -529,7 +519,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
         Holder<T> newHolder = new Holder<T>(instance);
         Holder<T> oldHolder = services.putIfAbsent(instance.getId(), newHolder);
         Holder<T> useHolder = (oldHolder != null) ? oldHolder : newHolder;
-        useHolder.cache.set(nodeCache);
+        useHolder.setCache(nodeCache);
         return useHolder;
     }
 
@@ -540,20 +530,13 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
         long elpased = now - localLastCleanMs;
         if ( (elpased >= CLEAN_THRESHOLD_MS) && lastCleanMs.compareAndSet(localLastCleanMs, now + 1) )
         {
-            Iterator<Holder<T>> iterator = services.values().iterator();
+            final Iterator<Holder<T>> iterator = services.values().iterator();
             while ( iterator.hasNext() )
             {
-                final Holder<T> holder = iterator.next();
-                synchronized(holder)
+                Holder<T> holder = iterator.next();
+                if ( holder.isLapsedUnregistered(CLEAN_THRESHOLD_MS) )
                 {
-                    if ( holder.state.get() == State.UNREGISTERED )
-                    {
-                        long elapsed = System.currentTimeMillis() - holder.stateChangeMs.get();
-                        if ( elapsed >= CLEAN_THRESHOLD_MS )
-                        {
-                            iterator.remove();
-                        }
-                    }
+                    iterator.remove();
                 }
             }
         }
@@ -563,16 +546,17 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     {
         if ( holder != null )
         {
-            synchronized(holder)
+            holder.getLock().lock();
+            try
             {
-                holder.setState(State.UNREGISTERED);
-                NodeCache cache = holder.cache.getAndSet(null);
+                holder.setState(Holder.State.UNREGISTERED);
+                NodeCache cache = holder.getAndClearCache();
                 if ( cache != null )
                 {
                     CloseableUtils.closeQuietly(cache);
                 }
 
-                ServiceInstance<T> service = holder.service.get();
+                ServiceInstance<T> service = holder.getService();
                 String path = pathForInstance(service.getName(), service.getId());
                 try
                 {
@@ -583,6 +567,10 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
                     // ignore
                 }
             }
+            finally
+            {
+                holder.getLock().unlock();
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/d6a51f4a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
index 2808c5c..f60773f 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
@@ -31,9 +31,6 @@ import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.discovery.ServiceDiscovery;
 import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
 import org.apache.curator.x.discovery.ServiceInstance;
-import org.apache.curator.x.discovery.details.InstanceSerializer;
-import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
-import org.apache.curator.x.discovery.details.ServiceDiscoveryImpl;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.io.Closeable;