You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2015/01/04 23:03:41 UTC
[2/2] curator git commit: Added a way to watch registered services so
that tools such as admin consoles can change values and have SD recognize the
changes
Added a way to watch registered services so that tools such as admin consoles can change values and have SD recognize the changes
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/37dc4478
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/37dc4478
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/37dc4478
Branch: refs/heads/CURATOR-176
Commit: 37dc4478597c6db0dcab83b636318b51bb389c58
Parents: 742e092
Author: randgalt <ra...@apache.org>
Authored: Sun Jan 4 17:03:29 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun Jan 4 17:03:29 2015 -0500
----------------------------------------------------------------------
.../x/discovery/ServiceDiscoveryBuilder.java | 45 +++++++----
.../discovery/details/ServiceDiscoveryImpl.java | 59 +++++++++++++--
.../x/discovery/TestServiceDiscovery.java | 79 ++++++++++----------
.../discovery/details/TestWatchedInstances.java | 76 +++++++++++++++++++
4 files changed, 200 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/37dc4478/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceDiscoveryBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceDiscoveryBuilder.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceDiscoveryBuilder.java
index 2b972ca..e25fc67 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceDiscoveryBuilder.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceDiscoveryBuilder.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.x.discovery;
import org.apache.curator.framework.CuratorFramework;
@@ -25,20 +26,21 @@ import org.apache.curator.x.discovery.details.ServiceDiscoveryImpl;
public class ServiceDiscoveryBuilder<T>
{
- private CuratorFramework client;
- private String basePath;
- private InstanceSerializer<T> serializer;
- private ServiceInstance<T> thisInstance;
- private Class<T> payloadClass;
+ private CuratorFramework client;
+ private String basePath;
+ private InstanceSerializer<T> serializer;
+ private ServiceInstance<T> thisInstance;
+ private Class<T> payloadClass;
+ private boolean watchInstances = false;
/**
* Return a new builder.
*
* @param payloadClass the class of the payload of your service instance (you can use {@link Void}
- * if your instances don't need a payload)
+ * if your instances don't need a payload)
* @return new builder
*/
- public static<T> ServiceDiscoveryBuilder<T> builder(Class<T> payloadClass)
+ public static <T> ServiceDiscoveryBuilder<T> builder(Class<T> payloadClass)
{
return new ServiceDiscoveryBuilder<T>(payloadClass);
}
@@ -49,12 +51,13 @@ public class ServiceDiscoveryBuilder<T>
*
* @return new service discovery
*/
- public ServiceDiscovery<T> build()
+ public ServiceDiscovery<T> build()
{
- if ( serializer == null ) {
+ if ( serializer == null )
+ {
serializer(new JsonInstanceSerializer<T>(payloadClass));
}
- return new ServiceDiscoveryImpl<T>(client, basePath, serializer, thisInstance);
+ return new ServiceDiscoveryImpl<T>(client, basePath, serializer, thisInstance, watchInstances);
}
/**
@@ -63,7 +66,7 @@ public class ServiceDiscoveryBuilder<T>
* @param client client
* @return this
*/
- public ServiceDiscoveryBuilder<T> client(CuratorFramework client)
+ public ServiceDiscoveryBuilder<T> client(CuratorFramework client)
{
this.client = client;
return this;
@@ -75,7 +78,7 @@ public class ServiceDiscoveryBuilder<T>
* @param basePath base path
* @return this
*/
- public ServiceDiscoveryBuilder<T> basePath(String basePath)
+ public ServiceDiscoveryBuilder<T> basePath(String basePath)
{
this.basePath = basePath;
return this;
@@ -87,7 +90,7 @@ public class ServiceDiscoveryBuilder<T>
* @param serializer the serializer
* @return this
*/
- public ServiceDiscoveryBuilder<T> serializer(InstanceSerializer<T> serializer)
+ public ServiceDiscoveryBuilder<T> serializer(InstanceSerializer<T> serializer)
{
this.serializer = serializer;
return this;
@@ -99,12 +102,26 @@ public class ServiceDiscoveryBuilder<T>
* @param thisInstance initial instance
* @return this
*/
- public ServiceDiscoveryBuilder<T> thisInstance(ServiceInstance<T> thisInstance)
+ public ServiceDiscoveryBuilder<T> thisInstance(ServiceInstance<T> thisInstance)
{
this.thisInstance = thisInstance;
return this;
}
+ /**
+ * Optional - if true, watches for changes to locally registered instances
+ * (via {@link #thisInstance(ServiceInstance)} or {@link ServiceDiscovery#registerService(ServiceInstance)}).
+ * If the data for instances changes, they are reloaded.
+ *
+ * @param watchInstances true to watch instances
+ * @return this
+ */
+ public ServiceDiscoveryBuilder<T> watchInstances(boolean watchInstances)
+ {
+ this.watchInstances = watchInstances;
+ return this;
+ }
+
ServiceDiscoveryBuilder(Class<T> payloadClass)
{
this.payloadClass = payloadClass;
http://git-wip-us.apache.org/repos/asf/curator/blob/37dc4478/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index a55f678..ca8eabe 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -24,11 +24,14 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-
-import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.curator.x.discovery.ServiceCache;
@@ -41,12 +44,11 @@ import org.apache.curator.x.discovery.ServiceType;
import org.apache.curator.x.discovery.strategies.RoundRobinStrategy;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -64,6 +66,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
private final Map<String, ServiceInstance<T>> services = Maps.newConcurrentMap();
private final Collection<ServiceCache<T>> caches = Sets.newSetFromMap(Maps.<ServiceCache<T>, Boolean>newConcurrentMap());
private final Collection<ServiceProvider<T>> providers = Sets.newSetFromMap(Maps.<ServiceProvider<T>, Boolean>newConcurrentMap());
+ private final boolean watchInstances;
private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
@Override
@@ -89,9 +92,11 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
* @param basePath base path to store data
* @param serializer serializer for instances (e.g. {@link JsonInstanceSerializer})
* @param thisInstance instance that represents the service that is running. The instance will get auto-registered
+ * @param watchInstances if true, watches for changes to locally registered instances
*/
- public ServiceDiscoveryImpl(CuratorFramework client, String basePath, InstanceSerializer<T> serializer, ServiceInstance<T> thisInstance)
+ public ServiceDiscoveryImpl(CuratorFramework client, String basePath, InstanceSerializer<T> serializer, ServiceInstance<T> thisInstance, boolean watchInstances)
{
+ this.watchInstances = watchInstances;
this.client = Preconditions.checkNotNull(client, "client cannot be null");
this.basePath = Preconditions.checkNotNull(basePath, "basePath cannot be null");
this.serializer = Preconditions.checkNotNull(serializer, "serializer cannot be null");
@@ -192,6 +197,10 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
{
CreateMode mode = (service.getServiceType() == ServiceType.DYNAMIC) ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT;
client.create().creatingParentsIfNeeded().withMode(mode).forPath(path, bytes);
+ if ( watchInstances )
+ {
+ resetWatchedInstance(service);
+ }
isDone = true;
}
catch ( KeeperException.NodeExistsException e )
@@ -365,6 +374,37 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
return builder.build();
}
+ private void resetWatchedInstance(final ServiceInstance<T> service) throws Exception
+ {
+ CuratorWatcher watcher = new CuratorWatcher()
+ {
+ @Override
+ public void process(WatchedEvent event) throws Exception
+ {
+ if ( event.getType() == Watcher.Event.EventType.NodeDataChanged )
+ {
+ resetWatchedInstance(service);
+ }
+ }
+ };
+
+ BackgroundCallback callback = new BackgroundCallback()
+ {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ if ( event.getType() == CuratorEventType.GET_DATA )
+ {
+ ServiceInstance<T> newInstance = serializer.deserialize(event.getData());
+ services.put(newInstance.getId(), newInstance);
+ }
+ }
+ };
+
+ String path = pathForInstance(service.getName(), service.getId());
+ client.getData().usingWatcher(watcher).inBackground(callback).forPath(path);
+ }
+
private List<String> getChildrenWatched(String path, Watcher watcher, boolean recurse) throws Exception
{
List<String> instanceIds;
@@ -394,11 +434,18 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
return instanceIds;
}
- private String pathForInstance(String name, String id) throws UnsupportedEncodingException
+ @VisibleForTesting
+ String pathForInstance(String name, String id)
{
return ZKPaths.makePath(pathForName(name), id);
}
+ @VisibleForTesting
+ ServiceInstance<T> getRegisteredService(String id)
+ {
+ return services.get(id);
+ }
+
private void reRegisterServices() throws Exception
{
for ( ServiceInstance<T> service : services.values() )
http://git-wip-us.apache.org/repos/asf/curator/blob/37dc4478/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
index 73de7fc..0465599 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
@@ -16,17 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.x.discovery;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.KillSession;
import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import org.apache.curator.x.discovery.details.ServiceDiscoveryImpl;
import org.testng.Assert;
@@ -40,7 +41,7 @@ import java.util.concurrent.Semaphore;
public class TestServiceDiscovery extends BaseClassForTests
{
- private static final Comparator<ServiceInstance<Void>> comparator = new Comparator<ServiceInstance<Void>>()
+ private static final Comparator<ServiceInstance<Void>> comparator = new Comparator<ServiceInstance<Void>>()
{
@Override
public int compare(ServiceInstance<Void> o1, ServiceInstance<Void> o2)
@@ -50,20 +51,20 @@ public class TestServiceDiscovery extends BaseClassForTests
};
@Test
- public void testCrashedServerMultiInstances() throws Exception
+ public void testCrashedServerMultiInstances() throws Exception
{
- List<Closeable> closeables = Lists.newArrayList();
+ List<Closeable> closeables = Lists.newArrayList();
try
{
- Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
closeables.add(client);
client.start();
- final Semaphore semaphore = new Semaphore(0);
- ServiceInstance<String> instance1 = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
- ServiceInstance<String> instance2 = ServiceInstance.<String>builder().payload("thing").name("test").port(10065).build();
- ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance1)
+ final Semaphore semaphore = new Semaphore(0);
+ ServiceInstance<String> instance1 = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+ ServiceInstance<String> instance2 = ServiceInstance.<String>builder().payload("thing").name("test").port(10065).build();
+ ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance1, false)
{
@Override
protected void internalRegisterService(ServiceInstance<String> service) throws Exception
@@ -98,19 +99,19 @@ public class TestServiceDiscovery extends BaseClassForTests
}
@Test
- public void testCrashedServer() throws Exception
+ public void testCrashedServer() throws Exception
{
- List<Closeable> closeables = Lists.newArrayList();
+ List<Closeable> closeables = Lists.newArrayList();
try
{
- Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
closeables.add(client);
client.start();
- final Semaphore semaphore = new Semaphore(0);
- ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
- ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance)
+ final Semaphore semaphore = new Semaphore(0);
+ ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+ ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance, false)
{
@Override
protected void internalRegisterService(ServiceInstance<String> service) throws Exception
@@ -144,24 +145,24 @@ public class TestServiceDiscovery extends BaseClassForTests
}
@Test
- public void testCrashedInstance() throws Exception
+ public void testCrashedInstance() throws Exception
{
- List<Closeable> closeables = Lists.newArrayList();
+ List<Closeable> closeables = Lists.newArrayList();
try
{
- Timing timing = new Timing();
+ Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
closeables.add(client);
client.start();
- ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
- ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance);
+ ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+ ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance, false);
closeables.add(discovery);
discovery.start();
Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
-
+
KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
Thread.sleep(timing.multiple(1.5).session());
@@ -178,24 +179,24 @@ public class TestServiceDiscovery extends BaseClassForTests
}
@Test
- public void testMultipleInstances() throws Exception
+ public void testMultipleInstances() throws Exception
{
- final String SERVICE_ONE = "one";
- final String SERVICE_TWO = "two";
+ final String SERVICE_ONE = "one";
+ final String SERVICE_TWO = "two";
- List<Closeable> closeables = Lists.newArrayList();
+ List<Closeable> closeables = Lists.newArrayList();
try
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
closeables.add(client);
client.start();
- ServiceInstance<Void> s1_i1 = ServiceInstance.<Void>builder().name(SERVICE_ONE).build();
- ServiceInstance<Void> s1_i2 = ServiceInstance.<Void>builder().name(SERVICE_ONE).build();
- ServiceInstance<Void> s2_i1 = ServiceInstance.<Void>builder().name(SERVICE_TWO).build();
- ServiceInstance<Void> s2_i2 = ServiceInstance.<Void>builder().name(SERVICE_TWO).build();
+ ServiceInstance<Void> s1_i1 = ServiceInstance.<Void>builder().name(SERVICE_ONE).build();
+ ServiceInstance<Void> s1_i2 = ServiceInstance.<Void>builder().name(SERVICE_ONE).build();
+ ServiceInstance<Void> s2_i1 = ServiceInstance.<Void>builder().name(SERVICE_TWO).build();
+ ServiceInstance<Void> s2_i2 = ServiceInstance.<Void>builder().name(SERVICE_TWO).build();
- ServiceDiscovery<Void> discovery = ServiceDiscoveryBuilder.builder(Void.class).client(client).basePath("/test").build();
+ ServiceDiscovery<Void> discovery = ServiceDiscoveryBuilder.builder(Void.class).client(client).basePath("/test").build();
closeables.add(discovery);
discovery.start();
@@ -234,17 +235,17 @@ public class TestServiceDiscovery extends BaseClassForTests
}
@Test
- public void testBasic() throws Exception
+ public void testBasic() throws Exception
{
- List<Closeable> closeables = Lists.newArrayList();
+ List<Closeable> closeables = Lists.newArrayList();
try
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
closeables.add(client);
client.start();
-
- ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
- ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
+
+ ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+ ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
closeables.add(discovery);
discovery.start();
http://git-wip-us.apache.org/repos/asf/curator/blob/37dc4478/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java
new file mode 100644
index 0000000..0a19b41
--- /dev/null
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java
@@ -0,0 +1,76 @@
+package org.apache.curator.x.discovery.details;
+
+import com.google.common.collect.Lists;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.x.discovery.ServiceDiscovery;
+import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
+import org.apache.curator.x.discovery.ServiceInstance;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.io.Closeable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class TestWatchedInstances extends BaseClassForTests
+{
+ @Test
+ public void testWatchedInstances() throws Exception
+ {
+ Timing timing = new Timing();
+ List<Closeable> closeables = Lists.newArrayList();
+ try
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ closeables.add(client);
+ client.start();
+
+ ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+ ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder
+ .builder(String.class)
+ .basePath("/test")
+ .client(client)
+ .thisInstance(instance)
+ .watchInstances(true)
+ .build();
+ closeables.add(discovery);
+ discovery.start();
+
+ Assert.assertEquals(discovery.queryForNames(), Arrays.asList("test"));
+
+ List<ServiceInstance<String>> list = Lists.newArrayList();
+ list.add(instance);
+ Assert.assertEquals(discovery.queryForInstances("test"), list);
+
+ ServiceDiscoveryImpl<String> discoveryImpl = (ServiceDiscoveryImpl<String>)discovery;
+ ServiceInstance<String> changedInstance = ServiceInstance.<String>builder()
+ .id(instance.getId())
+ .address(instance.getAddress())
+ .payload("different")
+ .name(instance.getName())
+ .port(instance.getPort())
+ .build();
+ String path = discoveryImpl.pathForInstance("test", instance.getId());
+ byte[] bytes = discoveryImpl.getSerializer().serialize(changedInstance);
+ client.setData().forPath(path, bytes);
+ timing.sleepABit();
+
+ ServiceInstance<String> registeredService = discoveryImpl.getRegisteredService(instance.getId());
+ Assert.assertNotNull(registeredService);
+ Assert.assertEquals(registeredService.getPayload(), "different");
+ }
+ finally
+ {
+ Collections.reverse(closeables);
+ for ( Closeable c : closeables )
+ {
+ CloseableUtils.closeQuietly(c);
+ }
+ }
+ }
+}