You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2015/04/28 00:09:53 UTC
[01/12] curator git commit: when unregistering a service remove it
from the internal map first and then delete (guaranteed) the node
Repository: curator
Updated Branches:
refs/heads/master 6e16d0d5c -> 06af6ff1c
when unregistering a service remove it from the internal map first and then delete (guaranteed) the node
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/915d83ad
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/915d83ad
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/915d83ad
Branch: refs/heads/master
Commit: 915d83add911d624ab3584508f566344827fbae6
Parents: c65e091
Author: randgalt <ra...@apache.org>
Authored: Tue Apr 21 12:31:17 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Apr 21 12:31:17 2015 -0500
----------------------------------------------------------------------
.../discovery/details/ServiceDiscoveryImpl.java | 62 ++++++++--------
.../x/discovery/TestServiceDiscovery.java | 74 +++++++++++++++++++-
2 files changed, 103 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/915d83ad/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index 41c5d77..824eb75 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.x.discovery.details;
import com.google.common.annotations.VisibleForTesting;
@@ -149,7 +150,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
ServiceInstance<T> service = it.next();
String path = pathForInstance(service.getName(), service.getId());
boolean doRemove = true;
-
+
try
{
client.delete().forPath(path);
@@ -163,13 +164,13 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
doRemove = false;
log.error("Could not unregister instance: " + service.getName(), e);
}
-
+
if ( doRemove )
{
it.remove();
}
}
-
+
client.getConnectionStateListenable().removeListener(connectionStateListener);
}
@@ -189,25 +190,25 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@Override
public void updateService(ServiceInstance<T> service) throws Exception
{
- byte[] bytes = serializer.serialize(service);
- String path = pathForInstance(service.getName(), service.getId());
+ byte[] bytes = serializer.serialize(service);
+ String path = pathForInstance(service.getName(), service.getId());
client.setData().forPath(path, bytes);
services.put(service.getId(), service);
}
@VisibleForTesting
- protected void internalRegisterService(ServiceInstance<T> service) throws Exception
+ protected void internalRegisterService(ServiceInstance<T> service) throws Exception
{
- byte[] bytes = serializer.serialize(service);
- String path = pathForInstance(service.getName(), service.getId());
+ byte[] bytes = serializer.serialize(service);
+ String path = pathForInstance(service.getName(), service.getId());
- final int MAX_TRIES = 2;
- boolean isDone = false;
+ final int MAX_TRIES = 2;
+ boolean isDone = false;
for ( int i = 0; !isDone && (i < MAX_TRIES); ++i )
{
try
{
- CreateMode mode = (service.getServiceType() == ServiceType.DYNAMIC) ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT;
+ CreateMode mode = (service.getServiceType() == ServiceType.DYNAMIC) ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT;
client.create().creatingParentsIfNeeded().withMode(mode).forPath(path, bytes);
isDone = true;
}
@@ -225,18 +226,19 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
* @throws Exception errors
*/
@Override
- public void unregisterService(ServiceInstance<T> service) throws Exception
+ public void unregisterService(ServiceInstance<T> service) throws Exception
{
- String path = pathForInstance(service.getName(), service.getId());
+ services.remove(service.getId());
+
+ String path = pathForInstance(service.getName(), service.getId());
try
{
- client.delete().forPath(path);
+ client.delete().guaranteed().forPath(path);
}
catch ( KeeperException.NoNodeException ignore )
{
// ignore
}
- services.remove(service.getId());
}
/**
@@ -271,9 +273,9 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
* @throws Exception errors
*/
@Override
- public Collection<String> queryForNames() throws Exception
+ public Collection<String> queryForNames() throws Exception
{
- List<String> names = client.getChildren().forPath(basePath);
+ List<String> names = client.getChildren().forPath(basePath);
return ImmutableList.copyOf(names);
}
@@ -285,7 +287,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
* @throws Exception errors
*/
@Override
- public Collection<ServiceInstance<T>> queryForInstances(String name) throws Exception
+ public Collection<ServiceInstance<T>> queryForInstances(String name) throws Exception
{
return queryForInstances(name, null);
}
@@ -301,10 +303,10 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@Override
public ServiceInstance<T> queryForInstance(String name, String id) throws Exception
{
- String path = pathForInstance(name, id);
+ String path = pathForInstance(name, id);
try
{
- byte[] bytes = client.getData().forPath(path);
+ byte[] bytes = client.getData().forPath(path);
return serializer.deserialize(bytes);
}
catch ( KeeperException.NoNodeException ignore )
@@ -314,22 +316,22 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
return null;
}
- void cacheOpened(ServiceCache<T> cache)
+ void cacheOpened(ServiceCache<T> cache)
{
caches.add(cache);
}
- void cacheClosed(ServiceCache<T> cache)
+ void cacheClosed(ServiceCache<T> cache)
{
caches.remove(cache);
}
- void providerOpened(ServiceProvider<T> provider)
+ void providerOpened(ServiceProvider<T> provider)
{
providers.add(provider);
}
- void providerClosed(ServiceProvider<T> cache)
+ void providerClosed(ServiceProvider<T> cache)
{
providers.remove(cache);
}
@@ -339,7 +341,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
return client;
}
- String pathForName(String name)
+ String pathForName(String name)
{
return ZKPaths.makePath(basePath, name);
}
@@ -349,11 +351,11 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
return serializer;
}
- List<ServiceInstance<T>> queryForInstances(String name, Watcher watcher) throws Exception
+ List<ServiceInstance<T>> queryForInstances(String name, Watcher watcher) throws Exception
{
- ImmutableList.Builder<ServiceInstance<T>> builder = ImmutableList.builder();
- String path = pathForName(name);
- List<String> instanceIds;
+ ImmutableList.Builder<ServiceInstance<T>> builder = ImmutableList.builder();
+ String path = pathForName(name);
+ List<String> instanceIds;
if ( watcher != null )
{
@@ -384,7 +386,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
private List<String> getChildrenWatched(String path, Watcher watcher, boolean recurse) throws Exception
{
- List<String> instanceIds;
+ List<String> instanceIds;
try
{
instanceIds = client.getChildren().usingWatcher(watcher).forPath(path);
http://git-wip-us.apache.org/repos/asf/curator/blob/915d83ad/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
index 6eb9ebb..40d491a 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
@@ -28,6 +28,7 @@ import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.KillSession;
import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.x.discovery.details.InstanceSerializer;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import org.apache.curator.x.discovery.details.ServiceDiscoveryImpl;
import org.testng.Assert;
@@ -37,7 +38,9 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
public class TestServiceDiscovery extends BaseClassForTests
{
@@ -269,15 +272,15 @@ public class TestServiceDiscovery extends BaseClassForTests
public void testNoServerOnStart() throws Exception
{
server.stop();
- List<Closeable> closeables = Lists.newArrayList();
+ List<Closeable> closeables = Lists.newArrayList();
try
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
closeables.add(client);
client.start();
- ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
- ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
+ ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+ ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
closeables.add(discovery);
discovery.start();
@@ -297,4 +300,69 @@ public class TestServiceDiscovery extends BaseClassForTests
}
}
}
+
+ // CURATOR-164
+ @Test
+ public void testUnregisterService() throws Exception
+ {
+ final String name = "name";
+
+ final CountDownLatch restartLatch = new CountDownLatch(1);
+ List<Closeable> closeables = Lists.newArrayList();
+
+ InstanceSerializer<String> slowSerializer = new JsonInstanceSerializer<String>(String.class)
+ {
+ private boolean first = true;
+
+ @Override
+ public byte[] serialize(ServiceInstance<String> instance) throws Exception
+ {
+ if ( first )
+ {
+ System.out.println("Serializer first registration.");
+ first = false;
+ }
+ else
+ {
+ System.out.println("Waiting for reconnect to finish.");
+ // Simulate the serialize method being slow.
+ // This could just be a timed wait, but that's kind of non-deterministic.
+ restartLatch.await();
+ }
+ return super.serialize(instance);
+ }
+ };
+
+ try
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ closeables.add(client);
+ client.start();
+
+ ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name(name).port(10064).build();
+ ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).serializer(slowSerializer).build();
+ closeables.add(discovery);
+ discovery.start();
+
+ Assert.assertFalse(discovery.queryForInstances(name).isEmpty(), "Service should start registered.");
+
+ server.stop();
+ server.restart();
+
+ discovery.unregisterService(instance);
+ restartLatch.countDown();
+
+ TimeUnit.SECONDS.sleep(1); // Wait for the rest of registration to finish.
+
+ Assert.assertTrue(discovery.queryForInstances(name).isEmpty(), "Service should have unregistered.");
+ }
+ finally
+ {
+ Collections.reverse(closeables);
+ for ( Closeable c : closeables )
+ {
+ CloseableUtils.closeQuietly(c);
+ }
+ }
+ }
}
[03/12] curator git commit: Concurrent registrations/unregistrations
and connection issues can cause inconsistent state. Change to a model whereby
'unregistering' an instance doesn't remove it from management but changes the
state. Instance will still be
Posted by ra...@apache.org.
Concurrent registrations/unregistrations and connection issues can cause inconsistent state. Change to a model whereby 'unregistering' an instance doesn't remove it from management but changes its state. The instance will still be managed for a period of time and is cleaned up after a reasonable period.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f489dfeb
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f489dfeb
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f489dfeb
Branch: refs/heads/master
Commit: f489dfebeed4ecf004ded37a0f05a0a8a2dc7e6d
Parents: 0178c83
Author: randgalt <ra...@apache.org>
Authored: Tue Apr 21 15:13:26 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Apr 21 15:13:26 2015 -0500
----------------------------------------------------------------------
.../discovery/details/ServiceDiscoveryImpl.java | 190 ++++++---
.../x/discovery/TestServiceDiscovery.java | 368 -----------------
.../discovery/details/TestServiceDiscovery.java | 402 +++++++++++++++++++
3 files changed, 546 insertions(+), 414 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/f489dfeb/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index 824eb75..f53c7ce 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -50,7 +50,10 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
/**
* A mechanism to register and query service instances using ZooKeeper
@@ -61,10 +64,11 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
private final CuratorFramework client;
private final String basePath;
private final InstanceSerializer<T> serializer;
- private final Map<String, ServiceInstance<T>> services = Maps.newConcurrentMap();
- private final Map<String, NodeCache> watchedServices;
+ private final ConcurrentMap<String, Holder<T>> services = Maps.newConcurrentMap();
private final Collection<ServiceCache<T>> caches = Sets.newSetFromMap(Maps.<ServiceCache<T>, Boolean>newConcurrentMap());
private final Collection<ServiceProvider<T>> providers = Sets.newSetFromMap(Maps.<ServiceProvider<T>, Boolean>newConcurrentMap());
+ private final boolean watchInstances;
+ private final AtomicLong lastCleanMs = new AtomicLong(System.currentTimeMillis());
private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
@Override
@@ -85,6 +89,35 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
}
};
+ private static final int CLEAN_THRESHOLD_MS = Integer.getInteger("curator-discovery-clean-threshold-ms", (int)TimeUnit.MINUTES.toMillis(5));
+
+ private enum State
+ {
+ NEW,
+ REGISTERED,
+ UNREGISTERED
+ }
+
+ private static class Holder<T>
+ {
+ private final AtomicReference<ServiceInstance<T>> service = new AtomicReference<ServiceInstance<T>>();
+ private final AtomicReference<NodeCache> cache = new AtomicReference<NodeCache>();
+ private final AtomicReference<State> state = new AtomicReference<State>();
+ private final AtomicLong stateChangeMs = new AtomicLong();
+
+ public Holder(ServiceInstance<T> instance)
+ {
+ service.set(instance);
+ setState(State.NEW);
+ }
+
+ public void setState(State state)
+ {
+ this.state.set(state);
+ stateChangeMs.set(System.currentTimeMillis());
+ }
+ }
+
/**
* @param client the client
* @param basePath base path to store data
@@ -94,10 +127,10 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
*/
public ServiceDiscoveryImpl(CuratorFramework client, String basePath, InstanceSerializer<T> serializer, ServiceInstance<T> thisInstance, boolean watchInstances)
{
+ this.watchInstances = watchInstances;
this.client = Preconditions.checkNotNull(client, "client cannot be null");
this.basePath = Preconditions.checkNotNull(basePath, "basePath cannot be null");
this.serializer = Preconditions.checkNotNull(serializer, "serializer cannot be null");
- watchedServices = watchInstances ? Maps.<String, NodeCache>newConcurrentMap() : null;
if ( thisInstance != null )
{
setService(thisInstance);
@@ -134,26 +167,12 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
{
CloseableUtils.closeQuietly(provider);
}
- if ( watchedServices != null )
- {
- for ( NodeCache nodeCache : watchedServices.values() )
- {
- CloseableUtils.closeQuietly(nodeCache);
- }
- }
- Iterator<ServiceInstance<T>> it = services.values().iterator();
- while ( it.hasNext() )
+ for ( Holder<T> holder : services.values() )
{
- // Should not use unregisterService because of potential ConcurrentModificationException
- // so we in-line the bulk of the method here
- ServiceInstance<T> service = it.next();
- String path = pathForInstance(service.getName(), service.getId());
- boolean doRemove = true;
-
try
{
- client.delete().forPath(path);
+ internalUnregisterService(holder);
}
catch ( KeeperException.NoNodeException ignore )
{
@@ -161,13 +180,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
}
catch ( Exception e )
{
- doRemove = false;
- log.error("Could not unregister instance: " + service.getName(), e);
- }
-
- if ( doRemove )
- {
- it.remove();
+ log.error("Could not unregister instance: " + holder.service.get().getName(), e);
}
}
@@ -183,6 +196,8 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@Override
public void registerService(ServiceInstance<T> service) throws Exception
{
+ clean();
+
setService(service);
internalRegisterService(service);
}
@@ -190,10 +205,18 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@Override
public void updateService(ServiceInstance<T> service) throws Exception
{
+ clean();
+
+ Holder<T> holder = getOrMakeHolder(service, null);
+ if ( holder.state.get() == State.UNREGISTERED )
+ {
+ throw new Exception("Service has been unregistered: " + service);
+ }
+
+ holder.service.set(service);
byte[] bytes = serializer.serialize(service);
String path = pathForInstance(service.getName(), service.getId());
client.setData().forPath(path, bytes);
- services.put(service.getId(), service);
}
@VisibleForTesting
@@ -228,17 +251,9 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@Override
public void unregisterService(ServiceInstance<T> service) throws Exception
{
- services.remove(service.getId());
+ clean();
- String path = pathForInstance(service.getName(), service.getId());
- try
- {
- client.delete().guaranteed().forPath(path);
- }
- catch ( KeeperException.NoNodeException ignore )
- {
- // ignore
- }
+ internalUnregisterService(getOrMakeHolder(service, null));
}
/**
@@ -249,6 +264,8 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@Override
public ServiceProviderBuilder<T> serviceProviderBuilder()
{
+ clean();
+
return new ServiceProviderBuilderImpl<T>(this)
.providerStrategy(new RoundRobinStrategy<T>())
.threadFactory(ThreadUtils.newThreadFactory("ServiceProvider"));
@@ -262,6 +279,8 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@Override
public ServiceCacheBuilder<T> serviceCacheBuilder()
{
+ clean();
+
return new ServiceCacheBuilderImpl<T>(this)
.threadFactory(ThreadUtils.newThreadFactory("ServiceCache"));
}
@@ -275,6 +294,8 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@Override
public Collection<String> queryForNames() throws Exception
{
+ clean();
+
List<String> names = client.getChildren().forPath(basePath);
return ImmutableList.copyOf(names);
}
@@ -303,6 +324,8 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@Override
public ServiceInstance<T> queryForInstance(String name, String id) throws Exception
{
+ clean();
+
String path = pathForInstance(name, id);
try
{
@@ -338,6 +361,8 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
CuratorFramework getClient()
{
+ clean();
+
return client;
}
@@ -353,6 +378,8 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
List<ServiceInstance<T>> queryForInstances(String name, Watcher watcher) throws Exception
{
+ clean();
+
ImmutableList.Builder<ServiceInstance<T>> builder = ImmutableList.builder();
String path = pathForName(name);
List<String> instanceIds;
@@ -384,6 +411,12 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
return builder.build();
}
+ @VisibleForTesting
+ int debugServicesQty()
+ {
+ return services.size();
+ }
+
private List<String> getChildrenWatched(String path, Watcher watcher, boolean recurse) throws Exception
{
List<String> instanceIds;
@@ -422,23 +455,29 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@VisibleForTesting
ServiceInstance<T> getRegisteredService(String id)
{
- return services.get(id);
+ Holder<T> holder = services.get(id);
+ return ((holder != null) && (holder.state.get() == State.REGISTERED)) ? holder.service.get() : null;
}
private void reRegisterServices() throws Exception
{
- for ( ServiceInstance<T> service : services.values() )
+ for ( Holder<T> service : services.values() )
{
- internalRegisterService(service);
+ if ( service.state.get() == State.REGISTERED )
+ {
+ internalRegisterService(service.service.get());
+ }
}
}
private void setService(final ServiceInstance<T> instance)
{
- services.put(instance.getId(), instance);
- if ( watchedServices != null )
+ final NodeCache nodeCache = watchInstances ? new NodeCache(client, pathForInstance(instance.getName(), instance.getId())) : null;
+ Holder<T> holder = getOrMakeHolder(instance, nodeCache);
+ holder.setState(State.REGISTERED);
+
+ if ( nodeCache != null )
{
- final NodeCache nodeCache = new NodeCache(client, pathForInstance(instance.getName(), instance.getId()));
try
{
nodeCache.start(true);
@@ -455,7 +494,11 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
if ( nodeCache.getCurrentData() != null )
{
ServiceInstance<T> newInstance = serializer.deserialize(nodeCache.getCurrentData().getData());
- services.put(newInstance.getId(), newInstance);
+ Holder<T> holder = services.get(newInstance.getId());
+ if ( holder != null )
+ {
+ holder.service.set(newInstance);
+ }
}
else
{
@@ -464,7 +507,62 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
}
};
nodeCache.getListenable().addListener(listener);
- watchedServices.put(instance.getId(), nodeCache);
+ }
+ }
+
+ private Holder<T> getOrMakeHolder(ServiceInstance<T> instance, NodeCache nodeCache)
+ {
+ Holder<T> newHolder = new Holder<T>(instance);
+ Holder<T> oldHolder = services.putIfAbsent(instance.getId(), newHolder);
+ Holder<T> useHolder = (oldHolder != null) ? oldHolder : newHolder;
+ useHolder.cache.set(nodeCache);
+ return useHolder;
+ }
+
+ private void clean()
+ {
+ long localLastCleanMs = lastCleanMs.get();
+ long now = System.currentTimeMillis();
+ long elpased = now - localLastCleanMs;
+ if ( (elpased >= CLEAN_THRESHOLD_MS) && lastCleanMs.compareAndSet(localLastCleanMs, now + 1) )
+ {
+ Iterator<Holder<T>> iterator = services.values().iterator();
+ while ( iterator.hasNext() )
+ {
+ Holder<T> holder = iterator.next();
+ if ( holder.state.get() == State.UNREGISTERED )
+ {
+ long elapsed = System.currentTimeMillis() - holder.stateChangeMs.get();
+ if ( elapsed >= CLEAN_THRESHOLD_MS )
+ {
+ iterator.remove();
+ }
+ }
+ }
+ }
+ }
+
+ private void internalUnregisterService(Holder<T> holder) throws Exception
+ {
+ if ( holder != null )
+ {
+ holder.setState(State.UNREGISTERED);
+ NodeCache cache = holder.cache.getAndSet(null);
+ if ( cache != null )
+ {
+ CloseableUtils.closeQuietly(cache);
+ }
+
+ ServiceInstance<T> service = holder.service.get();
+ String path = pathForInstance(service.getName(), service.getId());
+ try
+ {
+ client.delete().guaranteed().forPath(path);
+ }
+ catch ( KeeperException.NoNodeException ignore )
+ {
+ // ignore
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f489dfeb/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
deleted file mode 100644
index 3b45494..0000000
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.curator.x.discovery;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.test.KillSession;
-import org.apache.curator.test.Timing;
-import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.x.discovery.details.InstanceSerializer;
-import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
-import org.apache.curator.x.discovery.details.ServiceDiscoveryImpl;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-import java.io.Closeable;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-public class TestServiceDiscovery extends BaseClassForTests
-{
- private static final Comparator<ServiceInstance<Void>> comparator = new Comparator<ServiceInstance<Void>>()
- {
- @Override
- public int compare(ServiceInstance<Void> o1, ServiceInstance<Void> o2)
- {
- return o1.getId().compareTo(o2.getId());
- }
- };
-
- @Test
- public void testCrashedServerMultiInstances() throws Exception
- {
- List<Closeable> closeables = Lists.newArrayList();
- try
- {
- Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- closeables.add(client);
- client.start();
-
- final Semaphore semaphore = new Semaphore(0);
- ServiceInstance<String> instance1 = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
- ServiceInstance<String> instance2 = ServiceInstance.<String>builder().payload("thing").name("test").port(10065).build();
- ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance1, false)
- {
- @Override
- protected void internalRegisterService(ServiceInstance<String> service) throws Exception
- {
- super.internalRegisterService(service);
- semaphore.release();
- }
- };
- closeables.add(discovery);
- discovery.start();
- discovery.registerService(instance2);
-
- timing.acquireSemaphore(semaphore, 2);
- Assert.assertEquals(discovery.queryForInstances("test").size(), 2);
-
- KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
- server.stop();
-
- server.restart();
- closeables.add(server);
-
- timing.acquireSemaphore(semaphore, 2);
- Assert.assertEquals(discovery.queryForInstances("test").size(), 2);
- }
- finally
- {
- for ( Closeable c : closeables )
- {
- CloseableUtils.closeQuietly(c);
- }
- }
- }
-
- @Test
- public void testCrashedServer() throws Exception
- {
- List<Closeable> closeables = Lists.newArrayList();
- try
- {
- Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- closeables.add(client);
- client.start();
-
- final Semaphore semaphore = new Semaphore(0);
- ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
- ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance, false)
- {
- @Override
- protected void internalRegisterService(ServiceInstance<String> service) throws Exception
- {
- super.internalRegisterService(service);
- semaphore.release();
- }
- };
- closeables.add(discovery);
- discovery.start();
-
- timing.acquireSemaphore(semaphore);
- Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
-
- KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
- server.stop();
-
- server.restart();
- closeables.add(server);
-
- timing.acquireSemaphore(semaphore);
- Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
- }
- finally
- {
- for ( Closeable c : closeables )
- {
- CloseableUtils.closeQuietly(c);
- }
- }
- }
-
- @Test
- public void testCrashedInstance() throws Exception
- {
- List<Closeable> closeables = Lists.newArrayList();
- try
- {
- Timing timing = new Timing();
-
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- closeables.add(client);
- client.start();
-
- ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
- ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance, false);
- closeables.add(discovery);
- discovery.start();
-
- Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
-
- KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
- Thread.sleep(timing.multiple(1.5).session());
-
- Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
- }
- finally
- {
- Collections.reverse(closeables);
- for ( Closeable c : closeables )
- {
- CloseableUtils.closeQuietly(c);
- }
- }
- }
-
- @Test
- public void testMultipleInstances() throws Exception
- {
- final String SERVICE_ONE = "one";
- final String SERVICE_TWO = "two";
-
- List<Closeable> closeables = Lists.newArrayList();
- try
- {
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- closeables.add(client);
- client.start();
-
- ServiceInstance<Void> s1_i1 = ServiceInstance.<Void>builder().name(SERVICE_ONE).build();
- ServiceInstance<Void> s1_i2 = ServiceInstance.<Void>builder().name(SERVICE_ONE).build();
- ServiceInstance<Void> s2_i1 = ServiceInstance.<Void>builder().name(SERVICE_TWO).build();
- ServiceInstance<Void> s2_i2 = ServiceInstance.<Void>builder().name(SERVICE_TWO).build();
-
- ServiceDiscovery<Void> discovery = ServiceDiscoveryBuilder.builder(Void.class).client(client).basePath("/test").build();
- closeables.add(discovery);
- discovery.start();
-
- discovery.registerService(s1_i1);
- discovery.registerService(s1_i2);
- discovery.registerService(s2_i1);
- discovery.registerService(s2_i2);
-
- Assert.assertEquals(Sets.newHashSet(discovery.queryForNames()), Sets.newHashSet(SERVICE_ONE, SERVICE_TWO));
-
- List<ServiceInstance<Void>> list = Lists.newArrayList();
- list.add(s1_i1);
- list.add(s1_i2);
- Collections.sort(list, comparator);
- List<ServiceInstance<Void>> queriedInstances = Lists.newArrayList(discovery.queryForInstances(SERVICE_ONE));
- Collections.sort(queriedInstances, comparator);
- Assert.assertEquals(queriedInstances, list, String.format("Not equal l: %s - d: %s", list, queriedInstances));
-
- list.clear();
-
- list.add(s2_i1);
- list.add(s2_i2);
- Collections.sort(list, comparator);
- queriedInstances = Lists.newArrayList(discovery.queryForInstances(SERVICE_TWO));
- Collections.sort(queriedInstances, comparator);
- Assert.assertEquals(queriedInstances, list, String.format("Not equal 2: %s - d: %s", list, queriedInstances));
- }
- finally
- {
- Collections.reverse(closeables);
- for ( Closeable c : closeables )
- {
- CloseableUtils.closeQuietly(c);
- }
- }
- }
-
- @Test
- public void testBasic() throws Exception
- {
- List<Closeable> closeables = Lists.newArrayList();
- try
- {
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- closeables.add(client);
- client.start();
-
- ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
- ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
- closeables.add(discovery);
- discovery.start();
-
- Assert.assertEquals(discovery.queryForNames(), Arrays.asList("test"));
-
- List<ServiceInstance<String>> list = Lists.newArrayList();
- list.add(instance);
- Assert.assertEquals(discovery.queryForInstances("test"), list);
- }
- finally
- {
- Collections.reverse(closeables);
- for ( Closeable c : closeables )
- {
- CloseableUtils.closeQuietly(c);
- }
- }
- }
-
- @Test
- public void testNoServerOnStart() throws Exception
- {
- server.stop();
- List<Closeable> closeables = Lists.newArrayList();
- try
- {
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- closeables.add(client);
- client.start();
-
- ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
- ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
- closeables.add(discovery);
- discovery.start();
-
- server.restart();
- Assert.assertEquals(discovery.queryForNames(), Arrays.asList("test"));
-
- List<ServiceInstance<String>> list = Lists.newArrayList();
- list.add(instance);
- Assert.assertEquals(discovery.queryForInstances("test"), list);
- }
- finally
- {
- Collections.reverse(closeables);
- for ( Closeable c : closeables )
- {
- CloseableUtils.closeQuietly(c);
- }
- }
- }
-
- // CURATOR-164
- @Test
- public void testUnregisterService() throws Exception
- {
- final String name = "name";
-
- final CountDownLatch restartLatch = new CountDownLatch(1);
- List<Closeable> closeables = Lists.newArrayList();
-
- InstanceSerializer<String> slowSerializer = new JsonInstanceSerializer<String>(String.class)
- {
- private boolean first = true;
-
- @Override
- public byte[] serialize(ServiceInstance<String> instance) throws Exception
- {
- if ( first )
- {
- System.out.println("Serializer first registration.");
- first = false;
- }
- else
- {
- System.out.println("Waiting for reconnect to finish.");
- // Simulate the serialize method being slow.
- // This could just be a timed wait, but that's kind of non-deterministic.
- restartLatch.await();
- }
- return super.serialize(instance);
- }
- };
-
- try
- {
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- closeables.add(client);
- client.start();
-
- ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name(name).port(10064).build();
- ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).serializer(slowSerializer).watchInstances(true).build();
- closeables.add(discovery);
- discovery.start();
-
- Assert.assertFalse(discovery.queryForInstances(name).isEmpty(), "Service should start registered.");
-
- server.stop();
- server.restart();
-
- discovery.unregisterService(instance);
- restartLatch.countDown();
-
- new Timing().sleepABit(); // Wait for the rest of registration to finish.
-
- Assert.assertTrue(discovery.queryForInstances(name).isEmpty(), "Service should have unregistered.");
- }
- finally
- {
- Collections.reverse(closeables);
- for ( Closeable c : closeables )
- {
- CloseableUtils.closeQuietly(c);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/f489dfeb/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
new file mode 100644
index 0000000..2808c5c
--- /dev/null
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
@@ -0,0 +1,402 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.discovery.details;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.KillSession;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.x.discovery.ServiceDiscovery;
+import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
+import org.apache.curator.x.discovery.ServiceInstance;
+import org.apache.curator.x.discovery.details.InstanceSerializer;
+import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
+import org.apache.curator.x.discovery.details.ServiceDiscoveryImpl;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+
+public class TestServiceDiscovery extends BaseClassForTests
+{
+ private static final Comparator<ServiceInstance<Void>> comparator = new Comparator<ServiceInstance<Void>>()
+ {
+ @Override
+ public int compare(ServiceInstance<Void> o1, ServiceInstance<Void> o2)
+ {
+ return o1.getId().compareTo(o2.getId());
+ }
+ };
+
+ @Test
+ public void testCrashedServerMultiInstances() throws Exception
+ {
+ List<Closeable> closeables = Lists.newArrayList();
+ try
+ {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ closeables.add(client);
+ client.start();
+
+ final Semaphore semaphore = new Semaphore(0);
+ ServiceInstance<String> instance1 = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+ ServiceInstance<String> instance2 = ServiceInstance.<String>builder().payload("thing").name("test").port(10065).build();
+ ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance1, false)
+ {
+ @Override
+ protected void internalRegisterService(ServiceInstance<String> service) throws Exception
+ {
+ super.internalRegisterService(service);
+ semaphore.release();
+ }
+ };
+ closeables.add(discovery);
+ discovery.start();
+ discovery.registerService(instance2);
+
+ timing.acquireSemaphore(semaphore, 2);
+ Assert.assertEquals(discovery.queryForInstances("test").size(), 2);
+
+ KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
+ server.stop();
+
+ server.restart();
+ closeables.add(server);
+
+ timing.acquireSemaphore(semaphore, 2);
+ Assert.assertEquals(discovery.queryForInstances("test").size(), 2);
+ }
+ finally
+ {
+ for ( Closeable c : closeables )
+ {
+ CloseableUtils.closeQuietly(c);
+ }
+ }
+ }
+
+ @Test
+ public void testCrashedServer() throws Exception
+ {
+ List<Closeable> closeables = Lists.newArrayList();
+ try
+ {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ closeables.add(client);
+ client.start();
+
+ final Semaphore semaphore = new Semaphore(0);
+ ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+ ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance, false)
+ {
+ @Override
+ protected void internalRegisterService(ServiceInstance<String> service) throws Exception
+ {
+ super.internalRegisterService(service);
+ semaphore.release();
+ }
+ };
+ closeables.add(discovery);
+ discovery.start();
+
+ timing.acquireSemaphore(semaphore);
+ Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
+
+ KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
+ server.stop();
+
+ server.restart();
+ closeables.add(server);
+
+ timing.acquireSemaphore(semaphore);
+ Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
+ }
+ finally
+ {
+ for ( Closeable c : closeables )
+ {
+ CloseableUtils.closeQuietly(c);
+ }
+ }
+ }
+
+ @Test
+ public void testCrashedInstance() throws Exception
+ {
+ List<Closeable> closeables = Lists.newArrayList();
+ try
+ {
+ Timing timing = new Timing();
+
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ closeables.add(client);
+ client.start();
+
+ ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+ ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance, false);
+ closeables.add(discovery);
+ discovery.start();
+
+ Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
+
+ KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
+ Thread.sleep(timing.multiple(1.5).session());
+
+ Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
+ }
+ finally
+ {
+ Collections.reverse(closeables);
+ for ( Closeable c : closeables )
+ {
+ CloseableUtils.closeQuietly(c);
+ }
+ }
+ }
+
+ @Test
+ public void testMultipleInstances() throws Exception
+ {
+ final String SERVICE_ONE = "one";
+ final String SERVICE_TWO = "two";
+
+ List<Closeable> closeables = Lists.newArrayList();
+ try
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ closeables.add(client);
+ client.start();
+
+ ServiceInstance<Void> s1_i1 = ServiceInstance.<Void>builder().name(SERVICE_ONE).build();
+ ServiceInstance<Void> s1_i2 = ServiceInstance.<Void>builder().name(SERVICE_ONE).build();
+ ServiceInstance<Void> s2_i1 = ServiceInstance.<Void>builder().name(SERVICE_TWO).build();
+ ServiceInstance<Void> s2_i2 = ServiceInstance.<Void>builder().name(SERVICE_TWO).build();
+
+ ServiceDiscovery<Void> discovery = ServiceDiscoveryBuilder.builder(Void.class).client(client).basePath("/test").build();
+ closeables.add(discovery);
+ discovery.start();
+
+ discovery.registerService(s1_i1);
+ discovery.registerService(s1_i2);
+ discovery.registerService(s2_i1);
+ discovery.registerService(s2_i2);
+
+ Assert.assertEquals(Sets.newHashSet(discovery.queryForNames()), Sets.newHashSet(SERVICE_ONE, SERVICE_TWO));
+
+ List<ServiceInstance<Void>> list = Lists.newArrayList();
+ list.add(s1_i1);
+ list.add(s1_i2);
+ Collections.sort(list, comparator);
+ List<ServiceInstance<Void>> queriedInstances = Lists.newArrayList(discovery.queryForInstances(SERVICE_ONE));
+ Collections.sort(queriedInstances, comparator);
+ Assert.assertEquals(queriedInstances, list, String.format("Not equal l: %s - d: %s", list, queriedInstances));
+
+ list.clear();
+
+ list.add(s2_i1);
+ list.add(s2_i2);
+ Collections.sort(list, comparator);
+ queriedInstances = Lists.newArrayList(discovery.queryForInstances(SERVICE_TWO));
+ Collections.sort(queriedInstances, comparator);
+ Assert.assertEquals(queriedInstances, list, String.format("Not equal 2: %s - d: %s", list, queriedInstances));
+ }
+ finally
+ {
+ Collections.reverse(closeables);
+ for ( Closeable c : closeables )
+ {
+ CloseableUtils.closeQuietly(c);
+ }
+ }
+ }
+
+ @Test
+ public void testBasic() throws Exception
+ {
+ List<Closeable> closeables = Lists.newArrayList();
+ try
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ closeables.add(client);
+ client.start();
+
+ ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+ ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
+ closeables.add(discovery);
+ discovery.start();
+
+ Assert.assertEquals(discovery.queryForNames(), Collections.singletonList("test"));
+
+ List<ServiceInstance<String>> list = Lists.newArrayList();
+ list.add(instance);
+ Assert.assertEquals(discovery.queryForInstances("test"), list);
+ }
+ finally
+ {
+ Collections.reverse(closeables);
+ for ( Closeable c : closeables )
+ {
+ CloseableUtils.closeQuietly(c);
+ }
+ }
+ }
+
+ @Test
+ public void testNoServerOnStart() throws Exception
+ {
+ server.stop();
+ List<Closeable> closeables = Lists.newArrayList();
+ try
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ closeables.add(client);
+ client.start();
+
+ ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+ ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
+ closeables.add(discovery);
+ discovery.start();
+
+ server.restart();
+ Assert.assertEquals(discovery.queryForNames(), Collections.singletonList("test"));
+
+ List<ServiceInstance<String>> list = Lists.newArrayList();
+ list.add(instance);
+ Assert.assertEquals(discovery.queryForInstances("test"), list);
+ }
+ finally
+ {
+ Collections.reverse(closeables);
+ for ( Closeable c : closeables )
+ {
+ CloseableUtils.closeQuietly(c);
+ }
+ }
+ }
+
+ // CURATOR-164
+ @Test
+ public void testUnregisterService() throws Exception
+ {
+ final String name = "name";
+
+ final CountDownLatch restartLatch = new CountDownLatch(1);
+ List<Closeable> closeables = Lists.newArrayList();
+
+ InstanceSerializer<String> slowSerializer = new JsonInstanceSerializer<String>(String.class)
+ {
+ private boolean first = true;
+
+ @Override
+ public byte[] serialize(ServiceInstance<String> instance) throws Exception
+ {
+ if ( first )
+ {
+ System.out.println("Serializer first registration.");
+ first = false;
+ }
+ else
+ {
+ System.out.println("Waiting for reconnect to finish.");
+ // Simulate the serialize method being slow.
+ // This could just be a timed wait, but that's kind of non-deterministic.
+ restartLatch.await();
+ }
+ return super.serialize(instance);
+ }
+ };
+
+ try
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ closeables.add(client);
+ client.start();
+
+ ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name(name).port(10064).build();
+ ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).serializer(slowSerializer).watchInstances(true).build();
+ closeables.add(discovery);
+ discovery.start();
+
+ Assert.assertFalse(discovery.queryForInstances(name).isEmpty(), "Service should start registered.");
+
+ server.stop();
+ server.restart();
+
+ discovery.unregisterService(instance);
+ restartLatch.countDown();
+
+ new Timing().sleepABit(); // Wait for the rest of registration to finish.
+
+ Assert.assertTrue(discovery.queryForInstances(name).isEmpty(), "Service should have unregistered.");
+ }
+ finally
+ {
+ Collections.reverse(closeables);
+ for ( Closeable c : closeables )
+ {
+ CloseableUtils.closeQuietly(c);
+ }
+ }
+ }
+
+ @Test
+ public void testCleaning() throws Exception
+ {
+ System.setProperty("curator-discovery-clean-threshold-ms", "10");
+ List<Closeable> closeables = Lists.newArrayList();
+ try
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ closeables.add(client);
+ client.start();
+
+ ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+ ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
+ closeables.add(discovery);
+ discovery.start();
+ discovery.unregisterService(instance);
+
+ Thread.sleep(100);
+
+ discovery.queryForNames(); // causes a clean
+ Assert.assertEquals(((ServiceDiscoveryImpl)discovery).debugServicesQty(), 0);
+ }
+ finally
+ {
+ System.clearProperty("curator-discovery-clean-threshold-ms");
+ Collections.reverse(closeables);
+ for ( Closeable c : closeables )
+ {
+ CloseableUtils.closeQuietly(c);
+ }
+ }
+ }
+}
[09/12] curator git commit: Use sync instead of locks. It's simpler
and clearer
Posted by ra...@apache.org.
Use sync instead of locks. It's simpler and clearer
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1a16f82b
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1a16f82b
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1a16f82b
Branch: refs/heads/master
Commit: 1a16f82baae0360a2823db2cc811fbeb3d6e1392
Parents: f8b67dc
Author: randgalt <ra...@apache.org>
Authored: Mon Apr 27 14:51:46 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Apr 27 14:51:46 2015 -0500
----------------------------------------------------------------------
.../curator/x/discovery/details/Holder.java | 106 ++++---------------
.../discovery/details/ServiceDiscoveryImpl.java | 24 +----
2 files changed, 26 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/1a16f82b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java
index cbc6236..d088f8d 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java
@@ -2,8 +2,6 @@ package org.apache.curator.x.discovery.details;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.x.discovery.ServiceInstance;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
class Holder<T>
{
@@ -18,7 +16,6 @@ class Holder<T>
private NodeCache cache;
private State state;
private long stateChangeMs;
- private final ReentrantLock lock = new ReentrantLock();
Holder(ServiceInstance<T> service, NodeCache nodeCache)
{
@@ -27,110 +24,49 @@ class Holder<T>
setState(State.NEW);
}
- ServiceInstance<T> getService()
+ synchronized ServiceInstance<T> getService()
{
- lock.lock();
- try
- {
- return service;
- }
- finally
- {
- lock.unlock();
- }
+ return service;
}
- ServiceInstance<T> getServiceIfRegistered()
+ synchronized ServiceInstance<T> getServiceIfRegistered()
{
- lock.lock();
- try
- {
- return (state == State.REGISTERED) ? service : null;
- }
- finally
- {
- lock.unlock();
- }
+ return (state == State.REGISTERED) ? service : null;
}
- void setService(ServiceInstance<T> service)
+ synchronized void setService(ServiceInstance<T> service)
{
- lock.lock();
- try
- {
- this.service = service;
- }
- finally
- {
- lock.unlock();
- }
+ this.service = service;
}
- NodeCache getAndClearCache()
+ synchronized NodeCache getAndClearCache()
{
- lock.lock();
- try
- {
- NodeCache localCache = cache;
- cache = null;
- return localCache;
- }
- finally
- {
- lock.unlock();
- }
+ NodeCache localCache = cache;
+ cache = null;
+ return localCache;
}
- boolean isRegistered()
+ synchronized boolean isRegistered()
{
- lock.lock();
- try
- {
- return state == State.REGISTERED;
- }
- finally
- {
- lock.unlock();
- }
+ return state == State.REGISTERED;
}
- boolean isLapsedUnregistered(int cleanThresholdMs)
+ synchronized boolean isLapsedUnregistered(int cleanThresholdMs)
{
- lock.lock();
- try
+ if ( state == State.UNREGISTERED )
{
- if ( state == State.UNREGISTERED )
+ long elapsed = System.currentTimeMillis() - stateChangeMs;
+ if ( elapsed >= cleanThresholdMs )
{
- long elapsed = System.currentTimeMillis() - stateChangeMs;
- if ( elapsed >= cleanThresholdMs )
- {
- return true;
- }
+ return true;
}
- return false;
- }
- finally
- {
- lock.unlock();
- }
- }
-
- void setState(State state)
- {
- lock.lock();
- try
- {
- this.state = state;
- stateChangeMs = System.currentTimeMillis();
- }
- finally
- {
- lock.unlock();
}
+ return false;
}
- Lock getLock()
+ synchronized void setState(State state)
{
- return lock;
+ this.state = state;
+ stateChangeMs = System.currentTimeMillis();
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/1a16f82b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index 8e3e1f9..a35cd3a 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -59,6 +59,7 @@ import java.util.concurrent.atomic.AtomicLong;
/**
* A mechanism to register and query service instances using ZooKeeper
*/
+@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
{
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -181,9 +182,8 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
{
clean();
- Holder<T> holder = getOrMakeHolder(service, null);
- holder.getLock().lock();
- try
+ final Holder<T> holder = getOrMakeHolder(service, null);
+ synchronized(holder)
{
if ( !holder.isRegistered() )
{
@@ -195,10 +195,6 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
String path = pathForInstance(service.getName(), service.getId());
client.setData().forPath(path, bytes);
}
- finally
- {
- holder.getLock().unlock();
- }
}
@VisibleForTesting
@@ -459,18 +455,13 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
{
for ( final Holder<T> holder : services.values() )
{
- holder.getLock().lock();
- try
+ synchronized(holder)
{
if ( holder.isRegistered() )
{
internalRegisterService(holder.getService());
}
}
- finally
- {
- holder.getLock().unlock();
- }
}
}
@@ -544,8 +535,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
{
if ( holder != null )
{
- holder.getLock().lock();
- try
+ synchronized(holder)
{
holder.setState(Holder.State.UNREGISTERED);
NodeCache cache = holder.getAndClearCache();
@@ -565,10 +555,6 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
// ignore
}
}
- finally
- {
- holder.getLock().unlock();
- }
}
}
}
[02/12] curator git commit: set watched instances to true to check
that as well
Posted by ra...@apache.org.
set watched instances to true to check that as well
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/0178c830
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/0178c830
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/0178c830
Branch: refs/heads/master
Commit: 0178c830385854bb8737afba301c7896604e4547
Parents: 915d83a
Author: randgalt <ra...@apache.org>
Authored: Tue Apr 21 12:35:23 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Apr 21 12:35:23 2015 -0500
----------------------------------------------------------------------
.../org/apache/curator/x/discovery/TestServiceDiscovery.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/0178c830/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
index 40d491a..3b45494 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
@@ -340,7 +340,7 @@ public class TestServiceDiscovery extends BaseClassForTests
client.start();
ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name(name).port(10064).build();
- ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).serializer(slowSerializer).build();
+ ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).serializer(slowSerializer).watchInstances(true).build();
closeables.add(discovery);
discovery.start();
@@ -352,7 +352,7 @@ public class TestServiceDiscovery extends BaseClassForTests
discovery.unregisterService(instance);
restartLatch.countDown();
- TimeUnit.SECONDS.sleep(1); // Wait for the rest of registration to finish.
+ new Timing().sleepABit(); // Wait for the rest of registration to finish.
Assert.assertTrue(discovery.queryForInstances(name).isEmpty(), "Service should have unregistered.");
}
[08/12] curator git commit: cleanup/refactoring
Posted by ra...@apache.org.
cleanup/refactoring
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f8b67dc4
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f8b67dc4
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f8b67dc4
Branch: refs/heads/master
Commit: f8b67dc42d9892b59b5afd52e351c4ef11c4d457
Parents: 1665084
Author: randgalt <ra...@apache.org>
Authored: Tue Apr 21 18:01:34 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Apr 21 18:01:34 2015 -0500
----------------------------------------------------------------------
.../main/java/org/apache/curator/x/discovery/details/Holder.java | 3 ++-
.../apache/curator/x/discovery/details/ServiceDiscoveryImpl.java | 3 +--
2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/f8b67dc4/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java
index c126b29..cbc6236 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java
@@ -2,6 +2,7 @@ package org.apache.curator.x.discovery.details;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.x.discovery.ServiceInstance;
+import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class Holder<T>
@@ -128,7 +129,7 @@ class Holder<T>
}
}
- ReentrantLock getLock()
+ Lock getLock()
{
return lock;
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f8b67dc4/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index f15a387..8e3e1f9 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -518,8 +518,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
{
Holder<T> newHolder = new Holder<T>(instance, nodeCache);
Holder<T> oldHolder = services.putIfAbsent(instance.getId(), newHolder);
- Holder<T> useHolder = (oldHolder != null) ? oldHolder : newHolder;
- return useHolder;
+ return (oldHolder != null) ? oldHolder : newHolder;
}
private void clean()
[10/12] curator git commit: vast simplification. Holder isn't needed.
This is better
Posted by ra...@apache.org.
vast simplification. Holder isn't needed. This is better
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/03879d1e
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/03879d1e
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/03879d1e
Branch: refs/heads/master
Commit: 03879d1e627e93bd867bb7a0fdfdd875b033560e
Parents: 1a16f82
Author: randgalt <ra...@apache.org>
Authored: Mon Apr 27 15:12:11 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Apr 27 15:12:11 2015 -0500
----------------------------------------------------------------------
.../curator/x/discovery/details/Holder.java | 72 --------
.../discovery/details/ServiceDiscoveryImpl.java | 163 +++++++------------
.../discovery/details/TestServiceDiscovery.java | 5 -
3 files changed, 57 insertions(+), 183 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/03879d1e/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java
deleted file mode 100644
index d088f8d..0000000
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package org.apache.curator.x.discovery.details;
-
-import org.apache.curator.framework.recipes.cache.NodeCache;
-import org.apache.curator.x.discovery.ServiceInstance;
-
-class Holder<T>
-{
- enum State
- {
- NEW,
- REGISTERED,
- UNREGISTERED
- }
-
- private ServiceInstance<T> service;
- private NodeCache cache;
- private State state;
- private long stateChangeMs;
-
- Holder(ServiceInstance<T> service, NodeCache nodeCache)
- {
- cache = nodeCache;
- this.service = service;
- setState(State.NEW);
- }
-
- synchronized ServiceInstance<T> getService()
- {
- return service;
- }
-
- synchronized ServiceInstance<T> getServiceIfRegistered()
- {
- return (state == State.REGISTERED) ? service : null;
- }
-
- synchronized void setService(ServiceInstance<T> service)
- {
- this.service = service;
- }
-
- synchronized NodeCache getAndClearCache()
- {
- NodeCache localCache = cache;
- cache = null;
- return localCache;
- }
-
- synchronized boolean isRegistered()
- {
- return state == State.REGISTERED;
- }
-
- synchronized boolean isLapsedUnregistered(int cleanThresholdMs)
- {
- if ( state == State.UNREGISTERED )
- {
- long elapsed = System.currentTimeMillis() - stateChangeMs;
- if ( elapsed >= cleanThresholdMs )
- {
- return true;
- }
- }
- return false;
- }
-
- synchronized void setState(State state)
- {
- this.state = state;
- stateChangeMs = System.currentTimeMillis();
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/03879d1e/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index a35cd3a..7b2a9ec 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -21,9 +21,7 @@ package org.apache.curator.x.discovery.details;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -50,11 +48,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
/**
* A mechanism to register and query service instances using ZooKeeper
@@ -66,11 +61,10 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
private final CuratorFramework client;
private final String basePath;
private final InstanceSerializer<T> serializer;
- private final ConcurrentMap<String, Holder<T>> services = Maps.newConcurrentMap();
+ private final ConcurrentMap<String, Entry<T>> services = Maps.newConcurrentMap();
private final Collection<ServiceCache<T>> caches = Sets.newSetFromMap(Maps.<ServiceCache<T>, Boolean>newConcurrentMap());
private final Collection<ServiceProvider<T>> providers = Sets.newSetFromMap(Maps.<ServiceProvider<T>, Boolean>newConcurrentMap());
private final boolean watchInstances;
- private final AtomicLong lastCleanMs = new AtomicLong(System.currentTimeMillis());
private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
@Override
@@ -91,7 +85,16 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
}
};
- private final int CLEAN_THRESHOLD_MS = Integer.getInteger("curator-discovery-clean-threshold-ms", (int)TimeUnit.MINUTES.toMillis(5));
+ private static class Entry<T>
+ {
+ private volatile ServiceInstance<T> service;
+ private volatile NodeCache cache;
+
+ private Entry(ServiceInstance<T> service)
+ {
+ this.service = service;
+ }
+ }
/**
* @param client the client
@@ -108,7 +111,9 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
this.serializer = Preconditions.checkNotNull(serializer, "serializer cannot be null");
if ( thisInstance != null )
{
- setService(thisInstance);
+ Entry<T> entry = new Entry<T>(thisInstance);
+ entry.cache = makeNodeCache(thisInstance);
+ services.put(thisInstance.getId(), entry);
}
}
@@ -143,11 +148,11 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
CloseableUtils.closeQuietly(provider);
}
- for ( Holder<T> holder : services.values() )
+ for ( Entry<T> entry : services.values() )
{
try
{
- internalUnregisterService(holder);
+ internalUnregisterService(entry);
}
catch ( KeeperException.NoNodeException ignore )
{
@@ -155,7 +160,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
}
catch ( Exception e )
{
- log.error("Could not unregister instance: " + holder.getService().getName(), e);
+ log.error("Could not unregister instance: " + entry.service.getName(), e);
}
}
@@ -171,26 +176,30 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@Override
public void registerService(ServiceInstance<T> service) throws Exception
{
- clean();
-
- setService(service);
- internalRegisterService(service);
+ Entry<T> newEntry = new Entry<T>(service);
+ Entry<T> oldEntry = services.putIfAbsent(service.getId(), newEntry);
+ Entry<T> useEntry = (oldEntry != null) ? oldEntry : newEntry;
+ synchronized(useEntry)
+ {
+ if ( useEntry == newEntry ) // i.e. is new
+ {
+ useEntry.cache = makeNodeCache(service);
+ }
+ internalRegisterService(service);
+ }
}
@Override
public void updateService(final ServiceInstance<T> service) throws Exception
{
- clean();
-
- final Holder<T> holder = getOrMakeHolder(service, null);
- synchronized(holder)
+ Entry<T> entry = services.get(service.getId());
+ if ( entry == null )
{
- if ( !holder.isRegistered() )
- {
- throw new Exception("Service has been unregistered: " + service);
- }
-
- holder.setService(service);
+ throw new Exception("Service has been unregistered: " + service);
+ }
+ synchronized(entry)
+ {
+ entry.service = service;
byte[] bytes = serializer.serialize(service);
String path = pathForInstance(service.getName(), service.getId());
client.setData().forPath(path, bytes);
@@ -229,9 +238,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@Override
public void unregisterService(ServiceInstance<T> service) throws Exception
{
- clean();
-
- internalUnregisterService(getOrMakeHolder(service, null));
+ internalUnregisterService(services.remove(service.getId()));
}
/**
@@ -242,8 +249,6 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@Override
public ServiceProviderBuilder<T> serviceProviderBuilder()
{
- clean();
-
return new ServiceProviderBuilderImpl<T>(this)
.providerStrategy(new RoundRobinStrategy<T>())
.threadFactory(ThreadUtils.newThreadFactory("ServiceProvider"));
@@ -257,8 +262,6 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@Override
public ServiceCacheBuilder<T> serviceCacheBuilder()
{
- clean();
-
return new ServiceCacheBuilderImpl<T>(this)
.threadFactory(ThreadUtils.newThreadFactory("ServiceCache"));
}
@@ -272,8 +275,6 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@Override
public Collection<String> queryForNames() throws Exception
{
- clean();
-
List<String> names = client.getChildren().forPath(basePath);
return ImmutableList.copyOf(names);
}
@@ -302,8 +303,6 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@Override
public ServiceInstance<T> queryForInstance(String name, String id) throws Exception
{
- clean();
-
String path = pathForInstance(name, id);
try
{
@@ -339,8 +338,6 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
CuratorFramework getClient()
{
- clean();
-
return client;
}
@@ -356,8 +353,6 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
List<ServiceInstance<T>> queryForInstances(String name, Watcher watcher) throws Exception
{
- clean();
-
ImmutableList.Builder<ServiceInstance<T>> builder = ImmutableList.builder();
String path = pathForName(name);
List<String> instanceIds;
@@ -392,21 +387,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@VisibleForTesting
int debugServicesQty()
{
- return Iterables.size
- (
- Iterables.filter
- (
- services.values(),
- new Predicate<Holder<T>>()
- {
- @Override
- public boolean apply(Holder<T> holder)
- {
- return holder.isRegistered();
- }
- }
- )
- );
+ return services.size();
}
private List<String> getChildrenWatched(String path, Watcher watcher, boolean recurse) throws Exception
@@ -447,30 +428,24 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@VisibleForTesting
ServiceInstance<T> getRegisteredService(String id)
{
- Holder<T> holder = services.get(id);
- return (holder != null) ? holder.getServiceIfRegistered() : null;
+ Entry<T> entry = services.get(id);
+ return (entry != null) ? entry.service : null;
}
private void reRegisterServices() throws Exception
{
- for ( final Holder<T> holder : services.values() )
+ for ( final Entry<T> entry : services.values() )
{
- synchronized(holder)
+ synchronized(entry)
{
- if ( holder.isRegistered() )
- {
- internalRegisterService(holder.getService());
- }
+ internalRegisterService(entry.service);
}
}
}
- private void setService(final ServiceInstance<T> instance)
+ private NodeCache makeNodeCache(final ServiceInstance<T> instance)
{
final NodeCache nodeCache = watchInstances ? new NodeCache(client, pathForInstance(instance.getName(), instance.getId())) : null;
- Holder<T> holder = getOrMakeHolder(instance, nodeCache);
- holder.setState(Holder.State.REGISTERED);
-
if ( nodeCache != null )
{
try
@@ -489,10 +464,13 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
if ( nodeCache.getCurrentData() != null )
{
ServiceInstance<T> newInstance = serializer.deserialize(nodeCache.getCurrentData().getData());
- Holder<T> holder = services.get(newInstance.getId());
- if ( holder != null )
+ Entry<T> entry = services.get(newInstance.getId());
+ if ( entry != null )
{
- holder.setService(newInstance);
+ synchronized(entry)
+ {
+ entry.service = newInstance;
+ }
}
}
else
@@ -503,49 +481,22 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
};
nodeCache.getListenable().addListener(listener);
}
+ return nodeCache;
}
- private Holder<T> getOrMakeHolder(ServiceInstance<T> instance, NodeCache nodeCache)
- {
- Holder<T> newHolder = new Holder<T>(instance, nodeCache);
- Holder<T> oldHolder = services.putIfAbsent(instance.getId(), newHolder);
- return (oldHolder != null) ? oldHolder : newHolder;
- }
-
- private void clean()
- {
- long localLastCleanMs = lastCleanMs.get();
- long now = System.currentTimeMillis();
- long elpased = now - localLastCleanMs;
- if ( (elpased >= CLEAN_THRESHOLD_MS) && lastCleanMs.compareAndSet(localLastCleanMs, now + 1) )
- {
- final Iterator<Holder<T>> iterator = services.values().iterator();
- while ( iterator.hasNext() )
- {
- Holder<T> holder = iterator.next();
- if ( holder.isLapsedUnregistered(CLEAN_THRESHOLD_MS) )
- {
- iterator.remove();
- }
- }
- }
- }
-
- private void internalUnregisterService(final Holder<T> holder) throws Exception
+ private void internalUnregisterService(final Entry<T> entry) throws Exception
{
- if ( holder != null )
+ if ( entry != null )
{
- synchronized(holder)
+ synchronized(entry)
{
- holder.setState(Holder.State.UNREGISTERED);
- NodeCache cache = holder.getAndClearCache();
- if ( cache != null )
+ if ( entry.cache != null )
{
- CloseableUtils.closeQuietly(cache);
+ CloseableUtils.closeQuietly(entry.cache);
+ entry.cache = null;
}
- ServiceInstance<T> service = holder.getService();
- String path = pathForInstance(service.getName(), service.getId());
+ String path = pathForInstance(entry.service.getName(), entry.service.getId());
try
{
client.delete().guaranteed().forPath(path);
http://git-wip-us.apache.org/repos/asf/curator/blob/03879d1e/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
index f60773f..8b1e5fc 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
@@ -367,7 +367,6 @@ public class TestServiceDiscovery extends BaseClassForTests
@Test
public void testCleaning() throws Exception
{
- System.setProperty("curator-discovery-clean-threshold-ms", "10");
List<Closeable> closeables = Lists.newArrayList();
try
{
@@ -381,14 +380,10 @@ public class TestServiceDiscovery extends BaseClassForTests
discovery.start();
discovery.unregisterService(instance);
- Thread.sleep(100);
-
- discovery.queryForNames(); // causes a clean
Assert.assertEquals(((ServiceDiscoveryImpl)discovery).debugServicesQty(), 0);
}
finally
{
- System.clearProperty("curator-discovery-clean-threshold-ms");
Collections.reverse(closeables);
for ( Closeable c : closeables )
{
[06/12] curator git commit: moved holder into separate class so that
it's easier to reason about and lock
Posted by ra...@apache.org.
moved holder into separate class so that it's easier to reason about and lock
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/d6a51f4a
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/d6a51f4a
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/d6a51f4a
Branch: refs/heads/master
Commit: d6a51f4ae9a0365fba1ae77b9780d9bb43a79c72
Parents: c62b113
Author: randgalt <ra...@apache.org>
Authored: Tue Apr 21 17:44:53 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Apr 21 17:44:53 2015 -0500
----------------------------------------------------------------------
.../curator/x/discovery/details/Holder.java | 173 +++++++++++++++++++
.../discovery/details/ServiceDiscoveryImpl.java | 120 ++++++-------
.../discovery/details/TestServiceDiscovery.java | 3 -
3 files changed, 227 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/d6a51f4a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java
new file mode 100644
index 0000000..69c7667
--- /dev/null
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java
@@ -0,0 +1,173 @@
+package org.apache.curator.x.discovery.details;
+
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.x.discovery.ServiceInstance;
+import java.util.concurrent.locks.ReentrantLock;
+
+class Holder<T>
+{
+ enum State
+ {
+ NEW,
+ REGISTERED,
+ UNREGISTERED
+ }
+
+ private ServiceInstance<T> service;
+ private NodeCache cache;
+ private State state;
+ private long stateChangeMs;
+ private final ReentrantLock lock = new ReentrantLock();
+
+ Holder(ServiceInstance<T> service)
+ {
+ this.service = service;
+ setState(State.NEW);
+ }
+
+ ServiceInstance<T> getService()
+ {
+ lock.lock();
+ try
+ {
+ return service;
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ ServiceInstance<T> getServiceIfRegistered()
+ {
+ lock.lock();
+ try
+ {
+ return (state == State.REGISTERED) ? service : null;
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ void setService(ServiceInstance<T> service)
+ {
+ lock.lock();
+ try
+ {
+ this.service = service;
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ NodeCache getCache()
+ {
+ lock.lock();
+ try
+ {
+ return cache;
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ NodeCache getAndClearCache()
+ {
+ lock.lock();
+ try
+ {
+ NodeCache localCache = cache;
+ cache = null;
+ return localCache;
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ void setCache(NodeCache cache)
+ {
+ lock.lock();
+ try
+ {
+ this.cache = cache;
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ State getState()
+ {
+ lock.lock();
+ try
+ {
+ return state;
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ boolean isRegistered()
+ {
+ lock.lock();
+ try
+ {
+ return state == State.REGISTERED;
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ boolean isLapsedUnregistered(int cleanThresholdMs)
+ {
+ lock.lock();
+ try
+ {
+ if ( state == State.UNREGISTERED )
+ {
+ long elapsed = System.currentTimeMillis() - stateChangeMs;
+ if ( elapsed >= cleanThresholdMs )
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ void setState(State state)
+ {
+ lock.lock();
+ try
+ {
+ this.state = state;
+ stateChangeMs = System.currentTimeMillis();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ ReentrantLock getLock()
+ {
+ return lock;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/d6a51f4a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index ba18e42..ec049fd 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -21,7 +21,9 @@ package org.apache.curator.x.discovery.details;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -53,12 +55,10 @@ import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
/**
* A mechanism to register and query service instances using ZooKeeper
*/
-@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
{
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -92,33 +92,6 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
private final int CLEAN_THRESHOLD_MS = Integer.getInteger("curator-discovery-clean-threshold-ms", (int)TimeUnit.MINUTES.toMillis(5));
- private enum State
- {
- NEW,
- REGISTERED,
- UNREGISTERED
- }
-
- private static class Holder<T>
- {
- private final AtomicReference<ServiceInstance<T>> service = new AtomicReference<ServiceInstance<T>>();
- private final AtomicReference<NodeCache> cache = new AtomicReference<NodeCache>();
- private final AtomicReference<State> state = new AtomicReference<State>();
- private final AtomicLong stateChangeMs = new AtomicLong();
-
- public Holder(ServiceInstance<T> instance)
- {
- service.set(instance);
- setState(State.NEW);
- }
-
- public void setState(State state)
- {
- this.state.set(state);
- stateChangeMs.set(System.currentTimeMillis());
- }
- }
-
/**
* @param client the client
* @param basePath base path to store data
@@ -181,7 +154,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
}
catch ( Exception e )
{
- log.error("Could not unregister instance: " + holder.service.get().getName(), e);
+ log.error("Could not unregister instance: " + holder.getService().getName(), e);
}
}
@@ -204,23 +177,28 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
}
@Override
- public void updateService(ServiceInstance<T> service) throws Exception
+ public void updateService(final ServiceInstance<T> service) throws Exception
{
clean();
- final Holder<T> holder = getOrMakeHolder(service, null);
- synchronized(holder)
+ Holder<T> holder = getOrMakeHolder(service, null);
+ holder.getLock().lock();
+ try
{
- if ( holder.state.get() == State.UNREGISTERED )
+ if ( holder.getState() == Holder.State.UNREGISTERED )
{
throw new Exception("Service has been unregistered: " + service);
}
- holder.service.set(service);
+ holder.setService(service);
byte[] bytes = serializer.serialize(service);
String path = pathForInstance(service.getName(), service.getId());
client.setData().forPath(path, bytes);
}
+ finally
+ {
+ holder.getLock().unlock();
+ }
}
@VisibleForTesting
@@ -418,7 +396,21 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@VisibleForTesting
int debugServicesQty()
{
- return services.size();
+ return Iterables.size
+ (
+ Iterables.filter
+ (
+ services.values(),
+ new Predicate<Holder<T>>()
+ {
+ @Override
+ public boolean apply(Holder<T> holder)
+ {
+ return holder.isRegistered();
+ }
+ }
+ )
+ );
}
private List<String> getChildrenWatched(String path, Watcher watcher, boolean recurse) throws Exception
@@ -459,28 +451,26 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@VisibleForTesting
ServiceInstance<T> getRegisteredService(String id)
{
- final Holder<T> holder = services.get(id);
- if ( holder != null )
- {
- synchronized(holder)
- {
- return (holder.state.get() == State.REGISTERED) ? holder.service.get() : null;
- }
- }
- return null;
+ Holder<T> holder = services.get(id);
+ return (holder != null) ? holder.getServiceIfRegistered() : null;
}
private void reRegisterServices() throws Exception
{
for ( final Holder<T> holder : services.values() )
{
- synchronized(holder)
+ holder.getLock().lock();
+ try
{
- if ( holder.state.get() == State.REGISTERED )
+ if ( holder.isRegistered() )
{
- internalRegisterService(holder.service.get());
+ internalRegisterService(holder.getService());
}
}
+ finally
+ {
+ holder.getLock().unlock();
+ }
}
}
@@ -488,7 +478,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
{
final NodeCache nodeCache = watchInstances ? new NodeCache(client, pathForInstance(instance.getName(), instance.getId())) : null;
Holder<T> holder = getOrMakeHolder(instance, nodeCache);
- holder.setState(State.REGISTERED);
+ holder.setState(Holder.State.REGISTERED);
if ( nodeCache != null )
{
@@ -511,7 +501,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
Holder<T> holder = services.get(newInstance.getId());
if ( holder != null )
{
- holder.service.set(newInstance);
+ holder.setService(newInstance);
}
}
else
@@ -529,7 +519,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
Holder<T> newHolder = new Holder<T>(instance);
Holder<T> oldHolder = services.putIfAbsent(instance.getId(), newHolder);
Holder<T> useHolder = (oldHolder != null) ? oldHolder : newHolder;
- useHolder.cache.set(nodeCache);
+ useHolder.setCache(nodeCache);
return useHolder;
}
@@ -540,20 +530,13 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
long elpased = now - localLastCleanMs;
if ( (elpased >= CLEAN_THRESHOLD_MS) && lastCleanMs.compareAndSet(localLastCleanMs, now + 1) )
{
- Iterator<Holder<T>> iterator = services.values().iterator();
+ final Iterator<Holder<T>> iterator = services.values().iterator();
while ( iterator.hasNext() )
{
- final Holder<T> holder = iterator.next();
- synchronized(holder)
+ Holder<T> holder = iterator.next();
+ if ( holder.isLapsedUnregistered(CLEAN_THRESHOLD_MS) )
{
- if ( holder.state.get() == State.UNREGISTERED )
- {
- long elapsed = System.currentTimeMillis() - holder.stateChangeMs.get();
- if ( elapsed >= CLEAN_THRESHOLD_MS )
- {
- iterator.remove();
- }
- }
+ iterator.remove();
}
}
}
@@ -563,16 +546,17 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
{
if ( holder != null )
{
- synchronized(holder)
+ holder.getLock().lock();
+ try
{
- holder.setState(State.UNREGISTERED);
- NodeCache cache = holder.cache.getAndSet(null);
+ holder.setState(Holder.State.UNREGISTERED);
+ NodeCache cache = holder.getAndClearCache();
if ( cache != null )
{
CloseableUtils.closeQuietly(cache);
}
- ServiceInstance<T> service = holder.service.get();
+ ServiceInstance<T> service = holder.getService();
String path = pathForInstance(service.getName(), service.getId());
try
{
@@ -583,6 +567,10 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
// ignore
}
}
+ finally
+ {
+ holder.getLock().unlock();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/d6a51f4a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
index 2808c5c..f60773f 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
@@ -31,9 +31,6 @@ import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
-import org.apache.curator.x.discovery.details.InstanceSerializer;
-import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
-import org.apache.curator.x.discovery.details.ServiceDiscoveryImpl;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.io.Closeable;
[05/12] curator git commit: sync on holder for safety during
multi-step operations
Posted by ra...@apache.org.
sync on holder for safety during multi-step operations
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c62b1137
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c62b1137
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c62b1137
Branch: refs/heads/master
Commit: c62b1137fa25104f2e24d65e467d0cfc769bd6e2
Parents: fa0c9da
Author: randgalt <ra...@apache.org>
Authored: Tue Apr 21 16:47:10 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Apr 21 16:47:10 2015 -0500
----------------------------------------------------------------------
.../discovery/details/ServiceDiscoveryImpl.java | 86 ++++++++++++--------
1 file changed, 53 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/c62b1137/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index 80b012e..ba18e42 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -58,6 +58,7 @@ import java.util.concurrent.atomic.AtomicReference;
/**
* A mechanism to register and query service instances using ZooKeeper
*/
+@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
{
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -207,16 +208,19 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
{
clean();
- Holder<T> holder = getOrMakeHolder(service, null);
- if ( holder.state.get() == State.UNREGISTERED )
+ final Holder<T> holder = getOrMakeHolder(service, null);
+ synchronized(holder)
{
- throw new Exception("Service has been unregistered: " + service);
- }
+ if ( holder.state.get() == State.UNREGISTERED )
+ {
+ throw new Exception("Service has been unregistered: " + service);
+ }
- holder.service.set(service);
- byte[] bytes = serializer.serialize(service);
- String path = pathForInstance(service.getName(), service.getId());
- client.setData().forPath(path, bytes);
+ holder.service.set(service);
+ byte[] bytes = serializer.serialize(service);
+ String path = pathForInstance(service.getName(), service.getId());
+ client.setData().forPath(path, bytes);
+ }
}
@VisibleForTesting
@@ -455,17 +459,27 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@VisibleForTesting
ServiceInstance<T> getRegisteredService(String id)
{
- Holder<T> holder = services.get(id);
- return ((holder != null) && (holder.state.get() == State.REGISTERED)) ? holder.service.get() : null;
+ final Holder<T> holder = services.get(id);
+ if ( holder != null )
+ {
+ synchronized(holder)
+ {
+ return (holder.state.get() == State.REGISTERED) ? holder.service.get() : null;
+ }
+ }
+ return null;
}
private void reRegisterServices() throws Exception
{
- for ( Holder<T> service : services.values() )
+ for ( final Holder<T> holder : services.values() )
{
- if ( service.state.get() == State.REGISTERED )
+ synchronized(holder)
{
- internalRegisterService(service.service.get());
+ if ( holder.state.get() == State.REGISTERED )
+ {
+ internalRegisterService(holder.service.get());
+ }
}
}
}
@@ -529,39 +543,45 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
Iterator<Holder<T>> iterator = services.values().iterator();
while ( iterator.hasNext() )
{
- Holder<T> holder = iterator.next();
- if ( holder.state.get() == State.UNREGISTERED )
+ final Holder<T> holder = iterator.next();
+ synchronized(holder)
{
- long elapsed = System.currentTimeMillis() - holder.stateChangeMs.get();
- if ( elapsed >= CLEAN_THRESHOLD_MS )
+ if ( holder.state.get() == State.UNREGISTERED )
{
- iterator.remove();
+ long elapsed = System.currentTimeMillis() - holder.stateChangeMs.get();
+ if ( elapsed >= CLEAN_THRESHOLD_MS )
+ {
+ iterator.remove();
+ }
}
}
}
}
}
- private void internalUnregisterService(Holder<T> holder) throws Exception
+ private void internalUnregisterService(final Holder<T> holder) throws Exception
{
if ( holder != null )
{
- holder.setState(State.UNREGISTERED);
- NodeCache cache = holder.cache.getAndSet(null);
- if ( cache != null )
+ synchronized(holder)
{
- CloseableUtils.closeQuietly(cache);
- }
+ holder.setState(State.UNREGISTERED);
+ NodeCache cache = holder.cache.getAndSet(null);
+ if ( cache != null )
+ {
+ CloseableUtils.closeQuietly(cache);
+ }
- ServiceInstance<T> service = holder.service.get();
- String path = pathForInstance(service.getName(), service.getId());
- try
- {
- client.delete().guaranteed().forPath(path);
- }
- catch ( KeeperException.NoNodeException ignore )
- {
- // ignore
+ ServiceInstance<T> service = holder.service.get();
+ String path = pathForInstance(service.getName(), service.getId());
+ try
+ {
+ client.delete().guaranteed().forPath(path);
+ }
+ catch ( KeeperException.NoNodeException ignore )
+ {
+ // ignore
+ }
}
}
}
[04/12] curator git commit: CLEAN_THRESHOLD_MS can't be static due to testing
Posted by ra...@apache.org.
CLEAN_THRESHOLD_MS can't be static due to testing
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/fa0c9da0
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/fa0c9da0
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/fa0c9da0
Branch: refs/heads/master
Commit: fa0c9da0b022eedc0b0608062433244ad1b3fde5
Parents: f489dfe
Author: randgalt <ra...@apache.org>
Authored: Tue Apr 21 15:23:42 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Apr 21 15:23:42 2015 -0500
----------------------------------------------------------------------
.../apache/curator/x/discovery/details/ServiceDiscoveryImpl.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/fa0c9da0/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index f53c7ce..80b012e 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -89,7 +89,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
}
};
- private static final int CLEAN_THRESHOLD_MS = Integer.getInteger("curator-discovery-clean-threshold-ms", (int)TimeUnit.MINUTES.toMillis(5));
+ private final int CLEAN_THRESHOLD_MS = Integer.getInteger("curator-discovery-clean-threshold-ms", (int)TimeUnit.MINUTES.toMillis(5));
private enum State
{
[12/12] curator git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/curator
Posted by ra...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/curator
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/06af6ff1
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/06af6ff1
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/06af6ff1
Branch: refs/heads/master
Commit: 06af6ff1cc7f4f4de89571dfd7024efe7fd03d38
Parents: 8b250cc 6e16d0d
Author: randgalt <ra...@apache.org>
Authored: Mon Apr 27 17:09:46 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Apr 27 17:09:46 2015 -0500
----------------------------------------------------------------------
.../recipes/nodes/PersistentEphemeralNode.java | 52 +++++++++++--
.../nodes/TestPersistentEphemeralNode.java | 82 +++++++++++++++++++-
2 files changed, 122 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
[07/12] curator git commit: removed some unneeded APIs
Posted by ra...@apache.org.
removed some unneeded APIs
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/16650843
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/16650843
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/16650843
Branch: refs/heads/master
Commit: 1665084376546b518965294aab2e32e2ad24d6ac
Parents: d6a51f4
Author: randgalt <ra...@apache.org>
Authored: Tue Apr 21 17:49:53 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Apr 21 17:49:53 2015 -0500
----------------------------------------------------------------------
.../curator/x/discovery/details/Holder.java | 42 +-------------------
.../discovery/details/ServiceDiscoveryImpl.java | 5 +--
2 files changed, 4 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/16650843/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java
index 69c7667..c126b29 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java
@@ -19,8 +19,9 @@ class Holder<T>
private long stateChangeMs;
private final ReentrantLock lock = new ReentrantLock();
- Holder(ServiceInstance<T> service)
+ Holder(ServiceInstance<T> service, NodeCache nodeCache)
{
+ cache = nodeCache;
this.service = service;
setState(State.NEW);
}
@@ -64,19 +65,6 @@ class Holder<T>
}
}
- NodeCache getCache()
- {
- lock.lock();
- try
- {
- return cache;
- }
- finally
- {
- lock.unlock();
- }
- }
-
NodeCache getAndClearCache()
{
lock.lock();
@@ -92,32 +80,6 @@ class Holder<T>
}
}
- void setCache(NodeCache cache)
- {
- lock.lock();
- try
- {
- this.cache = cache;
- }
- finally
- {
- lock.unlock();
- }
- }
-
- State getState()
- {
- lock.lock();
- try
- {
- return state;
- }
- finally
- {
- lock.unlock();
- }
- }
-
boolean isRegistered()
{
lock.lock();
http://git-wip-us.apache.org/repos/asf/curator/blob/16650843/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index ec049fd..f15a387 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -185,7 +185,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
holder.getLock().lock();
try
{
- if ( holder.getState() == Holder.State.UNREGISTERED )
+ if ( !holder.isRegistered() )
{
throw new Exception("Service has been unregistered: " + service);
}
@@ -516,10 +516,9 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
private Holder<T> getOrMakeHolder(ServiceInstance<T> instance, NodeCache nodeCache)
{
- Holder<T> newHolder = new Holder<T>(instance);
+ Holder<T> newHolder = new Holder<T>(instance, nodeCache);
Holder<T> oldHolder = services.putIfAbsent(instance.getId(), newHolder);
Holder<T> useHolder = (oldHolder != null) ? oldHolder : newHolder;
- useHolder.setCache(nodeCache);
return useHolder;
}
[11/12] curator git commit: some reformatting
Posted by ra...@apache.org.
some reformatting
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/8b250ccd
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/8b250ccd
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/8b250ccd
Branch: refs/heads/master
Commit: 8b250ccd60d1536a930cf4102b36fa0098ed486a
Parents: 03879d1
Author: randgalt <ra...@apache.org>
Authored: Mon Apr 27 16:07:22 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Apr 27 16:07:22 2015 -0500
----------------------------------------------------------------------
.../discovery/details/ServiceDiscoveryImpl.java | 63 ++++++++++----------
1 file changed, 33 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/8b250ccd/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index 7b2a9ec..7b0bffe 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -195,7 +195,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
Entry<T> entry = services.get(service.getId());
if ( entry == null )
{
- throw new Exception("Service has been unregistered: " + service);
+ throw new Exception("Service not registered: " + service);
}
synchronized(entry)
{
@@ -238,7 +238,8 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@Override
public void unregisterService(ServiceInstance<T> service) throws Exception
{
- internalUnregisterService(services.remove(service.getId()));
+ Entry<T> entry = services.remove(service.getId());
+ internalUnregisterService(entry);
}
/**
@@ -445,42 +446,44 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
private NodeCache makeNodeCache(final ServiceInstance<T> instance)
{
- final NodeCache nodeCache = watchInstances ? new NodeCache(client, pathForInstance(instance.getName(), instance.getId())) : null;
- if ( nodeCache != null )
+ if ( !watchInstances )
{
- try
- {
- nodeCache.start(true);
- }
- catch ( Exception e )
- {
- log.error("Could not start node cache for: " + instance, e);
- }
- NodeCacheListener listener = new NodeCacheListener()
+ return null;
+ }
+
+ final NodeCache nodeCache = new NodeCache(client, pathForInstance(instance.getName(), instance.getId()));
+ try
+ {
+ nodeCache.start(true);
+ }
+ catch ( Exception e )
+ {
+ log.error("Could not start node cache for: " + instance, e);
+ }
+ NodeCacheListener listener = new NodeCacheListener()
+ {
+ @Override
+ public void nodeChanged() throws Exception
{
- @Override
- public void nodeChanged() throws Exception
+ if ( nodeCache.getCurrentData() != null )
{
- if ( nodeCache.getCurrentData() != null )
+ ServiceInstance<T> newInstance = serializer.deserialize(nodeCache.getCurrentData().getData());
+ Entry<T> entry = services.get(newInstance.getId());
+ if ( entry != null )
{
- ServiceInstance<T> newInstance = serializer.deserialize(nodeCache.getCurrentData().getData());
- Entry<T> entry = services.get(newInstance.getId());
- if ( entry != null )
+ synchronized(entry)
{
- synchronized(entry)
- {
- entry.service = newInstance;
- }
+ entry.service = newInstance;
}
}
- else
- {
- log.warn("Instance data has been deleted for: " + instance);
- }
}
- };
- nodeCache.getListenable().addListener(listener);
- }
+ else
+ {
+ log.warn("Instance data has been deleted for: " + instance);
+ }
+ }
+ };
+ nodeCache.getListenable().addListener(listener);
return nodeCache;
}