You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2015/04/28 00:09:55 UTC
[03/12] curator git commit: Concurrent registrations/unregistrations
and connection issues can cause inconsistent state. Change to a model whereby
'unregistering' an instance doesn't remove it from management but changes the
state. Instance will still be
Concurrent registrations/unregistrations and connection issues can cause inconsistent state. Change to a model whereby 'unregistering' an instance doesn't remove it from management but changes the state. The instance will still be managed for a period of time and cleaned up after a reasonable period.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f489dfeb
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f489dfeb
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f489dfeb
Branch: refs/heads/master
Commit: f489dfebeed4ecf004ded37a0f05a0a8a2dc7e6d
Parents: 0178c83
Author: randgalt <ra...@apache.org>
Authored: Tue Apr 21 15:13:26 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Apr 21 15:13:26 2015 -0500
----------------------------------------------------------------------
.../discovery/details/ServiceDiscoveryImpl.java | 190 ++++++---
.../x/discovery/TestServiceDiscovery.java | 368 -----------------
.../discovery/details/TestServiceDiscovery.java | 402 +++++++++++++++++++
3 files changed, 546 insertions(+), 414 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/f489dfeb/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index 824eb75..f53c7ce 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -50,7 +50,10 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
/**
* A mechanism to register and query service instances using ZooKeeper
@@ -61,10 +64,11 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
private final CuratorFramework client;
private final String basePath;
private final InstanceSerializer<T> serializer;
- private final Map<String, ServiceInstance<T>> services = Maps.newConcurrentMap();
- private final Map<String, NodeCache> watchedServices;
+ private final ConcurrentMap<String, Holder<T>> services = Maps.newConcurrentMap();
private final Collection<ServiceCache<T>> caches = Sets.newSetFromMap(Maps.<ServiceCache<T>, Boolean>newConcurrentMap());
private final Collection<ServiceProvider<T>> providers = Sets.newSetFromMap(Maps.<ServiceProvider<T>, Boolean>newConcurrentMap());
+ private final boolean watchInstances;
+ private final AtomicLong lastCleanMs = new AtomicLong(System.currentTimeMillis());
private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
@Override
@@ -85,6 +89,35 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
}
};
+ private static final int CLEAN_THRESHOLD_MS = Integer.getInteger("curator-discovery-clean-threshold-ms", (int)TimeUnit.MINUTES.toMillis(5));
+
+ private enum State
+ {
+ NEW,
+ REGISTERED,
+ UNREGISTERED
+ }
+
+ private static class Holder<T>
+ {
+ private final AtomicReference<ServiceInstance<T>> service = new AtomicReference<ServiceInstance<T>>();
+ private final AtomicReference<NodeCache> cache = new AtomicReference<NodeCache>();
+ private final AtomicReference<State> state = new AtomicReference<State>();
+ private final AtomicLong stateChangeMs = new AtomicLong();
+
+ public Holder(ServiceInstance<T> instance)
+ {
+ service.set(instance);
+ setState(State.NEW);
+ }
+
+ public void setState(State state)
+ {
+ this.state.set(state);
+ stateChangeMs.set(System.currentTimeMillis());
+ }
+ }
+
/**
* @param client the client
* @param basePath base path to store data
@@ -94,10 +127,10 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
*/
public ServiceDiscoveryImpl(CuratorFramework client, String basePath, InstanceSerializer<T> serializer, ServiceInstance<T> thisInstance, boolean watchInstances)
{
+ this.watchInstances = watchInstances;
this.client = Preconditions.checkNotNull(client, "client cannot be null");
this.basePath = Preconditions.checkNotNull(basePath, "basePath cannot be null");
this.serializer = Preconditions.checkNotNull(serializer, "serializer cannot be null");
- watchedServices = watchInstances ? Maps.<String, NodeCache>newConcurrentMap() : null;
if ( thisInstance != null )
{
setService(thisInstance);
@@ -134,26 +167,12 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
{
CloseableUtils.closeQuietly(provider);
}
- if ( watchedServices != null )
- {
- for ( NodeCache nodeCache : watchedServices.values() )
- {
- CloseableUtils.closeQuietly(nodeCache);
- }
- }
- Iterator<ServiceInstance<T>> it = services.values().iterator();
- while ( it.hasNext() )
+ for ( Holder<T> holder : services.values() )
{
- // Should not use unregisterService because of potential ConcurrentModificationException
- // so we in-line the bulk of the method here
- ServiceInstance<T> service = it.next();
- String path = pathForInstance(service.getName(), service.getId());
- boolean doRemove = true;
-
try
{
- client.delete().forPath(path);
+ internalUnregisterService(holder);
}
catch ( KeeperException.NoNodeException ignore )
{
@@ -161,13 +180,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
}
catch ( Exception e )
{
- doRemove = false;
- log.error("Could not unregister instance: " + service.getName(), e);
- }
-
- if ( doRemove )
- {
- it.remove();
+ log.error("Could not unregister instance: " + holder.service.get().getName(), e);
}
}
@@ -183,6 +196,8 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@Override
public void registerService(ServiceInstance<T> service) throws Exception
{
+ clean();
+
setService(service);
internalRegisterService(service);
}
@@ -190,10 +205,18 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@Override
public void updateService(ServiceInstance<T> service) throws Exception
{
+ clean();
+
+ Holder<T> holder = getOrMakeHolder(service, null);
+ if ( holder.state.get() == State.UNREGISTERED )
+ {
+ throw new Exception("Service has been unregistered: " + service);
+ }
+
+ holder.service.set(service);
byte[] bytes = serializer.serialize(service);
String path = pathForInstance(service.getName(), service.getId());
client.setData().forPath(path, bytes);
- services.put(service.getId(), service);
}
@VisibleForTesting
@@ -228,17 +251,9 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@Override
public void unregisterService(ServiceInstance<T> service) throws Exception
{
- services.remove(service.getId());
+ clean();
- String path = pathForInstance(service.getName(), service.getId());
- try
- {
- client.delete().guaranteed().forPath(path);
- }
- catch ( KeeperException.NoNodeException ignore )
- {
- // ignore
- }
+ internalUnregisterService(getOrMakeHolder(service, null));
}
/**
@@ -249,6 +264,8 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@Override
public ServiceProviderBuilder<T> serviceProviderBuilder()
{
+ clean();
+
return new ServiceProviderBuilderImpl<T>(this)
.providerStrategy(new RoundRobinStrategy<T>())
.threadFactory(ThreadUtils.newThreadFactory("ServiceProvider"));
@@ -262,6 +279,8 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@Override
public ServiceCacheBuilder<T> serviceCacheBuilder()
{
+ clean();
+
return new ServiceCacheBuilderImpl<T>(this)
.threadFactory(ThreadUtils.newThreadFactory("ServiceCache"));
}
@@ -275,6 +294,8 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@Override
public Collection<String> queryForNames() throws Exception
{
+ clean();
+
List<String> names = client.getChildren().forPath(basePath);
return ImmutableList.copyOf(names);
}
@@ -303,6 +324,8 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@Override
public ServiceInstance<T> queryForInstance(String name, String id) throws Exception
{
+ clean();
+
String path = pathForInstance(name, id);
try
{
@@ -338,6 +361,8 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
CuratorFramework getClient()
{
+ clean();
+
return client;
}
@@ -353,6 +378,8 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
List<ServiceInstance<T>> queryForInstances(String name, Watcher watcher) throws Exception
{
+ clean();
+
ImmutableList.Builder<ServiceInstance<T>> builder = ImmutableList.builder();
String path = pathForName(name);
List<String> instanceIds;
@@ -384,6 +411,12 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
return builder.build();
}
+ @VisibleForTesting
+ int debugServicesQty()
+ {
+ return services.size();
+ }
+
private List<String> getChildrenWatched(String path, Watcher watcher, boolean recurse) throws Exception
{
List<String> instanceIds;
@@ -422,23 +455,29 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@VisibleForTesting
ServiceInstance<T> getRegisteredService(String id)
{
- return services.get(id);
+ Holder<T> holder = services.get(id);
+ return ((holder != null) && (holder.state.get() == State.REGISTERED)) ? holder.service.get() : null;
}
private void reRegisterServices() throws Exception
{
- for ( ServiceInstance<T> service : services.values() )
+ for ( Holder<T> service : services.values() )
{
- internalRegisterService(service);
+ if ( service.state.get() == State.REGISTERED )
+ {
+ internalRegisterService(service.service.get());
+ }
}
}
private void setService(final ServiceInstance<T> instance)
{
- services.put(instance.getId(), instance);
- if ( watchedServices != null )
+ final NodeCache nodeCache = watchInstances ? new NodeCache(client, pathForInstance(instance.getName(), instance.getId())) : null;
+ Holder<T> holder = getOrMakeHolder(instance, nodeCache);
+ holder.setState(State.REGISTERED);
+
+ if ( nodeCache != null )
{
- final NodeCache nodeCache = new NodeCache(client, pathForInstance(instance.getName(), instance.getId()));
try
{
nodeCache.start(true);
@@ -455,7 +494,11 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
if ( nodeCache.getCurrentData() != null )
{
ServiceInstance<T> newInstance = serializer.deserialize(nodeCache.getCurrentData().getData());
- services.put(newInstance.getId(), newInstance);
+ Holder<T> holder = services.get(newInstance.getId());
+ if ( holder != null )
+ {
+ holder.service.set(newInstance);
+ }
}
else
{
@@ -464,7 +507,62 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
}
};
nodeCache.getListenable().addListener(listener);
- watchedServices.put(instance.getId(), nodeCache);
+ }
+ }
+
+ private Holder<T> getOrMakeHolder(ServiceInstance<T> instance, NodeCache nodeCache)
+ {
+ Holder<T> newHolder = new Holder<T>(instance);
+ Holder<T> oldHolder = services.putIfAbsent(instance.getId(), newHolder);
+ Holder<T> useHolder = (oldHolder != null) ? oldHolder : newHolder;
+ useHolder.cache.set(nodeCache);
+ return useHolder;
+ }
+
+ private void clean()
+ {
+ long localLastCleanMs = lastCleanMs.get();
+ long now = System.currentTimeMillis();
+ long elpased = now - localLastCleanMs;
+ if ( (elpased >= CLEAN_THRESHOLD_MS) && lastCleanMs.compareAndSet(localLastCleanMs, now + 1) )
+ {
+ Iterator<Holder<T>> iterator = services.values().iterator();
+ while ( iterator.hasNext() )
+ {
+ Holder<T> holder = iterator.next();
+ if ( holder.state.get() == State.UNREGISTERED )
+ {
+ long elapsed = System.currentTimeMillis() - holder.stateChangeMs.get();
+ if ( elapsed >= CLEAN_THRESHOLD_MS )
+ {
+ iterator.remove();
+ }
+ }
+ }
+ }
+ }
+
+ private void internalUnregisterService(Holder<T> holder) throws Exception
+ {
+ if ( holder != null )
+ {
+ holder.setState(State.UNREGISTERED);
+ NodeCache cache = holder.cache.getAndSet(null);
+ if ( cache != null )
+ {
+ CloseableUtils.closeQuietly(cache);
+ }
+
+ ServiceInstance<T> service = holder.service.get();
+ String path = pathForInstance(service.getName(), service.getId());
+ try
+ {
+ client.delete().guaranteed().forPath(path);
+ }
+ catch ( KeeperException.NoNodeException ignore )
+ {
+ // ignore
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f489dfeb/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
deleted file mode 100644
index 3b45494..0000000
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.curator.x.discovery;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.test.KillSession;
-import org.apache.curator.test.Timing;
-import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.x.discovery.details.InstanceSerializer;
-import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
-import org.apache.curator.x.discovery.details.ServiceDiscoveryImpl;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-import java.io.Closeable;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-public class TestServiceDiscovery extends BaseClassForTests
-{
- private static final Comparator<ServiceInstance<Void>> comparator = new Comparator<ServiceInstance<Void>>()
- {
- @Override
- public int compare(ServiceInstance<Void> o1, ServiceInstance<Void> o2)
- {
- return o1.getId().compareTo(o2.getId());
- }
- };
-
- @Test
- public void testCrashedServerMultiInstances() throws Exception
- {
- List<Closeable> closeables = Lists.newArrayList();
- try
- {
- Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- closeables.add(client);
- client.start();
-
- final Semaphore semaphore = new Semaphore(0);
- ServiceInstance<String> instance1 = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
- ServiceInstance<String> instance2 = ServiceInstance.<String>builder().payload("thing").name("test").port(10065).build();
- ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance1, false)
- {
- @Override
- protected void internalRegisterService(ServiceInstance<String> service) throws Exception
- {
- super.internalRegisterService(service);
- semaphore.release();
- }
- };
- closeables.add(discovery);
- discovery.start();
- discovery.registerService(instance2);
-
- timing.acquireSemaphore(semaphore, 2);
- Assert.assertEquals(discovery.queryForInstances("test").size(), 2);
-
- KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
- server.stop();
-
- server.restart();
- closeables.add(server);
-
- timing.acquireSemaphore(semaphore, 2);
- Assert.assertEquals(discovery.queryForInstances("test").size(), 2);
- }
- finally
- {
- for ( Closeable c : closeables )
- {
- CloseableUtils.closeQuietly(c);
- }
- }
- }
-
- @Test
- public void testCrashedServer() throws Exception
- {
- List<Closeable> closeables = Lists.newArrayList();
- try
- {
- Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- closeables.add(client);
- client.start();
-
- final Semaphore semaphore = new Semaphore(0);
- ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
- ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance, false)
- {
- @Override
- protected void internalRegisterService(ServiceInstance<String> service) throws Exception
- {
- super.internalRegisterService(service);
- semaphore.release();
- }
- };
- closeables.add(discovery);
- discovery.start();
-
- timing.acquireSemaphore(semaphore);
- Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
-
- KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
- server.stop();
-
- server.restart();
- closeables.add(server);
-
- timing.acquireSemaphore(semaphore);
- Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
- }
- finally
- {
- for ( Closeable c : closeables )
- {
- CloseableUtils.closeQuietly(c);
- }
- }
- }
-
- @Test
- public void testCrashedInstance() throws Exception
- {
- List<Closeable> closeables = Lists.newArrayList();
- try
- {
- Timing timing = new Timing();
-
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- closeables.add(client);
- client.start();
-
- ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
- ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance, false);
- closeables.add(discovery);
- discovery.start();
-
- Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
-
- KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
- Thread.sleep(timing.multiple(1.5).session());
-
- Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
- }
- finally
- {
- Collections.reverse(closeables);
- for ( Closeable c : closeables )
- {
- CloseableUtils.closeQuietly(c);
- }
- }
- }
-
- @Test
- public void testMultipleInstances() throws Exception
- {
- final String SERVICE_ONE = "one";
- final String SERVICE_TWO = "two";
-
- List<Closeable> closeables = Lists.newArrayList();
- try
- {
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- closeables.add(client);
- client.start();
-
- ServiceInstance<Void> s1_i1 = ServiceInstance.<Void>builder().name(SERVICE_ONE).build();
- ServiceInstance<Void> s1_i2 = ServiceInstance.<Void>builder().name(SERVICE_ONE).build();
- ServiceInstance<Void> s2_i1 = ServiceInstance.<Void>builder().name(SERVICE_TWO).build();
- ServiceInstance<Void> s2_i2 = ServiceInstance.<Void>builder().name(SERVICE_TWO).build();
-
- ServiceDiscovery<Void> discovery = ServiceDiscoveryBuilder.builder(Void.class).client(client).basePath("/test").build();
- closeables.add(discovery);
- discovery.start();
-
- discovery.registerService(s1_i1);
- discovery.registerService(s1_i2);
- discovery.registerService(s2_i1);
- discovery.registerService(s2_i2);
-
- Assert.assertEquals(Sets.newHashSet(discovery.queryForNames()), Sets.newHashSet(SERVICE_ONE, SERVICE_TWO));
-
- List<ServiceInstance<Void>> list = Lists.newArrayList();
- list.add(s1_i1);
- list.add(s1_i2);
- Collections.sort(list, comparator);
- List<ServiceInstance<Void>> queriedInstances = Lists.newArrayList(discovery.queryForInstances(SERVICE_ONE));
- Collections.sort(queriedInstances, comparator);
- Assert.assertEquals(queriedInstances, list, String.format("Not equal l: %s - d: %s", list, queriedInstances));
-
- list.clear();
-
- list.add(s2_i1);
- list.add(s2_i2);
- Collections.sort(list, comparator);
- queriedInstances = Lists.newArrayList(discovery.queryForInstances(SERVICE_TWO));
- Collections.sort(queriedInstances, comparator);
- Assert.assertEquals(queriedInstances, list, String.format("Not equal 2: %s - d: %s", list, queriedInstances));
- }
- finally
- {
- Collections.reverse(closeables);
- for ( Closeable c : closeables )
- {
- CloseableUtils.closeQuietly(c);
- }
- }
- }
-
- @Test
- public void testBasic() throws Exception
- {
- List<Closeable> closeables = Lists.newArrayList();
- try
- {
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- closeables.add(client);
- client.start();
-
- ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
- ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
- closeables.add(discovery);
- discovery.start();
-
- Assert.assertEquals(discovery.queryForNames(), Arrays.asList("test"));
-
- List<ServiceInstance<String>> list = Lists.newArrayList();
- list.add(instance);
- Assert.assertEquals(discovery.queryForInstances("test"), list);
- }
- finally
- {
- Collections.reverse(closeables);
- for ( Closeable c : closeables )
- {
- CloseableUtils.closeQuietly(c);
- }
- }
- }
-
- @Test
- public void testNoServerOnStart() throws Exception
- {
- server.stop();
- List<Closeable> closeables = Lists.newArrayList();
- try
- {
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- closeables.add(client);
- client.start();
-
- ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
- ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
- closeables.add(discovery);
- discovery.start();
-
- server.restart();
- Assert.assertEquals(discovery.queryForNames(), Arrays.asList("test"));
-
- List<ServiceInstance<String>> list = Lists.newArrayList();
- list.add(instance);
- Assert.assertEquals(discovery.queryForInstances("test"), list);
- }
- finally
- {
- Collections.reverse(closeables);
- for ( Closeable c : closeables )
- {
- CloseableUtils.closeQuietly(c);
- }
- }
- }
-
- // CURATOR-164
- @Test
- public void testUnregisterService() throws Exception
- {
- final String name = "name";
-
- final CountDownLatch restartLatch = new CountDownLatch(1);
- List<Closeable> closeables = Lists.newArrayList();
-
- InstanceSerializer<String> slowSerializer = new JsonInstanceSerializer<String>(String.class)
- {
- private boolean first = true;
-
- @Override
- public byte[] serialize(ServiceInstance<String> instance) throws Exception
- {
- if ( first )
- {
- System.out.println("Serializer first registration.");
- first = false;
- }
- else
- {
- System.out.println("Waiting for reconnect to finish.");
- // Simulate the serialize method being slow.
- // This could just be a timed wait, but that's kind of non-deterministic.
- restartLatch.await();
- }
- return super.serialize(instance);
- }
- };
-
- try
- {
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- closeables.add(client);
- client.start();
-
- ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name(name).port(10064).build();
- ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).serializer(slowSerializer).watchInstances(true).build();
- closeables.add(discovery);
- discovery.start();
-
- Assert.assertFalse(discovery.queryForInstances(name).isEmpty(), "Service should start registered.");
-
- server.stop();
- server.restart();
-
- discovery.unregisterService(instance);
- restartLatch.countDown();
-
- new Timing().sleepABit(); // Wait for the rest of registration to finish.
-
- Assert.assertTrue(discovery.queryForInstances(name).isEmpty(), "Service should have unregistered.");
- }
- finally
- {
- Collections.reverse(closeables);
- for ( Closeable c : closeables )
- {
- CloseableUtils.closeQuietly(c);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/f489dfeb/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
new file mode 100644
index 0000000..2808c5c
--- /dev/null
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
@@ -0,0 +1,402 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.discovery.details;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.KillSession;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.x.discovery.ServiceDiscovery;
+import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
+import org.apache.curator.x.discovery.ServiceInstance;
+import org.apache.curator.x.discovery.details.InstanceSerializer;
+import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
+import org.apache.curator.x.discovery.details.ServiceDiscoveryImpl;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+
+public class TestServiceDiscovery extends BaseClassForTests
+{
+    /** Orders service instances by comparing their ids. */
+    private static final Comparator<ServiceInstance<Void>> comparator = new Comparator<ServiceInstance<Void>>()
+    {
+        @Override
+        public int compare(ServiceInstance<Void> left, ServiceInstance<Void> right)
+        {
+            String leftId = left.getId();
+            String rightId = right.getId();
+            return leftId.compareTo(rightId);
+        }
+    };
+
+    /**
+     * Registers two instances of the "test" service, kills the client's session,
+     * restarts the server, and verifies that both instances are registered again.
+     * <p>
+     * The discovery under test is subclassed so that each completed call to
+     * {@code internalRegisterService} releases one semaphore permit, which lets the
+     * test wait for registrations instead of sleeping.
+     *
+     * @throws Exception on any unexpected failure
+     */
+    @Test
+    public void testCrashedServerMultiInstances() throws Exception
+    {
+        List<Closeable> closeables = Lists.newArrayList();
+        try
+        {
+            Timing timing = new Timing();
+            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+            closeables.add(client);
+            client.start();
+
+            final Semaphore semaphore = new Semaphore(0);
+            ServiceInstance<String> instance1 = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+            ServiceInstance<String> instance2 = ServiceInstance.<String>builder().payload("thing").name("test").port(10065).build();
+            ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance1, false)
+            {
+                @Override
+                protected void internalRegisterService(ServiceInstance<String> service) throws Exception
+                {
+                    super.internalRegisterService(service);
+                    semaphore.release();
+                }
+            };
+            closeables.add(discovery);
+            discovery.start();
+            discovery.registerService(instance2);
+
+            // One permit per registered instance; fail here rather than carry on after a timeout.
+            Assert.assertTrue(timing.acquireSemaphore(semaphore, 2), "Timed out waiting for the initial registrations");
+            Assert.assertEquals(discovery.queryForInstances("test").size(), 2);
+
+            KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
+            server.stop();
+
+            server.restart();
+            closeables.add(server);
+
+            // Wait until both instances have been registered again after the restart.
+            Assert.assertTrue(timing.acquireSemaphore(semaphore, 2), "Timed out waiting for re-registration after server restart");
+            Assert.assertEquals(discovery.queryForInstances("test").size(), 2);
+        }
+        finally
+        {
+            // NOTE(review): resources are closed in insertion order (client, discovery, server) - confirm this order is intended.
+            for ( Closeable c : closeables )
+            {
+                CloseableUtils.closeQuietly(c);
+            }
+        }
+    }
+
+    /**
+     * Registers a single instance of the "test" service, kills the client's session,
+     * restarts the server, and verifies that the instance is registered again.
+     * <p>
+     * The discovery under test is subclassed so that each completed call to
+     * {@code internalRegisterService} releases one semaphore permit the test waits on.
+     *
+     * @throws Exception on any unexpected failure
+     */
+    @Test
+    public void testCrashedServer() throws Exception
+    {
+        List<Closeable> closeables = Lists.newArrayList();
+        try
+        {
+            Timing timing = new Timing();
+            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+            closeables.add(client);
+            client.start();
+
+            final Semaphore semaphore = new Semaphore(0);
+            ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+            ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance, false)
+            {
+                @Override
+                protected void internalRegisterService(ServiceInstance<String> service) throws Exception
+                {
+                    super.internalRegisterService(service);
+                    semaphore.release();
+                }
+            };
+            closeables.add(discovery);
+            discovery.start();
+
+            // Fail here rather than carry on if the registration never completes.
+            Assert.assertTrue(timing.acquireSemaphore(semaphore), "Timed out waiting for the initial registration");
+            Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
+
+            KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
+            server.stop();
+
+            server.restart();
+            closeables.add(server);
+
+            // Wait until the instance has been registered again after the restart.
+            Assert.assertTrue(timing.acquireSemaphore(semaphore), "Timed out waiting for re-registration after server restart");
+            Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
+        }
+        finally
+        {
+            // NOTE(review): resources are closed in insertion order (client, discovery, server) - confirm this order is intended.
+            for ( Closeable c : closeables )
+            {
+                CloseableUtils.closeQuietly(c);
+            }
+        }
+    }
+
+    /**
+     * Verifies that a registered instance is still returned by
+     * {@code queryForInstances} after the client's session has been killed
+     * and one and a half session timeouts have elapsed.
+     *
+     * @throws Exception on any unexpected failure
+     */
+    @Test
+    public void testCrashedInstance() throws Exception
+    {
+        List<Closeable> resources = Lists.newArrayList();
+        try
+        {
+            Timing timing = new Timing();
+
+            CuratorFramework curator = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+            resources.add(curator);
+            curator.start();
+
+            ServiceInstance<String> registered = ServiceInstance.<String>builder()
+                .payload("thing")
+                .name("test")
+                .port(10064)
+                .build();
+            JsonInstanceSerializer<String> serializer = new JsonInstanceSerializer<String>(String.class);
+            ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(curator, "/test", serializer, registered, false);
+            resources.add(discovery);
+            discovery.start();
+
+            Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
+
+            KillSession.kill(curator.getZookeeperClient().getZooKeeper(), server.getConnectString());
+            long sessionWaitMs = timing.multiple(1.5).session();
+            Thread.sleep(sessionWaitMs);
+
+            Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
+        }
+        finally
+        {
+            // Close in the opposite order of creation.
+            for ( int i = resources.size() - 1; i >= 0; --i )
+            {
+                CloseableUtils.closeQuietly(resources.get(i));
+            }
+        }
+    }
+
+    /**
+     * Registers two instances for each of two service names and verifies that
+     * {@code queryForNames} returns both names and that {@code queryForInstances}
+     * returns exactly the instances registered under each name.
+     *
+     * @throws Exception on any unexpected failure
+     */
+    @Test
+    public void testMultipleInstances() throws Exception
+    {
+        final String serviceOne = "one";
+        final String serviceTwo = "two";
+
+        List<Closeable> resources = Lists.newArrayList();
+        try
+        {
+            CuratorFramework curator = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+            resources.add(curator);
+            curator.start();
+
+            ServiceInstance<Void> oneFirst = ServiceInstance.<Void>builder().name(serviceOne).build();
+            ServiceInstance<Void> oneSecond = ServiceInstance.<Void>builder().name(serviceOne).build();
+            ServiceInstance<Void> twoFirst = ServiceInstance.<Void>builder().name(serviceTwo).build();
+            ServiceInstance<Void> twoSecond = ServiceInstance.<Void>builder().name(serviceTwo).build();
+
+            ServiceDiscovery<Void> discovery = ServiceDiscoveryBuilder.builder(Void.class)
+                .client(curator)
+                .basePath("/test")
+                .build();
+            resources.add(discovery);
+            discovery.start();
+
+            discovery.registerService(oneFirst);
+            discovery.registerService(oneSecond);
+            discovery.registerService(twoFirst);
+            discovery.registerService(twoSecond);
+
+            Assert.assertEquals(Sets.newHashSet(discovery.queryForNames()), Sets.newHashSet(serviceOne, serviceTwo));
+
+            // Both sides are sorted by id so they can be compared element by element.
+            List<ServiceInstance<Void>> expectedOne = Lists.newArrayList();
+            expectedOne.add(oneFirst);
+            expectedOne.add(oneSecond);
+            Collections.sort(expectedOne, comparator);
+            List<ServiceInstance<Void>> actualOne = Lists.newArrayList(discovery.queryForInstances(serviceOne));
+            Collections.sort(actualOne, comparator);
+            String mismatchOne = String.format("Not equal l: %s - d: %s", expectedOne, actualOne);
+            Assert.assertEquals(actualOne, expectedOne, mismatchOne);
+
+            List<ServiceInstance<Void>> expectedTwo = Lists.newArrayList();
+            expectedTwo.add(twoFirst);
+            expectedTwo.add(twoSecond);
+            Collections.sort(expectedTwo, comparator);
+            List<ServiceInstance<Void>> actualTwo = Lists.newArrayList(discovery.queryForInstances(serviceTwo));
+            Collections.sort(actualTwo, comparator);
+            String mismatchTwo = String.format("Not equal 2: %s - d: %s", expectedTwo, actualTwo);
+            Assert.assertEquals(actualTwo, expectedTwo, mismatchTwo);
+        }
+        finally
+        {
+            // Close in the opposite order of creation.
+            for ( int i = resources.size() - 1; i >= 0; --i )
+            {
+                CloseableUtils.closeQuietly(resources.get(i));
+            }
+        }
+    }
+
+    /**
+     * Starts a discovery with a single "test" instance and verifies that both the
+     * service name and the instance itself can be queried back.
+     *
+     * @throws Exception on any unexpected failure
+     */
+    @Test
+    public void testBasic() throws Exception
+    {
+        List<Closeable> resources = Lists.newArrayList();
+        try
+        {
+            CuratorFramework curator = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+            resources.add(curator);
+            curator.start();
+
+            ServiceInstance<String> registered = ServiceInstance.<String>builder()
+                .payload("thing")
+                .name("test")
+                .port(10064)
+                .build();
+            ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class)
+                .basePath("/test")
+                .client(curator)
+                .thisInstance(registered)
+                .build();
+            resources.add(discovery);
+            discovery.start();
+
+            Assert.assertEquals(discovery.queryForNames(), Collections.singletonList("test"));
+
+            List<ServiceInstance<String>> expected = Collections.singletonList(registered);
+            Assert.assertEquals(discovery.queryForInstances("test"), expected);
+        }
+        finally
+        {
+            // Close in the opposite order of creation.
+            for ( int i = resources.size() - 1; i >= 0; --i )
+            {
+                CloseableUtils.closeQuietly(resources.get(i));
+            }
+        }
+    }
+
+    /**
+     * Starts the client and the discovery while the server is stopped, restarts
+     * the server, and verifies that the "test" service name and its instance can
+     * then be queried.
+     *
+     * @throws Exception on any unexpected failure
+     */
+    @Test
+    public void testNoServerOnStart() throws Exception
+    {
+        server.stop();
+        List<Closeable> resources = Lists.newArrayList();
+        try
+        {
+            CuratorFramework curator = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+            resources.add(curator);
+            curator.start();
+
+            ServiceInstance<String> registered = ServiceInstance.<String>builder()
+                .payload("thing")
+                .name("test")
+                .port(10064)
+                .build();
+            ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class)
+                .basePath("/test")
+                .client(curator)
+                .thisInstance(registered)
+                .build();
+            resources.add(discovery);
+            discovery.start();
+
+            server.restart();
+            Assert.assertEquals(discovery.queryForNames(), Collections.singletonList("test"));
+
+            List<ServiceInstance<String>> expected = Collections.singletonList(registered);
+            Assert.assertEquals(discovery.queryForInstances("test"), expected);
+        }
+        finally
+        {
+            // Close in the opposite order of creation.
+            for ( int i = resources.size() - 1; i >= 0; --i )
+            {
+                CloseableUtils.closeQuietly(resources.get(i));
+            }
+        }
+    }
+
+    /**
+     * Regression test for CURATOR-164: unregisters an instance after a server
+     * restart while a second {@code serialize} call is held back by a latch,
+     * then verifies the instance is no longer returned by {@code queryForInstances}.
+     *
+     * @throws Exception on any unexpected failure
+     */
+    @Test
+    public void testUnregisterService() throws Exception
+    {
+        final String name = "name";
+
+        final CountDownLatch restartLatch = new CountDownLatch(1);
+        List<Closeable> closeables = Lists.newArrayList();
+
+        InstanceSerializer<String> slowSerializer = new JsonInstanceSerializer<String>(String.class)
+        {
+            // volatile: a later serialize() call blocks on a latch that the test thread
+            // releases, so it presumably runs on a different thread than the first call.
+            private volatile boolean first = true;
+
+            @Override
+            public byte[] serialize(ServiceInstance<String> instance) throws Exception
+            {
+                if ( first )
+                {
+                    System.out.println("Serializer first registration.");
+                    first = false;
+                }
+                else
+                {
+                    System.out.println("Waiting for reconnect to finish.");
+                    // Simulate the serialize method being slow.
+                    // This could just be a timed wait, but that's kind of non-deterministic.
+                    restartLatch.await();
+                }
+                return super.serialize(instance);
+            }
+        };
+
+        try
+        {
+            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+            closeables.add(client);
+            client.start();
+
+            ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name(name).port(10064).build();
+            ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).serializer(slowSerializer).watchInstances(true).build();
+            closeables.add(discovery);
+            discovery.start();
+
+            Assert.assertFalse(discovery.queryForInstances(name).isEmpty(), "Service should start registered.");
+
+            server.stop();
+            server.restart();
+
+            // Unregister first, then let any serialize() call blocked on the latch proceed.
+            discovery.unregisterService(instance);
+            restartLatch.countDown();
+
+            new Timing().sleepABit(); // Wait for the rest of registration to finish.
+
+            Assert.assertTrue(discovery.queryForInstances(name).isEmpty(), "Service should have unregistered.");
+        }
+        finally
+        {
+            Collections.reverse(closeables);
+            for ( Closeable c : closeables )
+            {
+                CloseableUtils.closeQuietly(c);
+            }
+        }
+    }
+
+    /**
+     * Sets the {@code curator-discovery-clean-threshold-ms} system property to 10,
+     * unregisters the only instance, waits 100 ms, and expects
+     * {@code debugServicesQty()} to report 0 after a {@code queryForNames()} call.
+     * <p>
+     * The system property is restored to its previous value (or cleared if it
+     * was not set) when the test finishes.
+     *
+     * @throws Exception on any unexpected failure
+     */
+    @Test
+    public void testCleaning() throws Exception
+    {
+        final String thresholdProperty = "curator-discovery-clean-threshold-ms";
+        // setProperty returns the previous value, or null if the property was not set.
+        final String previousThreshold = System.setProperty(thresholdProperty, "10");
+        List<Closeable> closeables = Lists.newArrayList();
+        try
+        {
+            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+            closeables.add(client);
+            client.start();
+
+            ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+            ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
+            closeables.add(discovery);
+            discovery.start();
+            discovery.unregisterService(instance);
+
+            Thread.sleep(100);
+
+            discovery.queryForNames(); // causes a clean
+            Assert.assertEquals(((ServiceDiscoveryImpl<String>)discovery).debugServicesQty(), 0);
+        }
+        finally
+        {
+            // Put the property back the way it was so other tests in the same JVM are unaffected.
+            if ( previousThreshold != null )
+            {
+                System.setProperty(thresholdProperty, previousThreshold);
+            }
+            else
+            {
+                System.clearProperty(thresholdProperty);
+            }
+            Collections.reverse(closeables);
+            for ( Closeable c : closeables )
+            {
+                CloseableUtils.closeQuietly(c);
+            }
+        }
+    }
+}