You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2015/01/12 18:31:18 UTC

[1/3] curator git commit: Use NodeCache instead of manually watching. It's safer and more Curator-like

Repository: curator
Updated Branches:
  refs/heads/master a0a676e3e -> 2474454fb


Use NodeCache instead of manually watching. It's safer and more Curator-like


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/11dad798
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/11dad798
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/11dad798

Branch: refs/heads/master
Commit: 11dad79838b4dc00d6a5ebc59c55fe124f55d22c
Parents: 37dc447
Author: randgalt <ra...@apache.org>
Authored: Mon Jan 12 12:29:44 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jan 12 12:29:44 2015 -0500

----------------------------------------------------------------------
 .../discovery/details/ServiceDiscoveryImpl.java | 92 ++++++++++----------
 1 file changed, 48 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/11dad798/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index ca8eabe..3a92e7a 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -25,10 +25,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorEventType;
-import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.CloseableUtils;
@@ -44,7 +42,6 @@ import org.apache.curator.x.discovery.ServiceType;
 import org.apache.curator.x.discovery.strategies.RoundRobinStrategy;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,9 +61,9 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     private final String basePath;
     private final InstanceSerializer<T> serializer;
     private final Map<String, ServiceInstance<T>> services = Maps.newConcurrentMap();
+    private final Map<String, NodeCache> watchedServices;
     private final Collection<ServiceCache<T>> caches = Sets.newSetFromMap(Maps.<ServiceCache<T>, Boolean>newConcurrentMap());
     private final Collection<ServiceProvider<T>> providers = Sets.newSetFromMap(Maps.<ServiceProvider<T>, Boolean>newConcurrentMap());
-    private final boolean watchInstances;
     private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
     {
         @Override
@@ -96,13 +93,13 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
      */
     public ServiceDiscoveryImpl(CuratorFramework client, String basePath, InstanceSerializer<T> serializer, ServiceInstance<T> thisInstance, boolean watchInstances)
     {
-        this.watchInstances = watchInstances;
         this.client = Preconditions.checkNotNull(client, "client cannot be null");
         this.basePath = Preconditions.checkNotNull(basePath, "basePath cannot be null");
         this.serializer = Preconditions.checkNotNull(serializer, "serializer cannot be null");
+        watchedServices = watchInstances ? Maps.<String, NodeCache>newConcurrentMap() : null;
         if ( thisInstance != null )
         {
-            services.put(thisInstance.getId(), thisInstance);
+            setService(thisInstance);
         }
     }
 
@@ -129,6 +126,13 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
         {
             CloseableUtils.closeQuietly(provider);
         }
+        if ( watchedServices != null )
+        {
+            for ( NodeCache nodeCache : watchedServices.values() )
+            {
+                CloseableUtils.closeQuietly(nodeCache);
+            }
+        }
 
         Iterator<ServiceInstance<T>> it = services.values().iterator();
         while ( it.hasNext() )
@@ -171,7 +175,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     @Override
     public void registerService(ServiceInstance<T> service) throws Exception
     {
-        services.put(service.getId(), service);
+        setService(service);
         internalRegisterService(service);
     }
 
@@ -197,10 +201,6 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
             {
                 CreateMode      mode = (service.getServiceType() == ServiceType.DYNAMIC) ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT;
                 client.create().creatingParentsIfNeeded().withMode(mode).forPath(path, bytes);
-                if ( watchInstances )
-                {
-                    resetWatchedInstance(service);
-                }
                 isDone = true;
             }
             catch ( KeeperException.NodeExistsException e )
@@ -374,37 +374,6 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
         return builder.build();
     }
 
-    private void resetWatchedInstance(final ServiceInstance<T> service) throws Exception
-    {
-        CuratorWatcher watcher = new CuratorWatcher()
-        {
-            @Override
-            public void process(WatchedEvent event) throws Exception
-            {
-                if ( event.getType() == Watcher.Event.EventType.NodeDataChanged )
-                {
-                    resetWatchedInstance(service);
-                }
-            }
-        };
-
-        BackgroundCallback callback = new BackgroundCallback()
-        {
-            @Override
-            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
-            {
-                if ( event.getType() == CuratorEventType.GET_DATA )
-                {
-                    ServiceInstance<T> newInstance = serializer.deserialize(event.getData());
-                    services.put(newInstance.getId(), newInstance);
-                }
-            }
-        };
-
-        String path = pathForInstance(service.getName(), service.getId());
-        client.getData().usingWatcher(watcher).inBackground(callback).forPath(path);
-    }
-
     private List<String> getChildrenWatched(String path, Watcher watcher, boolean recurse) throws Exception
     {
         List<String>    instanceIds;
@@ -453,4 +422,39 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
             internalRegisterService(service);
         }
     }
+
+    private void setService(final ServiceInstance<T> instance)
+    {
+        services.put(instance.getId(), instance);
+        if ( watchedServices != null )
+        {
+            final NodeCache nodeCache = new NodeCache(client, pathForInstance(instance.getName(), instance.getId()));
+            try
+            {
+                nodeCache.start(true);
+            }
+            catch ( Exception e )
+            {
+                log.error("Could not start node cache for: " + instance, e);
+            }
+            NodeCacheListener listener = new NodeCacheListener()
+            {
+                @Override
+                public void nodeChanged() throws Exception
+                {
+                    if ( nodeCache.getCurrentData() != null )
+                    {
+                        ServiceInstance<T> newInstance = serializer.deserialize(nodeCache.getCurrentData().getData());
+                        services.put(newInstance.getId(), newInstance);
+                    }
+                    else
+                    {
+                        log.warn("Instance data has been deleted for: " + instance);
+                    }
+                }
+            };
+            nodeCache.getListenable().addListener(listener);
+            watchedServices.put(instance.getId(), nodeCache);
+        }
+    }
 }


[3/3] curator git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/curator

Posted by ra...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/curator


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2474454f
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2474454f
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2474454f

Branch: refs/heads/master
Commit: 2474454fbe736067aa159942e61fb43682a42bdf
Parents: 8a84f59 a0a676e
Author: randgalt <ra...@apache.org>
Authored: Mon Jan 12 12:31:13 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jan 12 12:31:13 2015 -0500

----------------------------------------------------------------------
 .../framework/recipes/cache/TestTreeCache.java        | 14 ++++++++++++++
 1 file changed, 14 insertions(+)
----------------------------------------------------------------------



[2/3] curator git commit: Merge branch 'CURATOR-176'

Posted by ra...@apache.org.
Merge branch 'CURATOR-176'


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/8a84f596
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/8a84f596
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/8a84f596

Branch: refs/heads/master
Commit: 8a84f5968a3297f09d31c3a8eb8beca8477b50a5
Parents: f17b46f 11dad79
Author: randgalt <ra...@apache.org>
Authored: Mon Jan 12 12:31:00 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jan 12 12:31:00 2015 -0500

----------------------------------------------------------------------
 .../discovery/details/ServiceDiscoveryImpl.java | 92 ++++++++++----------
 1 file changed, 48 insertions(+), 44 deletions(-)
----------------------------------------------------------------------