You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2015/01/12 18:31:18 UTC
[1/3] curator git commit: Use NodeCache instead of manually watching.
It's safer and more Curator-like
Repository: curator
Updated Branches:
refs/heads/master a0a676e3e -> 2474454fb
Use NodeCache instead of manually watching. It's safer and more Curator-like
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/11dad798
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/11dad798
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/11dad798
Branch: refs/heads/master
Commit: 11dad79838b4dc00d6a5ebc59c55fe124f55d22c
Parents: 37dc447
Author: randgalt <ra...@apache.org>
Authored: Mon Jan 12 12:29:44 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jan 12 12:29:44 2015 -0500
----------------------------------------------------------------------
.../discovery/details/ServiceDiscoveryImpl.java | 92 ++++++++++----------
1 file changed, 48 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/11dad798/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index ca8eabe..3a92e7a 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -25,10 +25,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorEventType;
-import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.CloseableUtils;
@@ -44,7 +42,6 @@ import org.apache.curator.x.discovery.ServiceType;
import org.apache.curator.x.discovery.strategies.RoundRobinStrategy;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,9 +61,9 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
private final String basePath;
private final InstanceSerializer<T> serializer;
private final Map<String, ServiceInstance<T>> services = Maps.newConcurrentMap();
+ private final Map<String, NodeCache> watchedServices;
private final Collection<ServiceCache<T>> caches = Sets.newSetFromMap(Maps.<ServiceCache<T>, Boolean>newConcurrentMap());
private final Collection<ServiceProvider<T>> providers = Sets.newSetFromMap(Maps.<ServiceProvider<T>, Boolean>newConcurrentMap());
- private final boolean watchInstances;
private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
@Override
@@ -96,13 +93,13 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
*/
public ServiceDiscoveryImpl(CuratorFramework client, String basePath, InstanceSerializer<T> serializer, ServiceInstance<T> thisInstance, boolean watchInstances)
{
- this.watchInstances = watchInstances;
this.client = Preconditions.checkNotNull(client, "client cannot be null");
this.basePath = Preconditions.checkNotNull(basePath, "basePath cannot be null");
this.serializer = Preconditions.checkNotNull(serializer, "serializer cannot be null");
+ watchedServices = watchInstances ? Maps.<String, NodeCache>newConcurrentMap() : null;
if ( thisInstance != null )
{
- services.put(thisInstance.getId(), thisInstance);
+ setService(thisInstance);
}
}
@@ -129,6 +126,13 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
{
CloseableUtils.closeQuietly(provider);
}
+ if ( watchedServices != null )
+ {
+ for ( NodeCache nodeCache : watchedServices.values() )
+ {
+ CloseableUtils.closeQuietly(nodeCache);
+ }
+ }
Iterator<ServiceInstance<T>> it = services.values().iterator();
while ( it.hasNext() )
@@ -171,7 +175,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@Override
public void registerService(ServiceInstance<T> service) throws Exception
{
- services.put(service.getId(), service);
+ setService(service);
internalRegisterService(service);
}
@@ -197,10 +201,6 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
{
CreateMode mode = (service.getServiceType() == ServiceType.DYNAMIC) ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT;
client.create().creatingParentsIfNeeded().withMode(mode).forPath(path, bytes);
- if ( watchInstances )
- {
- resetWatchedInstance(service);
- }
isDone = true;
}
catch ( KeeperException.NodeExistsException e )
@@ -374,37 +374,6 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
return builder.build();
}
- private void resetWatchedInstance(final ServiceInstance<T> service) throws Exception
- {
- CuratorWatcher watcher = new CuratorWatcher()
- {
- @Override
- public void process(WatchedEvent event) throws Exception
- {
- if ( event.getType() == Watcher.Event.EventType.NodeDataChanged )
- {
- resetWatchedInstance(service);
- }
- }
- };
-
- BackgroundCallback callback = new BackgroundCallback()
- {
- @Override
- public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
- {
- if ( event.getType() == CuratorEventType.GET_DATA )
- {
- ServiceInstance<T> newInstance = serializer.deserialize(event.getData());
- services.put(newInstance.getId(), newInstance);
- }
- }
- };
-
- String path = pathForInstance(service.getName(), service.getId());
- client.getData().usingWatcher(watcher).inBackground(callback).forPath(path);
- }
-
private List<String> getChildrenWatched(String path, Watcher watcher, boolean recurse) throws Exception
{
List<String> instanceIds;
@@ -453,4 +422,39 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
internalRegisterService(service);
}
}
+
+ private void setService(final ServiceInstance<T> instance)
+ {
+ services.put(instance.getId(), instance);
+ if ( watchedServices != null )
+ {
+ final NodeCache nodeCache = new NodeCache(client, pathForInstance(instance.getName(), instance.getId()));
+ try
+ {
+ nodeCache.start(true);
+ }
+ catch ( Exception e )
+ {
+ log.error("Could not start node cache for: " + instance, e);
+ }
+ NodeCacheListener listener = new NodeCacheListener()
+ {
+ @Override
+ public void nodeChanged() throws Exception
+ {
+ if ( nodeCache.getCurrentData() != null )
+ {
+ ServiceInstance<T> newInstance = serializer.deserialize(nodeCache.getCurrentData().getData());
+ services.put(newInstance.getId(), newInstance);
+ }
+ else
+ {
+ log.warn("Instance data has been deleted for: " + instance);
+ }
+ }
+ };
+ nodeCache.getListenable().addListener(listener);
+ watchedServices.put(instance.getId(), nodeCache);
+ }
+ }
}
[3/3] curator git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/curator
Posted by ra...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/curator
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2474454f
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2474454f
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2474454f
Branch: refs/heads/master
Commit: 2474454fbe736067aa159942e61fb43682a42bdf
Parents: 8a84f59 a0a676e
Author: randgalt <ra...@apache.org>
Authored: Mon Jan 12 12:31:13 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jan 12 12:31:13 2015 -0500
----------------------------------------------------------------------
.../framework/recipes/cache/TestTreeCache.java | 14 ++++++++++++++
1 file changed, 14 insertions(+)
----------------------------------------------------------------------
[2/3] curator git commit: Merge branch 'CURATOR-176'
Posted by ra...@apache.org.
Merge branch 'CURATOR-176'
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/8a84f596
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/8a84f596
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/8a84f596
Branch: refs/heads/master
Commit: 8a84f5968a3297f09d31c3a8eb8beca8477b50a5
Parents: f17b46f 11dad79
Author: randgalt <ra...@apache.org>
Authored: Mon Jan 12 12:31:00 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jan 12 12:31:00 2015 -0500
----------------------------------------------------------------------
.../discovery/details/ServiceDiscoveryImpl.java | 92 ++++++++++----------
1 file changed, 48 insertions(+), 44 deletions(-)
----------------------------------------------------------------------