You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by sv...@apache.org on 2016/11/16 17:20:28 UTC
[4/6] brooklyn-library git commit: ZooKeeperNode ID is config rather
than a sensor
ZooKeeperNode ID is config rather than a sensor
Defaults to 1. ZooKeeperEnsemble increments it on each member. This fixes
communication between nodes.
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-library/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-library/commit/ae000a50
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-library/tree/ae000a50
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-library/diff/ae000a50
Branch: refs/heads/master
Commit: ae000a503823c89eb9e49908e160b64047c90026
Parents: 9ce90b8
Author: Sam Corbett <sa...@cloudsoftcorp.com>
Authored: Tue Nov 15 10:40:38 2016 +0000
Committer: Sam Corbett <sa...@cloudsoftcorp.com>
Committed: Wed Nov 16 14:52:19 2016 +0000
----------------------------------------------------------------------
.../entity/zookeeper/AbstractZooKeeperImpl.java | 4 +-
.../entity/zookeeper/ZooKeeperEnsemble.java | 18 ++-
.../entity/zookeeper/ZooKeeperEnsembleImpl.java | 73 ++++++------
.../entity/zookeeper/ZooKeeperNode.java | 10 +-
.../entity/zookeeper/ZooKeeperNodeImpl.java | 17 ++-
.../entity/zookeeper/ZooKeeperSshDriver.java | 13 +-
.../zookeeper/ZooKeeperEnsembleLiveTest.java | 118 ++++++-------------
.../zookeeper/ZooKeeperTestSupport.java | 9 +-
8 files changed, 127 insertions(+), 135 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/ae000a50/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java
index 60175c9..b3ced27 100644
--- a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java
@@ -94,9 +94,9 @@ public abstract class AbstractZooKeeperImpl extends SoftwareProcessImpl implemen
@Override
public void disconnectSensors() {
- super.disconnectSensors();
- disconnectServiceUpIsRunning();
if (jmxFeed != null) jmxFeed.stop();
+ disconnectServiceUpIsRunning();
+ super.disconnectSensors();
}
@Override
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/ae000a50/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java
index a5ba570..2ed6206 100644
--- a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java
@@ -29,7 +29,10 @@ import org.apache.brooklyn.core.sensor.BasicAttributeSensorAndConfigKey;
import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.entity.group.DynamicCluster;
import org.apache.brooklyn.util.core.flags.SetFromFlag;
+import org.apache.brooklyn.util.guava.Suppliers;
+import com.google.common.base.Predicates;
+import com.google.common.base.Supplier;
import com.google.common.reflect.TypeToken;
@Catalog(name="ZooKeeper ensemble", description="A cluster of ZooKeeper servers. "
@@ -38,15 +41,24 @@ import com.google.common.reflect.TypeToken;
public interface ZooKeeperEnsemble extends DynamicCluster {
@SetFromFlag("clusterName")
- BasicAttributeSensorAndConfigKey<String> CLUSTER_NAME = new BasicAttributeSensorAndConfigKey<String>(String
- .class, "zookeeper.cluster.name", "Name of the Zookeeper cluster", "BrooklynZookeeperCluster");
+ BasicAttributeSensorAndConfigKey<String> CLUSTER_NAME = new BasicAttributeSensorAndConfigKey<String>(String.class,
+ "zookeeper.cluster.name", "Name of the Zookeeper cluster", "BrooklynZookeeperCluster");
@SetFromFlag("initialSize")
- public static final ConfigKey<Integer> INITIAL_SIZE = ConfigKeys.newConfigKeyWithDefault(DynamicCluster.INITIAL_SIZE, 3);
+ ConfigKey<Integer> INITIAL_SIZE = ConfigKeys.newConfigKeyWithDefault(DynamicCluster.INITIAL_SIZE, 3);
+
+ ConfigKey<Supplier<Integer>> NODE_ID_SUPPLIER = ConfigKeys.builder(new TypeToken<Supplier<Integer>>() {})
+ .name("zookeeper.nodeId.supplier")
+ .description("Supplies values for members id in zoo.cfg")
+ .defaultValue(Suppliers.incrementing())
+ .constraint(Predicates.notNull())
+ .build();
@SuppressWarnings("serial")
AttributeSensor<List<String>> ZOOKEEPER_SERVERS = Sensors.newSensor(new TypeToken<List<String>>() { },
"zookeeper.servers", "Hostnames to connect to cluster with");
+ /** @deprecated since 0.10.0 use <code>sensors().get(ZooKeeperEnsemble.CLUSTER_NAME)</code> instead */
+ @Deprecated
String getClusterName();
}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/ae000a50/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java
index c2c3e3f..06ea472 100644
--- a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java
@@ -20,36 +20,33 @@ package org.apache.brooklyn.entity.zookeeper;
import java.util.Collection;
import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.api.policy.PolicySpec;
import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.entity.group.AbstractMembershipTrackingPolicy;
import org.apache.brooklyn.entity.group.DynamicClusterImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.brooklyn.util.guava.Suppliers;
+
+import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
public class ZooKeeperEnsembleImpl extends DynamicClusterImpl implements ZooKeeperEnsemble {
- private static final Logger log = LoggerFactory.getLogger(ZooKeeperEnsembleImpl.class);
- private static final AtomicInteger myId = new AtomicInteger();
-
- private MemberTrackingPolicy policy;
-
public ZooKeeperEnsembleImpl() {}
/**
* Sets the default {@link #MEMBER_SPEC} to describe the ZooKeeper nodes.
+ * Overwrites any value configured for {@link ZooKeeperNode#MY_ID} to use
+ * the value given by {@link ZooKeeperEnsemble#NODE_ID_SUPPLIER}.
*/
@Override
protected EntitySpec<?> getMemberSpec() {
- return getConfig(MEMBER_SPEC, EntitySpec.create(ZooKeeperNode.class));
+ EntitySpec<?> spec = getConfig(MEMBER_SPEC, EntitySpec.create(ZooKeeperNode.class));
+ spec.configure(ZooKeeperNode.MY_ID, config().get(ZooKeeperEnsemble.NODE_ID_SUPPLIER).get());
+ return spec;
}
@Override
@@ -58,33 +55,6 @@ public class ZooKeeperEnsembleImpl extends DynamicClusterImpl implements ZooKeep
}
@Override
- public void init() {
- log.info("Initializing the ZooKeeper Ensemble");
- super.init();
-
- policy = policies().add(PolicySpec.create(MemberTrackingPolicy.class)
- .displayName("Members tracker")
- .configure("group", this));
- }
-
- public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
- @Override
- protected void onEntityChange(Entity member) {
- }
-
- @Override
- protected void onEntityAdded(Entity member) {
- if (member.getAttribute(ZooKeeperNode.MY_ID) == null) {
- ((EntityInternal) member).sensors().set(ZooKeeperNode.MY_ID, myId.incrementAndGet());
- }
- }
-
- @Override
- protected void onEntityRemoved(Entity member) {
- }
- };
-
- @Override
protected void initEnrichers() {
super.initEnrichers();
@@ -101,4 +71,31 @@ public class ZooKeeperEnsembleImpl extends DynamicClusterImpl implements ZooKeep
sensors().set(ZOOKEEPER_SERVERS, zookeeperServers);
}
+ /**
+ * @deprecated since 0.10.0 class is unused but kept for persistence backwards compatibility
+ */
+ @Deprecated
+ private static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
+ private final Object[] mutex = new Object[0];
+
+ @Override
+ protected void onEntityAdded(Entity member) {
+ if (member.config().get(ZooKeeperNode.MY_ID) == null) {
+ Supplier<Integer> id;
+ synchronized (mutex) {
+ // Entities may not have been created with NODE_ID_SUPPLIER, so create it if
+ // it's not there. We can't provide any good guarantees about what number to
+ // start with, but then again the previous version of the entity gave no
+ // guarantee either.
+ id = entity.config().get(ZooKeeperEnsemble.NODE_ID_SUPPLIER);
+ if (id == null) {
+ id = Suppliers.incrementing();
+ entity.config().set(ZooKeeperEnsemble.NODE_ID_SUPPLIER, id);
+ }
+ }
+ member.config().set(ZooKeeperNode.MY_ID, id.get());
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/ae000a50/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNode.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNode.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNode.java
index e0644df..13a9c7e 100644
--- a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNode.java
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNode.java
@@ -24,6 +24,7 @@ import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.sensor.AttributeSensorAndConfigKey;
+import org.apache.brooklyn.core.sensor.BasicAttributeSensorAndConfigKey;
import org.apache.brooklyn.core.sensor.PortAttributeSensorAndConfigKey;
import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.entity.software.base.SoftwareProcess;
@@ -64,12 +65,19 @@ public interface ZooKeeperNode extends SoftwareProcess {
"zookeeper.configTemplate", "Zookeeper configuration template (in freemarker format)",
"classpath://org/apache/brooklyn/entity/messaging/zookeeper/zoo.cfg");
+ @SetFromFlag("zookeeperId")
+ BasicAttributeSensorAndConfigKey<Integer> MY_ID = new BasicAttributeSensorAndConfigKey<>(Integer.class,
+ "zookeeper.myid", "ZooKeeper node's myId", 1);
+
AttributeSensor<Long> OUTSTANDING_REQUESTS = Sensors.newLongSensor("zookeeper.outstandingRequests", "Outstanding request count");
AttributeSensor<Long> PACKETS_RECEIVED = Sensors.newLongSensor("zookeeper.packets.received", "Total packets received");
AttributeSensor<Long> PACKETS_SENT = Sensors.newLongSensor("zookeeper.packets.sent", "Total packets sent");
- AttributeSensor<Integer> MY_ID = Sensors.newIntegerSensor("zookeeper.myid", "ZooKeeper node's myId");
+ /** @deprecated since 0.10.0 use <code>sensors().get(ZooKeeperNode.ZOOKEEPER_PORT)</code> instead */
+ @Deprecated
Integer getZookeeperPort();
+ /** @deprecated since 0.10.0 use <code>sensors().get(ZooKeeperNode.HOSTNAME)</code> instead */
+ @Deprecated
String getHostname();
}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/ae000a50/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java
index f0eee04..cbf055e 100644
--- a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java
@@ -26,13 +26,20 @@ public class ZooKeeperNodeImpl extends AbstractZooKeeperImpl implements ZooKeepe
public ZooKeeperNodeImpl() {}
@Override
- public Class<?> getDriverInterface() {
- return ZooKeeperDriver.class;
+ public void init() {
+ super.init();
+ // MY_ID was changed from a sensor to config. Publish it as a sensor to maintain
+ // compatibility with any blueprints that reference it.
+ Integer myId = config().get(MY_ID);
+ if (myId == null) {
+ throw new NullPointerException("Require value for " + MY_ID.getName());
+ }
+ sensors().set(MY_ID, myId);
}
@Override
- public void init() {
- super.init();
- sensors().set(ZooKeeperNode.MY_ID, 1);
+ public Class<?> getDriverInterface() {
+ return ZooKeeperDriver.class;
}
+
}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/ae000a50/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java
index e4ed338..c7b1dc9 100644
--- a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java
@@ -54,8 +54,8 @@ public class ZooKeeperSshDriver extends JavaSoftwareProcessSshDriver implements
return entity.getConfig(ZooKeeperNode.ZOOKEEPER_CONFIG_TEMPLATE);
}
- protected int getMyId() {
- return entity.getAttribute(ZooKeeperNode.MY_ID);
+ protected Integer getMyId() {
+ return entity.config().get(ZooKeeperNode.MY_ID);
}
// FIXME All for one, and one for all! If any node fails then we're stuck waiting for its hostname/port forever.
@@ -64,16 +64,19 @@ public class ZooKeeperSshDriver extends JavaSoftwareProcessSshDriver implements
public List<ZooKeeperServerConfig> getZookeeperServers() throws ExecutionException, InterruptedException {
List<ZooKeeperServerConfig> result = Lists.newArrayList();
- if (entity.getParent().getClass().isAssignableFrom(ZooKeeperEnsemble.class)) {
+ if (entity.getParent() instanceof ZooKeeperEnsemble) {
ZooKeeperEnsemble ensemble = (ZooKeeperEnsemble) entity.getParent();
for (Entity member : ensemble.getMembers()) {
- Integer myid = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.MY_ID).get();
+ Integer memberId = member.config().get(ZooKeeperNode.MY_ID);
+ if (memberId == null) {
+ throw new IllegalStateException(member + " has null value for " + ZooKeeperNode.MY_ID);
+ }
String hostname = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.HOSTNAME).get();
Integer port = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.ZOOKEEPER_PORT).get();
Integer leaderPort = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.ZOOKEEPER_LEADER_PORT).get();
Integer electionPort = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.ZOOKEEPER_ELECTION_PORT).get();
- result.add(new ZooKeeperServerConfig(myid, hostname, port, leaderPort, electionPort));
+ result.add(new ZooKeeperServerConfig(memberId, hostname, port, leaderPort, electionPort));
}
}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/ae000a50/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperEnsembleLiveTest.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperEnsembleLiveTest.java b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperEnsembleLiveTest.java
index 1015c76..16d7047 100644
--- a/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperEnsembleLiveTest.java
+++ b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperEnsembleLiveTest.java
@@ -20,17 +20,13 @@ package org.apache.brooklyn.entity.messaging.zookeeper;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-import java.net.Socket;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.core.entity.Attributes;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityAsserts;
import org.apache.brooklyn.core.entity.trait.Startable;
@@ -51,12 +47,10 @@ import org.testng.annotations.Test;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
-import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.net.HostAndPort;
-import com.google.common.util.concurrent.Uninterruptibles;
/**
* A live test of the {@link org.apache.brooklyn.entity.zookeeper.ZooKeeperEnsemble} entity.
@@ -70,10 +64,9 @@ public class ZooKeeperEnsembleLiveTest extends BrooklynAppLiveTestSupport {
private static final String DEFAULT_LOCATION = "jclouds:aws-ec2:eu-west-1";
private Location testLocation;
- private ZooKeeperEnsemble cluster;
private String locationSpec;
- @BeforeClass(groups = "Live")
+ @BeforeClass(alwaysRun = true)
@Parameters({"locationSpec"})
public void setLocationSpec(@Optional String locationSpec) {
this.locationSpec = !Strings.isBlank(locationSpec)
@@ -96,60 +89,46 @@ public class ZooKeeperEnsembleLiveTest extends BrooklynAppLiveTestSupport {
public void testStartUpConnectAndResize() throws Exception {
final String zkDataPath = "/ensembletest";
final int initialSize = 3;
- try {
- cluster = app.createAndManageChild(EntitySpec.create(ZooKeeperEnsemble.class)
- .configure(DynamicCluster.INITIAL_SIZE, initialSize)
- .configure(ZooKeeperEnsemble.CLUSTER_NAME, "ZooKeeperEnsembleLiveTest"));
-
- app.start(ImmutableList.of(testLocation));
-
- Entities.dumpInfo(app);
- EntityAsserts.assertAttributeEqualsEventually(cluster, ZooKeeperEnsemble.GROUP_SIZE, 3);
- EntityAsserts.assertAttributeEqualsEventually(cluster, Startable.SERVICE_UP, true);
- Set<Integer> nodeIds = Sets.newHashSet();
- for (Entity zkNode : cluster.getMembers()) {
- assertSocketOpen(zkNode);
- nodeIds.add(zkNode.sensors().get(ZooKeeperNode.MY_ID));
- }
- assertEquals(nodeIds.size(), initialSize, "expected " + initialSize + " node ids, found " + Iterables.toString(nodeIds));
-
- // Write data to one and read from the others.
- List<String> servers = cluster.sensors().get(ZooKeeperEnsemble.ZOOKEEPER_SERVERS);
- assertNotNull(servers, "value for sensor should not be null: " + ZooKeeperEnsemble.ZOOKEEPER_SERVERS);
- assertEquals(servers.size(), initialSize, "expected " + initialSize + " entries in " + servers);
-
- // Write to one
- String firstServer = servers.get(0);
- HostAndPort conn = HostAndPort.fromString(firstServer);
- log.info("Writing data to {}", conn);
- try (ZooKeeperTestSupport zkts = new ZooKeeperTestSupport(conn)) {
- zkts.create(zkDataPath, "data".getBytes());
- assertEquals(new String(zkts.get(zkDataPath)), "data");
- }
-
- // And read from the others.
- for (int i = 1; i < servers.size(); i++) {
- conn = HostAndPort.fromString(servers.get(i));
- log.info("Asserting that data can be read from {}", conn);
- assertPathDataEventually(conn, zkDataPath, "data");
- }
-
- cluster.resize(1);
- EntityAsserts.assertAttributeEqualsEventually(cluster, ZooKeeperEnsemble.GROUP_SIZE, 1);
- EntityAsserts.assertAttributeEqualsContinually(cluster, Startable.SERVICE_UP, true);
-
- // TODO: assert that data can still be read.
- for (Entity zkNode : cluster.getMembers()) {
- assertSocketOpen(zkNode);
- }
- } catch (Throwable e) {
- throw Throwables.propagate(e);
+ ZooKeeperEnsemble ensemble = app.createAndManageChild(EntitySpec.create(ZooKeeperEnsemble.class)
+ .configure(DynamicCluster.INITIAL_SIZE, initialSize)
+ .configure(ZooKeeperEnsemble.CLUSTER_NAME, "ZooKeeperEnsembleLiveTest"));
+
+ app.start(ImmutableList.of(testLocation));
+ Entities.dumpInfo(app);
+
+ EntityAsserts.assertAttributeEqualsEventually(ensemble, ZooKeeperEnsemble.GROUP_SIZE, 3);
+ EntityAsserts.assertAttributeEqualsEventually(ensemble, Startable.SERVICE_UP, true);
+ Set<Integer> nodeIds = Sets.newHashSet();
+ for (Entity zkNode : ensemble.getMembers()) {
+ nodeIds.add(zkNode.config().get(ZooKeeperNode.MY_ID));
+ }
+ assertEquals(nodeIds.size(), initialSize, "expected " + initialSize + " node ids, found " + Iterables.toString(nodeIds));
+
+ // Write data to one and read from the others.
+ List<String> servers = ensemble.sensors().get(ZooKeeperEnsemble.ZOOKEEPER_SERVERS);
+ assertNotNull(servers, "value for sensor should not be null: " + ZooKeeperEnsemble.ZOOKEEPER_SERVERS);
+ assertEquals(servers.size(), initialSize, "expected " + initialSize + " entries in " + servers);
+
+ // Write to one
+ String firstServer = servers.get(0);
+ HostAndPort conn = HostAndPort.fromString(firstServer);
+ log.info("Writing data to {}", conn);
+ try (ZooKeeperTestSupport zkts = new ZooKeeperTestSupport(conn)) {
+ zkts.create(zkDataPath, "data".getBytes());
+ assertEquals(new String(zkts.get(zkDataPath)), "data");
+ }
+
+ // And read from the others.
+ for (int i = 1; i < servers.size(); i++) {
+ conn = HostAndPort.fromString(servers.get(i));
+ log.info("Asserting that data can be read from {}", conn);
+ assertPathDataEventually(conn, zkDataPath, "data");
}
}
protected void assertPathDataEventually(HostAndPort hostAndPort, final String path, String expected) throws Exception {
try (ZooKeeperTestSupport zkts = new ZooKeeperTestSupport(hostAndPort)) {
- Asserts.eventually(new Supplier<String>() {
+ final Supplier<String> dataSupplier = new Supplier<String>() {
@Override
public String get() {
try {
@@ -158,30 +137,9 @@ public class ZooKeeperEnsembleLiveTest extends BrooklynAppLiveTestSupport {
throw Exceptions.propagate(e);
}
}
- }, Predicates.equalTo(expected));
+ };
+ Asserts.eventually(dataSupplier, Predicates.equalTo(expected));
}
-
}
- protected void assertSocketOpen(Entity node) {
- assertTrue(isSocketOpen(node));
- }
-
- protected static boolean isSocketOpen(Entity node) {
- int attempt = 0, maxAttempts = 20;
- while(attempt < maxAttempts) {
- try {
- final String host = node.sensors().get(Attributes.HOSTNAME);
- final int port = node.sensors().get(ZooKeeperNode.ZOOKEEPER_PORT);
- Socket s = new Socket(host, port);
- s.close();
- return true;
- } catch (Exception e) {
- attempt++;
- }
- Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
- }
- return false;
- }
-
}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/ae000a50/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperTestSupport.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperTestSupport.java b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperTestSupport.java
index 042e9bf..f1987d2 100644
--- a/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperTestSupport.java
+++ b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperTestSupport.java
@@ -22,6 +22,7 @@ package org.apache.brooklyn.entity.messaging.zookeeper;
import java.io.Closeable;
import java.util.concurrent.CountDownLatch;
+import org.apache.brooklyn.location.paas.PaasLocation;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
@@ -29,6 +30,8 @@ import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.net.HostAndPort;
@@ -37,16 +40,20 @@ import com.google.common.net.HostAndPort;
*/
public class ZooKeeperTestSupport implements Closeable {
+ private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperTestSupport.class);
private final ZooKeeper zk;
private final CountDownLatch connSignal = new CountDownLatch(1);
- public ZooKeeperTestSupport(HostAndPort hostAndPort) throws Exception {
+ public ZooKeeperTestSupport(final HostAndPort hostAndPort) throws Exception {
final int sessionTimeout = 3000;
zk = new ZooKeeper(hostAndPort.toString(), sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
+ LOG.debug("Connected to ZooKeeper at {}", hostAndPort);
connSignal.countDown();
+ } else {
+ LOG.info("WatchedEvent at {}: {}", hostAndPort, event.getState());
}
}
});