You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2016/02/01 18:46:48 UTC
[30/50] brooklyn-library git commit: Changes based on review comments,
including: updating to use latest 0.5.0 APIs; adding general Zookeeper
entity interface; making KafkaCluster implement Group
Changes based on review comments, including:
- Updating to use latest 0.5.0 APIs
- Adding general Zookeeper entity interface
- Make KafkaCluster implement Group
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-library/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-library/commit/64486e44
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-library/tree/64486e44
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-library/diff/64486e44
Branch: refs/heads/0.5.0
Commit: 64486e44f0cf633f7306718ec85cdaebcb3a2435
Parents: 9825780
Author: Andrew Kennedy <an...@cloudsoftcorp.com>
Authored: Mon Apr 1 19:00:44 2013 +0100
Committer: Andrew Kennedy <an...@cloudsoftcorp.com>
Committed: Fri Apr 19 10:36:07 2013 +0100
----------------------------------------------------------------------
.../java/brooklyn/demo/KafkaClusterExample.java | 10 +-
.../brooklyn/demo/SimpleCassandraCluster.java | 2 +-
.../brooklyn/demo/SimpleCouchDBCluster.java | 2 +-
.../java/brooklyn/demo/SimpleRedisCluster.java | 2 +-
.../kafka/AbstractfKafkaSshDriver.java | 2 +-
.../entity/messaging/kafka/KafkaBroker.java | 12 +-
.../entity/messaging/kafka/KafkaBrokerImpl.java | 78 ++++------
.../messaging/kafka/KafkaBrokerSshDriver.java | 2 +-
.../entity/messaging/kafka/KafkaCluster.java | 83 +++--------
.../messaging/kafka/KafkaClusterImpl.java | 122 ++++++++--------
.../entity/messaging/kafka/KafkaZookeeper.java | 28 ++--
.../messaging/kafka/KafkaZookeeperImpl.java | 100 +------------
.../kafka/KafkaZookeeperSshDriver.java | 2 +-
.../entity/zookeeper/AbstractZookeeperImpl.java | 122 ++++++++++++++++
.../brooklyn/entity/zookeeper/Zookeeper.java | 50 +++++++
.../activemq/ActiveMQIntegrationTest.groovy | 10 +-
.../messaging/kafka/KafkaIntegrationTest.groovy | 126 ----------------
.../messaging/kafka/KafkaIntegrationTest.java | 144 +++++++++++++++++++
.../entity/messaging/kafka/KafkaSupport.java | 24 +++-
19 files changed, 487 insertions(+), 434 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/KafkaClusterExample.java
----------------------------------------------------------------------
diff --git a/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/KafkaClusterExample.java b/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/KafkaClusterExample.java
index fae6bb6..06bbbed 100644
--- a/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/KafkaClusterExample.java
+++ b/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/KafkaClusterExample.java
@@ -5,7 +5,7 @@ import java.util.List;
import brooklyn.entity.basic.ApplicationBuilder;
import brooklyn.entity.basic.Entities;
import brooklyn.entity.messaging.kafka.KafkaCluster;
-import brooklyn.entity.proxying.BasicEntitySpec;
+import brooklyn.entity.proxying.EntitySpecs;
import brooklyn.launcher.BrooklynLauncher;
import brooklyn.util.CommandLineUtil;
@@ -18,10 +18,10 @@ public class KafkaClusterExample extends ApplicationBuilder {
/** Configure the application. */
protected void doBuild() {
- createChild(BasicEntitySpec.newInstance(KafkaCluster.class)
+ addChild(EntitySpecs.spec(KafkaCluster.class)
+ .configure("startTimeout", 300) // 5 minutes
.configure("initialSize", 2));
-
- appDisplayName("Kafka cluster application");
+ // TODO set application display name?
}
public static void main(String[] argv) {
@@ -30,7 +30,7 @@ public class KafkaClusterExample extends ApplicationBuilder {
String location = CommandLineUtil.getCommandLineOption(args, "--location", DEFAULT_LOCATION);
BrooklynLauncher launcher = BrooklynLauncher.newInstance()
- .application(new KafkaClusterExample())
+ .application(new KafkaClusterExample().appDisplayName("Kafka cluster application"))
.webconsolePort(port)
.location(location)
.start();
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCassandraCluster.java
----------------------------------------------------------------------
diff --git a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCassandraCluster.java b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCassandraCluster.java
index 50c62a8..b538ec7 100644
--- a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCassandraCluster.java
+++ b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCassandraCluster.java
@@ -24,7 +24,7 @@ public class SimpleCassandraCluster extends ApplicationBuilder {
/** Create entities. */
protected void doBuild() {
- createChild(EntitySpecs.spec(CassandraCluster.class)
+ addChild(EntitySpecs.spec(CassandraCluster.class)
.configure("initialSize", "2")
.configure("clusterName", "Brooklyn")
.configure("jmxPort", "11099+")
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCouchDBCluster.java
----------------------------------------------------------------------
diff --git a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCouchDBCluster.java b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCouchDBCluster.java
index 179443e..5de676b 100644
--- a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCouchDBCluster.java
+++ b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCouchDBCluster.java
@@ -24,7 +24,7 @@ public class SimpleCouchDBCluster extends ApplicationBuilder {
/** Create entities. */
protected void doBuild() {
- createChild(EntitySpecs.spec(CouchDBCluster.class)
+ addChild(EntitySpecs.spec(CouchDBCluster.class)
.configure("initialSize", "2")
.configure("clusterName", "Brooklyn")
.configure("httpPort", "8000+"));
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleRedisCluster.java
----------------------------------------------------------------------
diff --git a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleRedisCluster.java b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleRedisCluster.java
index da80e39..0f818f5 100644
--- a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleRedisCluster.java
+++ b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleRedisCluster.java
@@ -24,7 +24,7 @@ public class SimpleRedisCluster extends ApplicationBuilder {
/** Create entities. */
protected void doBuild() {
- createChild(EntitySpecs.spec(RedisCluster.class)
+ addChild(EntitySpecs.spec(RedisCluster.class)
.configure("initialSize", "2")
.configure("clusterName", "Brooklyn"));
}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
index f6c7c8d..21e7092 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
@@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory;
import brooklyn.BrooklynVersion;
import brooklyn.config.ConfigKey;
import brooklyn.entity.basic.EntityLocal;
-import brooklyn.entity.basic.lifecycle.CommonCommands;
+import brooklyn.util.ssh.CommonCommands;
import brooklyn.entity.drivers.downloads.DownloadResolver;
import brooklyn.entity.java.JavaSoftwareProcessSshDriver;
import brooklyn.location.basic.SshMachineLocation;
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
index 2a82b13..c2d7632 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
@@ -20,6 +20,7 @@ import brooklyn.entity.basic.SoftwareProcess;
import brooklyn.entity.java.UsesJmx;
import brooklyn.entity.messaging.MessageBroker;
import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.entity.zookeeper.Zookeeper;
import brooklyn.event.AttributeSensor;
import brooklyn.event.basic.BasicAttributeSensor;
import brooklyn.event.basic.BasicConfigKey;
@@ -42,12 +43,13 @@ public interface KafkaBroker extends SoftwareProcess, MessageBroker, UsesJmx, Ka
PortAttributeSensorAndConfigKey KAFKA_PORT = new PortAttributeSensorAndConfigKey("kafka.port", "Kafka port", "9092+");
/** Location of the configuration file template to be copied to the server.*/
- @SetFromFlag("serverConfig")
- ConfigKey<String> SERVER_CONFIG_TEMPLATE = new BasicConfigKey<String>(
- String.class, "kafka.broker.configTemplate", "Server configuration template (in freemarker format)", "classpath://brooklyn/entity/messaging/kafka/server.properties");
+ @SetFromFlag("kafkaServerConfig")
+ ConfigKey<String> KAFKA_BROKER_CONFIG_TEMPLATE = new BasicConfigKey<String>(String.class,
+ "kafka.broker.configTemplate", "Kafka broker configuration template (in freemarker format)",
+ "classpath://brooklyn/entity/messaging/kafka/server.properties");
@SetFromFlag("zookeeper")
- ConfigKey<KafkaZookeeper> ZOOKEEPER = new BasicConfigKey<KafkaZookeeper>(KafkaZookeeper.class, "kafka.broker.zookeeper", "Kafka zookeeper entity");
+ ConfigKey<Zookeeper> ZOOKEEPER = new BasicConfigKey<Zookeeper>(Zookeeper.class, "kafka.broker.zookeeper", "Kafka zookeeper entity");
AttributeSensor<Integer> BROKER_ID = new BasicAttributeSensor<Integer>(Integer.class, "kafka.broker.id", "Kafka unique broker ID");
@@ -66,6 +68,6 @@ public interface KafkaBroker extends SoftwareProcess, MessageBroker, UsesJmx, Ka
Integer getBrokerId();
- KafkaZookeeper getZookeeper();
+ Zookeeper getZookeeper();
}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
index 0dedf9c..5f8add8 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
@@ -15,11 +15,7 @@
*/
package brooklyn.entity.messaging.kafka;
-import java.io.IOException;
-import java.util.Collection;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import javax.management.ObjectName;
@@ -31,23 +27,24 @@ import brooklyn.entity.Entity;
import brooklyn.entity.basic.Entities;
import brooklyn.entity.basic.SoftwareProcessImpl;
import brooklyn.entity.messaging.MessageBroker;
-import brooklyn.event.feed.function.FunctionFeed;
-import brooklyn.event.feed.function.FunctionPollConfig;
+import brooklyn.entity.zookeeper.Zookeeper;
import brooklyn.event.feed.jmx.JmxAttributePollConfig;
import brooklyn.event.feed.jmx.JmxFeed;
import brooklyn.event.feed.jmx.JmxHelper;
import brooklyn.util.MutableMap;
-import brooklyn.util.exceptions.Exceptions;
import com.google.common.base.Functions;
import com.google.common.base.Objects.ToStringHelper;
-import com.google.common.collect.Sets;
/**
* An {@link brooklyn.entity.Entity} that represents a single Kafka broker instance.
*/
public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroker, KafkaBroker {
+
private static final Logger log = LoggerFactory.getLogger(KafkaBrokerImpl.class);
+ private static final ObjectName SOCKET_SERVER_STATS_MBEAN = JmxHelper.createObjectName("kafka:type=kafka.SocketServerStats");
+
+ private volatile JmxFeed jmxFeed;
public KafkaBrokerImpl() {
super();
@@ -63,7 +60,7 @@ public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroke
}
@Override
- public void postConstruct() {
+ public void init() {
setAttribute(BROKER_ID, Math.abs(hashCode())); // Must be positive for partitioning to work
}
@@ -74,7 +71,7 @@ public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroke
public Integer getBrokerId() { return getAttribute(BROKER_ID); }
@Override
- public KafkaZookeeper getZookeeper() { return getConfig(ZOOKEEPER); }
+ public Zookeeper getZookeeper() { return getConfig(ZOOKEEPER); }
public KafkaTopic createTopic(Map<?, ?> properties) {
KafkaTopic result = new KafkaTopic(properties, this);
@@ -88,98 +85,85 @@ public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroke
return KafkaBrokerDriver.class;
}
- private ObjectName socketServerStatsMbean = JmxHelper.createObjectName("kafka:type=kafka.SocketServerStats");
- private volatile FunctionFeed functionFeed;
- private volatile JmxFeed jmxFeed;
-
- /** Wait for five minutes to start. */
- @Override
- public void waitForServiceUp() { waitForServiceUp(5, TimeUnit.MINUTES); }
-
@Override
public void waitForServiceUp(long duration, TimeUnit units) {
super.waitForServiceUp(duration, units);
// Wait for the MBean to exist
- JmxHelper helper = null;
+ JmxHelper helper = new JmxHelper(this);
try {
- helper = new JmxHelper(this);
- helper.connect();
- helper.assertMBeanExistsEventually(socketServerStatsMbean, units.toMillis(duration));
- } catch (IOException e) {
- throw Exceptions.propagate(e);
+ helper.assertMBeanExistsEventually(SOCKET_SERVER_STATS_MBEAN, units.toMillis(duration));
} finally {
- if (helper != null) helper.disconnect();
+ helper.disconnect();
}
}
@Override
protected void connectSensors() {
- functionFeed = FunctionFeed.builder()
- .entity(this)
- .poll(new FunctionPollConfig<Object, Boolean>(SERVICE_UP)
- .period(500, TimeUnit.MILLISECONDS)
- .callable(new Callable<Boolean>() {
- public Boolean call() throws Exception {
- return getDriver().isRunning();
- }
- })
- .onError(Functions.constant(Boolean.FALSE)))
- .build();
+ connectServiceUpIsRunning();
jmxFeed = JmxFeed.builder()
.entity(this)
.period(500, TimeUnit.MILLISECONDS)
.pollAttribute(new JmxAttributePollConfig<Long>(FETCH_REQUEST_COUNT)
- .objectName(socketServerStatsMbean)
+ .objectName(SOCKET_SERVER_STATS_MBEAN)
.attributeName("NumFetchRequests")
.onError(Functions.constant(-1l)))
.pollAttribute(new JmxAttributePollConfig<Long>(TOTAL_FETCH_TIME)
- .objectName(socketServerStatsMbean)
+ .objectName(SOCKET_SERVER_STATS_MBEAN)
.attributeName("TotalFetchRequestMs")
.onError(Functions.constant(-1l)))
.pollAttribute(new JmxAttributePollConfig<Double>(MAX_FETCH_TIME)
- .objectName(socketServerStatsMbean)
+ .objectName(SOCKET_SERVER_STATS_MBEAN)
.attributeName("MaxFetchRequestMs")
.onError(Functions.constant(-1.0d)))
.pollAttribute(new JmxAttributePollConfig<Long>(PRODUCE_REQUEST_COUNT)
- .objectName(socketServerStatsMbean)
+ .objectName(SOCKET_SERVER_STATS_MBEAN)
.attributeName("NumProduceRequests")
.onError(Functions.constant(-1l)))
.pollAttribute(new JmxAttributePollConfig<Long>(TOTAL_PRODUCE_TIME)
- .objectName(socketServerStatsMbean)
+ .objectName(SOCKET_SERVER_STATS_MBEAN)
.attributeName("TotalProduceRequestMs")
.onError(Functions.constant(-1l)))
.pollAttribute(new JmxAttributePollConfig<Double>(MAX_PRODUCE_TIME)
- .objectName(socketServerStatsMbean)
+ .objectName(SOCKET_SERVER_STATS_MBEAN)
.attributeName("MaxProduceRequestMs")
.onError(Functions.constant(-1.0d)))
.pollAttribute(new JmxAttributePollConfig<Long>(BYTES_RECEIVED)
- .objectName(socketServerStatsMbean)
+ .objectName(SOCKET_SERVER_STATS_MBEAN)
.attributeName("TotalBytesRead")
.onError(Functions.constant(-1l)))
.pollAttribute(new JmxAttributePollConfig<Long>(BYTES_SENT)
- .objectName(socketServerStatsMbean)
+ .objectName(SOCKET_SERVER_STATS_MBEAN)
.attributeName("TotalBytesWritten")
.onError(Functions.constant(-1l)))
.build();
+
+ setBrokerUrl();
}
@Override
public void disconnectSensors() {
super.disconnectSensors();
- if (functionFeed != null) functionFeed.stop();
+ disconnectServiceUpIsRunning();
if (jmxFeed != null) jmxFeed.stop();
}
@Override
protected ToStringHelper toStringHelper() {
- return super.toStringHelper().add("kafkaPort", getKafkaPort());
+ return super.toStringHelper()
+ .add("kafkaPort", getKafkaPort());
}
+ /** Use the {@link #getZookeeper() zookeeper} details if available, otherwise use our own host and port. */
@Override
public void setBrokerUrl() {
- // TODO
+ Zookeeper zookeeper = getZookeeper();
+ if (zookeeper != null) {
+ setAttribute(BROKER_URL, String.format("zookeeper://%s:%d", zookeeper.getAttribute(HOSTNAME), zookeeper.getZookeeperPort()));
+ } else {
+ setAttribute(BROKER_URL, String.format("kafka://%s:%d", getAttribute(HOSTNAME), getKafkaPort()));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
index 40e7234..40df6b4 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
@@ -34,7 +34,7 @@ public class KafkaBrokerSshDriver extends AbstractfKafkaSshDriver implements Kaf
@Override
protected ConfigKey<String> getConfigTemplateKey() {
- return KafkaBroker.SERVER_CONFIG_TEMPLATE;
+ return KafkaBroker.KAFKA_BROKER_CONFIG_TEMPLATE;
}
@Override
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java
index 96e46ff..d1e123a 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java
@@ -18,16 +18,15 @@ package brooklyn.entity.messaging.kafka;
import brooklyn.catalog.Catalog;
import brooklyn.config.ConfigKey;
import brooklyn.entity.Entity;
-import brooklyn.entity.basic.Attributes;
-import brooklyn.entity.basic.ConfigurableEntityFactory;
+import brooklyn.entity.Group;
import brooklyn.entity.basic.ConfigKeys;
import brooklyn.entity.group.Cluster;
import brooklyn.entity.group.DynamicCluster;
-import brooklyn.entity.proxying.BasicEntitySpec;
import brooklyn.entity.proxying.EntitySpec;
import brooklyn.entity.proxying.ImplementedBy;
import brooklyn.entity.trait.Resizable;
import brooklyn.entity.trait.Startable;
+import brooklyn.entity.zookeeper.Zookeeper;
import brooklyn.event.AttributeSensor;
import brooklyn.event.basic.BasicAttributeSensor;
import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
@@ -35,93 +34,53 @@ import brooklyn.event.basic.BasicConfigKey;
import brooklyn.util.flags.SetFromFlag;
/**
- * This entity contains the sub-groups and entities that go in to a single location (e.g. datacenter)
- * to provide Kafka cluster functionality.
+ * Provides Kafka cluster functionality through a group of {@link KafkaBroker brokers} controlled
+ * by a single {@link KafkaZookeeper zookeeper} entity.
* <p>
- * You can customise the broker by customising the factory (by reference in calling code)
- * or supplying your own factory (as a config flag).
+ * You can customise the Kafka zookeeper and brokers by supplying {@link EntitySpec entity specifications}
+ * to be used when creating them. An existing {@link Zookeeper} entity may also be provided instead of the
+ * Kafka zookeeper.
* <p>
- * The contents of this group entity are:
+ * The contents of this entity are:
* <ul>
* <li>a {@link brooklyn.entity.group.DynamicCluster} of {@link KafkaBroker}s
- * <li>a {@link KafkaZookeeper}
- * <li>a {@link brooklyn.policy.Policy} to resize the DynamicCluster
+ * <li>a {@link KafkaZookeeper} or {@link Zookeeper}
+ * <li>a {@link brooklyn.policy.Policy} to resize the broker cluster
* </ul>
+ * The {@link Group group} and {@link Resizable} interface methods are delegated to the broker cluster, so calling
+ * {@link Resizable#resize(Integer) resize} will change the number of brokers.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@Catalog(name="Kafka", description="Apache Kafka is a distributed publish-subscribe messaging system")
@ImplementedBy(KafkaClusterImpl.class)
-public interface KafkaCluster extends Entity, Startable, Resizable {
-
- class Spec<T extends KafkaCluster, S extends Spec<T,S>> extends BasicEntitySpec<T,S> {
-
- private static class ConcreteSpec extends Spec<KafkaCluster, ConcreteSpec> {
- ConcreteSpec() {
- super(KafkaCluster.class);
- }
- }
-
- public static Spec<KafkaCluster, ?> newInstance() {
- return new ConcreteSpec();
- }
-
- protected Spec(Class<T> type) {
- super(type);
- }
-
- public S initialSize(int val) {
- configure(INITIAL_SIZE, val);
- return self();
- }
-
- public S zookeeper(KafkaZookeeper val) {
- configure(ZOOKEEPER, val);
- return self();
- }
-
- public S brokerSpec(EntitySpec<KafkaBroker> val) {
- configure(BROKER_SPEC, val);
- return self();
- }
-
- public S brokerFactory(ConfigurableEntityFactory<KafkaBroker> val) {
- configure(BROKER_FACTORY, val);
- return self();
- }
- }
+public interface KafkaCluster extends Entity, Startable, Resizable, Group {
@SetFromFlag("startTimeout")
- public static final ConfigKey<Integer> START_TIMEOUT = ConfigKeys.START_TIMEOUT;
+ ConfigKey<Integer> START_TIMEOUT = ConfigKeys.START_TIMEOUT;
@SetFromFlag("initialSize")
ConfigKey<Integer> INITIAL_SIZE = new BasicConfigKey<Integer>(Cluster.INITIAL_SIZE, 1);
+ /** Zookeeper for the cluster. If null, a default will be created. */
@SetFromFlag("zookeeper")
- BasicAttributeSensorAndConfigKey<KafkaZookeeper> ZOOKEEPER = new BasicAttributeSensorAndConfigKey<KafkaZookeeper>(
- KafkaZookeeper.class, "kafka.cluster.zookeeper", "Kafka zookeeper for the cluster; if null a default will created");
+ BasicAttributeSensorAndConfigKey<Zookeeper> ZOOKEEPER = new BasicAttributeSensorAndConfigKey<Zookeeper>(
+ Zookeeper.class, "kafka.cluster.zookeeper", "The zookeeper for the cluster; if null a default be will created");
+ /** Spec for creating the default Kafka zookeeper entity. */
@SetFromFlag("zookeeperSpec")
BasicAttributeSensorAndConfigKey<EntitySpec<KafkaZookeeper>> ZOOKEEPER_SPEC = new BasicAttributeSensorAndConfigKey(
EntitySpec.class, "kafka.cluster.zookeeperSpec", "Spec for creating the kafka zookeeper");
- /** Factory to create a Kafka broker, given flags */
- @SetFromFlag("brokerFactory")
- BasicAttributeSensorAndConfigKey<ConfigurableEntityFactory<KafkaBroker>> BROKER_FACTORY = new BasicAttributeSensorAndConfigKey(
- ConfigurableEntityFactory.class, "kafka.cluster.brokerFactory", "Factory to create a Kafka broker");
-
- /** Spec for Kafka broker entiites to be created */
+ /** Spec for Kafka broker entities to be created. */
@SetFromFlag("brokerSpec")
BasicAttributeSensorAndConfigKey<EntitySpec<KafkaBroker>> BROKER_SPEC = new BasicAttributeSensorAndConfigKey(
EntitySpec.class, "kafka.cluster.brokerSpec", "Spec for Kafka broker entiites to be created");
+ /** Underlying Kafka broker cluster. */
AttributeSensor<DynamicCluster> CLUSTER = new BasicAttributeSensor<DynamicCluster>(
DynamicCluster.class, "kafka.cluster.brokerCluster", "Underlying Kafka broker cluster");
- AttributeSensor<String> HOSTNAME = Attributes.HOSTNAME;
-
- KafkaZookeeper getZookeeper();
-
- ConfigurableEntityFactory<KafkaBroker> getBrokerFactory();
+ Zookeeper getZookeeper();
DynamicCluster getCluster();
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java
index efc14fc..1938efa 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java
@@ -18,7 +18,6 @@ package brooklyn.entity.messaging.kafka;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,19 +25,19 @@ import org.slf4j.LoggerFactory;
import brooklyn.enricher.basic.SensorPropagatingEnricher;
import brooklyn.entity.Entity;
import brooklyn.entity.basic.AbstractEntity;
-import brooklyn.entity.basic.ConfigurableEntityFactory;
import brooklyn.entity.basic.Entities;
import brooklyn.entity.group.DynamicCluster;
-import brooklyn.entity.proxying.BasicEntitySpec;
import brooklyn.entity.proxying.EntitySpec;
-import brooklyn.entity.proxying.WrappingEntitySpec;
+import brooklyn.entity.proxying.EntitySpecs;
import brooklyn.entity.trait.Startable;
+import brooklyn.entity.zookeeper.Zookeeper;
import brooklyn.event.feed.ConfigToAttributes;
import brooklyn.location.Location;
import brooklyn.util.MutableList;
import brooklyn.util.MutableMap;
-import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.exceptions.CompoundRuntimeException;
+import com.google.common.base.Objects;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@@ -65,104 +64,97 @@ public class KafkaClusterImpl extends AbstractEntity implements KafkaCluster {
}
@Override
- public void postConstruct() {
- ConfigToAttributes.apply(this, BROKER_FACTORY);
+ public void init() {
ConfigToAttributes.apply(this, BROKER_SPEC);
ConfigToAttributes.apply(this, ZOOKEEPER);
ConfigToAttributes.apply(this, ZOOKEEPER_SPEC);
log.debug("creating zookeeper child for {}", this);
- KafkaZookeeper zookeeper = getAttribute(ZOOKEEPER);
+ Zookeeper zookeeper = getAttribute(ZOOKEEPER);
if (zookeeper == null) {
EntitySpec<KafkaZookeeper> zookeeperSpec = getAttribute(ZOOKEEPER_SPEC);
if (zookeeperSpec == null) {
log.debug("creating zookeeper using default spec for {}", this);
- zookeeperSpec = BasicEntitySpec.newInstance(KafkaZookeeper.class);
+ zookeeperSpec = EntitySpecs.spec(KafkaZookeeper.class);
setAttribute(ZOOKEEPER_SPEC, zookeeperSpec);
} else {
log.debug("creating zookeeper using custom spec for {}", this);
}
- zookeeper = getEntityManager().createEntity(WrappingEntitySpec.newInstance(zookeeperSpec).parent(this));
+ zookeeper = addChild(zookeeperSpec);
if (Entities.isManaged(this)) Entities.manage(zookeeper);
setAttribute(ZOOKEEPER, zookeeper);
}
log.debug("creating cluster child for {}", this);
- ConfigurableEntityFactory<KafkaBroker> brokerFactory = getAttribute(BROKER_FACTORY);
EntitySpec<KafkaBroker> brokerSpec = getAttribute(BROKER_SPEC);
- if (brokerFactory == null && brokerSpec == null) {
+ if (brokerSpec == null) {
log.debug("creating default broker spec for {}", this);
- brokerSpec = BasicEntitySpec.newInstance(KafkaBroker.class);
+ brokerSpec = EntitySpecs.spec(KafkaBroker.class);
setAttribute(BROKER_SPEC, brokerSpec);
}
- // Note relies on initial_size being inherited by DynamicCluster, because key id is identical
- // We add the zookeeper configuration to the KafkaBroker specification or factory here
- Map<String,Object> flags;
- if (brokerSpec != null) {
- flags = MutableMap.<String, Object>of("memberSpec", WrappingEntitySpec.newInstance(brokerSpec).configure(KafkaBroker.ZOOKEEPER, zookeeper));
- } else {
- brokerFactory.configure(KafkaBroker.ZOOKEEPER, zookeeper);
- flags = MutableMap.<String, Object>of("factory", brokerFactory);
- }
- DynamicCluster cluster = getEntityManager().createEntity(BasicEntitySpec.newInstance(DynamicCluster.class)
- .parent(this)
- .configure(flags));
+ // Relies on initialSize being inherited by DynamicCluster, because key id is identical
+ // We add the zookeeper configuration to the KafkaBroker specification here
+ DynamicCluster cluster = addChild(EntitySpecs.spec(DynamicCluster.class)
+ .configure("memberSpec", EntitySpecs.wrapSpec(brokerSpec).configure(KafkaBroker.ZOOKEEPER, zookeeper)));
if (Entities.isManaged(this)) Entities.manage(cluster);
setAttribute(CLUSTER, cluster);
}
@Override
- public KafkaZookeeper getZookeeper() {
+ public Zookeeper getZookeeper() {
return getAttribute(ZOOKEEPER);
}
@Override
- public synchronized ConfigurableEntityFactory<KafkaBroker> getBrokerFactory() {
- return (ConfigurableEntityFactory<KafkaBroker>) getAttribute(BROKER_FACTORY);
- }
-
- @Override
- public synchronized DynamicCluster getCluster() {
+ public DynamicCluster getCluster() {
return getAttribute(CLUSTER);
}
@Override
public void start(Collection<? extends Location> locations) {
if (isLegacyConstruction()) {
- postConstruct();
+ init();
}
- if (locations.isEmpty()) locations = this.getLocations();
- Iterables.getOnlyElement(locations); //assert just one
+ if (locations.isEmpty()) locations = getLocations();
+ Iterables.getOnlyElement(locations); // Assert just one
addLocations(locations);
List<Entity> childrenToStart = MutableList.<Entity>of(getCluster());
// Set the KafkaZookeeper entity as child of cluster, if it does not already have a parent
if (getZookeeper().getParent() == null) {
addChild(getZookeeper());
- }
- // And only start zookeeper if we are parent
- if (this.equals(getZookeeper().getParent())) childrenToStart.add(getZookeeper());
- try {
- Entities.invokeEffectorList(this, childrenToStart, Startable.START, ImmutableMap.of("locations", locations)).get();
- } catch (InterruptedException e) {
- throw Exceptions.propagate(e);
- } catch (ExecutionException e) {
- throw Exceptions.propagate(e);
- }
+ } // And only start zookeeper if we are parent
+ if (Objects.equal(this, getZookeeper().getParent())) childrenToStart.add(getZookeeper());
+ Entities.invokeEffectorList(this, childrenToStart, Startable.START, ImmutableMap.of("locations", locations)).getUnchecked();
connectSensors();
}
@Override
public void stop() {
- if (this.equals(getZookeeper().getParent())) {
- getZookeeper().stop();
+ List<Exception> errors = Lists.newArrayList();
+ if (getZookeeper() != null && Objects.equal(this, getZookeeper().getParent())) {
+ try {
+ getZookeeper().stop();
+ } catch (Exception e) {
+ errors.add(e);
+ }
+ }
+ if (getCurrentSize() > 0) {
+ try {
+ getCluster().stop();
+ } catch (Exception e) {
+ errors.add(e);
+ }
}
- getCluster().stop();
- super.getLocations().clear();
+ getLocations().clear();
setAttribute(SERVICE_UP, false);
+
+ if (errors.size() != 0) {
+ throw new CompoundRuntimeException("Error stopping Kafka cluster", errors);
+ }
}
@Override
@@ -181,14 +173,32 @@ public class KafkaClusterImpl extends AbstractEntity implements KafkaCluster {
.addToEntityAndEmitAll(this);
}
+ /*
+ * All Group and Resizable interface methods are delegated to the broker cluster.
+ */
+
+ /** {@inheritDoc} */
@Override
- public Integer resize(Integer desiredSize) {
- return getCluster().resize(desiredSize);
- }
+ public Collection<Entity> getMembers() { return getCluster().getMembers(); }
- /** @return the current size of the group. */
- public Integer getCurrentSize() {
- return getCluster().getCurrentSize();
- }
+ /** {@inheritDoc} */
+ @Override
+ public boolean hasMember(Entity member) { return getCluster().hasMember(member); }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean addMember(Entity member) { return getCluster().addMember(member); }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean removeMember(Entity member) { return getCluster().removeMember(member); }
+
+ /** {@inheritDoc} */
+ @Override
+ public Integer getCurrentSize() { return getCluster().getCurrentSize(); }
+
+ /** {@inheritDoc} */
+ @Override
+ public Integer resize(Integer desiredSize) { return getCluster().resize(desiredSize); }
}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java
index a001a29..a0d7a46 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java
@@ -17,40 +17,28 @@ package brooklyn.entity.messaging.kafka;
import brooklyn.config.ConfigKey;
import brooklyn.entity.basic.SoftwareProcess;
-import brooklyn.entity.java.UsesJmx;
import brooklyn.entity.proxying.ImplementedBy;
-import brooklyn.event.AttributeSensor;
-import brooklyn.event.basic.BasicAttributeSensor;
+import brooklyn.entity.zookeeper.Zookeeper;
import brooklyn.event.basic.BasicConfigKey;
-import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
import brooklyn.util.flags.SetFromFlag;
/**
* An {@link brooklyn.entity.Entity} that represents a single Kafka zookeeper instance.
*/
@ImplementedBy(KafkaZookeeperImpl.class)
-public interface KafkaZookeeper extends SoftwareProcess, UsesJmx, Kafka {
+public interface KafkaZookeeper extends Zookeeper, Kafka {
@SetFromFlag("startTimeout")
public static final ConfigKey<Integer> START_TIMEOUT = SoftwareProcess.START_TIMEOUT;
+ /** The Kafka version, not the Zookeeper version. */
@SetFromFlag("version")
ConfigKey<String> SUGGESTED_VERSION = Kafka.SUGGESTED_VERSION;
- @SetFromFlag("zookeeperPort")
- PortAttributeSensorAndConfigKey ZOOKEEPER_PORT = new PortAttributeSensorAndConfigKey("zookeeper.port", "Zookeeper port", "2181+");
-
- /** Location of the configuration file template to be copied to the server. */
- @SetFromFlag("zookeeperConfig")
- ConfigKey<String> ZOOKEEPER_CONFIG_TEMPLATE = new BasicConfigKey<String>(
- String.class, "kafka.zookeeper.configTemplate", "Zookeeper configuration template (in freemarker format)", "classpath://brooklyn/entity/messaging/kafka/zookeeper.properties");
-
- AttributeSensor<Long> OUTSTANDING_REQUESTS = new BasicAttributeSensor<Long>(Long.class, "kafka.zookeeper.outstandingRequests", "Outstanding request count");
- AttributeSensor<Long> PACKETS_RECEIVED = new BasicAttributeSensor<Long>(Long.class, "kafka.zookeeper.packets.received", "Total packets received");
- AttributeSensor<Long> PACKETS_SENT = new BasicAttributeSensor<Long>(Long.class, "kafka.zookeeper.packets.sent", "Total packets sent");
-
- Integer getZookeeperPort();
-
- String getHostname();
+ /** Location of the Kafka Zookeeper configuration file template to be copied to the server. */
+ @SetFromFlag("kafkaZookeeperConfig")
+ ConfigKey<String> KAFKA_ZOOKEEPER_CONFIG_TEMPLATE = new BasicConfigKey<String>(String.class,
+ "kafka.zookeeper.configTemplate", "Kafka zookeeper configuration template (in freemarker format)",
+ "classpath://brooklyn/entity/messaging/kafka/zookeeper.properties");
}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java
index 00f892b..79a6cf6 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java
@@ -15,37 +15,22 @@
*/
package brooklyn.entity.messaging.kafka;
-import java.io.IOException;
-import java.util.Collection;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-import javax.management.ObjectName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Objects.ToStringHelper;
+
import brooklyn.entity.Entity;
-import brooklyn.entity.basic.SoftwareProcessImpl;
-import brooklyn.event.feed.function.FunctionFeed;
-import brooklyn.event.feed.function.FunctionPollConfig;
-import brooklyn.event.feed.jmx.JmxAttributePollConfig;
-import brooklyn.event.feed.jmx.JmxFeed;
-import brooklyn.event.feed.jmx.JmxHelper;
+import brooklyn.entity.zookeeper.AbstractZookeeperImpl;
import brooklyn.util.MutableMap;
-import brooklyn.util.exceptions.Exceptions;
-
-import com.google.common.base.Functions;
-import com.google.common.base.Objects.ToStringHelper;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Sets;
/**
* An {@link brooklyn.entity.Entity} that represents a single Kafka zookeeper instance.
*/
-public class KafkaZookeeperImpl extends SoftwareProcessImpl implements KafkaZookeeper {
+public class KafkaZookeeperImpl extends AbstractZookeeperImpl implements KafkaZookeeper {
+
private static final Logger log = LoggerFactory.getLogger(KafkaZookeeperImpl.class);
public KafkaZookeeperImpl() {
@@ -62,83 +47,8 @@ public class KafkaZookeeperImpl extends SoftwareProcessImpl implements KafkaZook
}
@Override
- public Integer getZookeeperPort() { return getAttribute(ZOOKEEPER_PORT); }
-
- @Override
- public String getHostname() { return getAttribute(HOSTNAME); }
-
- @Override
public Class<?> getDriverInterface() {
return KafkaZookeeperDriver.class;
}
- private ObjectName zookeeperMbean = JmxHelper.createObjectName("org.apache.ZooKeeperService:name0=StandaloneServer_port-1");
- private volatile FunctionFeed functionFeed;
- private volatile JmxFeed jmxFeed;
-
- /** Wait for five minutes to start. */
- @Override
- public void waitForServiceUp() { waitForServiceUp(5, TimeUnit.MINUTES); }
-
- @Override
- public void waitForServiceUp(long duration, TimeUnit units) {
- super.waitForServiceUp(duration, units);
-
- // Wait for the MBean to exist
- JmxHelper helper = null;
- try {
- helper = new JmxHelper(this);
- helper.connect();
- helper.assertMBeanExistsEventually(zookeeperMbean, units.toMillis(duration));
- } catch (IOException e) {
- throw Exceptions.propagate(e);
- } finally {
- if (helper != null) helper.disconnect();
- }
- }
-
- @Override
- protected void connectSensors() {
- functionFeed = FunctionFeed.builder()
- .entity(this)
- .poll(new FunctionPollConfig<Object, Boolean>(SERVICE_UP)
- .period(500, TimeUnit.MILLISECONDS)
- .callable(new Callable<Boolean>() {
- public Boolean call() throws Exception {
- return getDriver().isRunning();
- }
- })
- .onError(Functions.constant(Boolean.FALSE)))
- .build();
-
- jmxFeed = JmxFeed.builder()
- .entity(this)
- .period(500, TimeUnit.MILLISECONDS)
- .pollAttribute(new JmxAttributePollConfig<Long>(OUTSTANDING_REQUESTS)
- .objectName(zookeeperMbean)
- .attributeName("OutstandingRequests")
- .onError(Functions.constant(-1l)))
- .pollAttribute(new JmxAttributePollConfig<Long>(PACKETS_RECEIVED)
- .objectName(zookeeperMbean)
- .attributeName("PacketsReceived")
- .onError(Functions.constant(-1l)))
- .pollAttribute(new JmxAttributePollConfig<Long>(PACKETS_SENT)
- .objectName(zookeeperMbean)
- .attributeName("PacketsSent")
- .onError(Functions.constant(-1l)))
- .build();
- }
-
- @Override
- public void disconnectSensors() {
- super.disconnectSensors();
- if (functionFeed != null) functionFeed.stop();
- if (jmxFeed != null) jmxFeed.stop();
- }
-
- @Override
- protected ToStringHelper toStringHelper() {
- return super.toStringHelper().add("zookeeperPort", getZookeeperPort());
- }
-
}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java
index a35aab6..df417e0 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java
@@ -34,7 +34,7 @@ public class KafkaZookeeperSshDriver extends AbstractfKafkaSshDriver implements
@Override
protected ConfigKey<String> getConfigTemplateKey() {
- return KafkaZookeeper.ZOOKEEPER_CONFIG_TEMPLATE;
+ return KafkaZookeeper.KAFKA_ZOOKEEPER_CONFIG_TEMPLATE;
}
@Override
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/zookeeper/AbstractZookeeperImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/zookeeper/AbstractZookeeperImpl.java b/software/messaging/src/main/java/brooklyn/entity/zookeeper/AbstractZookeeperImpl.java
new file mode 100644
index 0000000..2e2fc73
--- /dev/null
+++ b/software/messaging/src/main/java/brooklyn/entity/zookeeper/AbstractZookeeperImpl.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.zookeeper;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.ObjectName;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.SoftwareProcessImpl;
+import brooklyn.event.feed.function.FunctionFeed;
+import brooklyn.event.feed.function.FunctionPollConfig;
+import brooklyn.event.feed.jmx.JmxAttributePollConfig;
+import brooklyn.event.feed.jmx.JmxFeed;
+import brooklyn.event.feed.jmx.JmxHelper;
+import brooklyn.util.MutableMap;
+import brooklyn.util.exceptions.Exceptions;
+
+import com.google.common.base.Functions;
+import com.google.common.base.Objects.ToStringHelper;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+
+/**
+ * An {@link brooklyn.entity.Entity} that represents a single Apache Zookeeper instance.
+ */
+public abstract class AbstractZookeeperImpl extends SoftwareProcessImpl implements Zookeeper {
+
+ private static final Logger log = LoggerFactory.getLogger(AbstractZookeeperImpl.class);
+ private static final ObjectName ZOOKEEPER_MBEAN = JmxHelper.createObjectName("org.apache.ZooKeeperService:name0=StandaloneServer_port-1");
+
+ private volatile JmxFeed jmxFeed;
+
+ public AbstractZookeeperImpl() {
+ super();
+ }
+ public AbstractZookeeperImpl(Map<?, ?> properties) {
+ this(properties, null);
+ }
+ public AbstractZookeeperImpl(Entity parent) {
+ this(MutableMap.of(), parent);
+ }
+ public AbstractZookeeperImpl(Map<?, ?> properties, Entity parent) {
+ super(properties, parent);
+ }
+
+ @Override
+ public Integer getZookeeperPort() { return getAttribute(ZOOKEEPER_PORT); }
+
+ @Override
+ public String getHostname() { return getAttribute(HOSTNAME); }
+
+ @Override
+ public void waitForServiceUp(long duration, TimeUnit units) {
+ super.waitForServiceUp(duration, units);
+
+ // Wait for the MBean to exist
+ JmxHelper helper = new JmxHelper(this);
+ try {
+ helper.assertMBeanExistsEventually(ZOOKEEPER_MBEAN, units.toMillis(duration));
+ } finally {
+ helper.disconnect();
+ }
+ }
+
+ @Override
+ protected void connectSensors() {
+ connectServiceUpIsRunning();
+
+ jmxFeed = JmxFeed.builder()
+ .entity(this)
+ .period(500, TimeUnit.MILLISECONDS)
+ .pollAttribute(new JmxAttributePollConfig<Long>(OUTSTANDING_REQUESTS)
+ .objectName(ZOOKEEPER_MBEAN)
+ .attributeName("OutstandingRequests")
+ .onError(Functions.constant(-1l)))
+ .pollAttribute(new JmxAttributePollConfig<Long>(PACKETS_RECEIVED)
+ .objectName(ZOOKEEPER_MBEAN)
+ .attributeName("PacketsReceived")
+ .onError(Functions.constant(-1l)))
+ .pollAttribute(new JmxAttributePollConfig<Long>(PACKETS_SENT)
+ .objectName(ZOOKEEPER_MBEAN)
+ .attributeName("PacketsSent")
+ .onError(Functions.constant(-1l)))
+ .build();
+ }
+
+ @Override
+ public void disconnectSensors() {
+ super.disconnectSensors();
+ disconnectServiceUpIsRunning();
+ if (jmxFeed != null) jmxFeed.stop();
+ }
+
+ @Override
+ protected ToStringHelper toStringHelper() {
+ return super.toStringHelper()
+ .add("zookeeperPort", getZookeeperPort());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/zookeeper/Zookeeper.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/zookeeper/Zookeeper.java b/software/messaging/src/main/java/brooklyn/entity/zookeeper/Zookeeper.java
new file mode 100644
index 0000000..369ff61
--- /dev/null
+++ b/software/messaging/src/main/java/brooklyn/entity/zookeeper/Zookeeper.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.zookeeper;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.java.UsesJmx;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensor;
+import brooklyn.event.basic.BasicConfigKey;
+import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
+import brooklyn.util.flags.SetFromFlag;
+
+/**
+ * An {@link brooklyn.entity.Entity} that represents a single Apache Zookeeper instance.
+ * <p>
+ * The implementation, {@link AbstractZookeeperImpl}, is currently {@code abstract} as there is no generic Zookeeper driver.
+ */
+@ImplementedBy(AbstractZookeeperImpl.class)
+public interface Zookeeper extends SoftwareProcess, UsesJmx {
+
+ @SetFromFlag("version")
+ ConfigKey<String> SUGGESTED_VERSION = new BasicConfigKey<String>(SoftwareProcess.SUGGESTED_VERSION, "3.3.3");
+
+ @SetFromFlag("zookeeperPort")
+ PortAttributeSensorAndConfigKey ZOOKEEPER_PORT = new PortAttributeSensorAndConfigKey("zookeeper.port", "Zookeeper port", "2181+");
+
+ AttributeSensor<Long> OUTSTANDING_REQUESTS = new BasicAttributeSensor<Long>(Long.class, "zookeeper.outstandingRequests", "Outstanding request count");
+ AttributeSensor<Long> PACKETS_RECEIVED = new BasicAttributeSensor<Long>(Long.class, "zookeeper.packets.received", "Total packets received");
+ AttributeSensor<Long> PACKETS_SENT = new BasicAttributeSensor<Long>(Long.class, "zookeeper.packets.sent", "Total packets sent");
+
+ Integer getZookeeperPort();
+
+ String getHostname();
+
+}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.groovy
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.groovy b/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.groovy
index 28ff308..8733cb0 100644
--- a/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.groovy
+++ b/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.groovy
@@ -41,19 +41,15 @@ public class ActiveMQIntegrationTest {
private Location testLocation
private ActiveMQBroker activeMQ
- @BeforeMethod(groups = "Integration")
+ @BeforeMethod(alwaysRun = true)
public void setup() {
app = ApplicationBuilder.newManagedApp(TestApplication.class);
testLocation = new LocalhostMachineProvisioningLocation()
}
- @AfterMethod(groups = "Integration")
+ @AfterMethod(alwaysRun = true)
public void shutdown() {
- try {
- if (app != null) Entities.destroyAll(app);
- } catch (Exception e) {
- log.warn("Error stopping entities", e);
- }
+ if (app != null) Entities.destroyAll(app);
}
/**
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy
deleted file mode 100644
index 2ef95c5..0000000
--- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Copyright 2013 by Cloudsoft Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package brooklyn.entity.messaging.kafka;
-
-import static brooklyn.test.TestUtils.*
-import static java.util.concurrent.TimeUnit.*
-import static org.testng.Assert.*
-
-import java.util.concurrent.TimeUnit
-
-import javax.jms.Connection
-import javax.jms.MessageConsumer
-import javax.jms.MessageProducer
-import javax.jms.Queue
-import javax.jms.Session
-import javax.jms.TextMessage
-
-import org.apache.activemq.ActiveMQConnectionFactory
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
-import org.testng.annotations.AfterMethod
-import org.testng.annotations.BeforeMethod
-import org.testng.annotations.Test
-
-import brooklyn.entity.basic.ApplicationBuilder
-import brooklyn.entity.basic.Entities
-import brooklyn.entity.proxying.BasicEntitySpec
-import brooklyn.entity.trait.Startable
-import brooklyn.location.Location
-import brooklyn.location.basic.LocalhostMachineProvisioningLocation
-import brooklyn.test.entity.TestApplication
-import brooklyn.util.internal.TimeExtras
-
-/**
- * Test the operation of the {@link ActiveMQBroker} class.
- *
- * TODO test that sensors update.
- */
-public class KafkaIntegrationTest {
- private static final Logger log = LoggerFactory.getLogger(KafkaIntegrationTest.class)
-
- static { TimeExtras.init() }
-
- private TestApplication app
- private Location testLocation
-
- @BeforeMethod(groups = "Integration")
- public void setup() {
- app = ApplicationBuilder.builder(TestApplication.class).manage();
- testLocation = new LocalhostMachineProvisioningLocation()
- }
-
- @AfterMethod(groups = "Integration")
- public void shutdown() {
- if (app != null) Entities.destroyAll(app);
- }
-
- /**
- * Test that we can start a zookeeper.
- */
- @Test(groups = "Integration")
- public void testZookeeper() {
- KafkaZookeeper zookeeper = app.createAndManageChild(BasicEntitySpec.newInstance(KafkaZookeeper.class));
-
- zookeeper.start([ testLocation ])
- executeUntilSucceedsWithShutdown(zookeeper, timeout:600*TimeUnit.SECONDS) {
- assertTrue zookeeper.getAttribute(Startable.SERVICE_UP)
- }
- assertFalse zookeeper.getAttribute(Startable.SERVICE_UP)
- }
-
- /**
- * Test that we can start a broker and zookeeper together.
- */
- @Test(groups = "Integration")
- public void testBrokerPlusZookeeper() {
- KafkaZookeeper zookeeper = app.createAndManageChild(BasicEntitySpec.newInstance(KafkaZookeeper.class));
- KafkaBroker broker = app.createAndManageChild(BasicEntitySpec.newInstance(KafkaBroker.class).configure(KafkaBroker.ZOOKEEPER, zookeeper));
-
- zookeeper.start([ testLocation ])
- executeUntilSucceeds(timeout:600*TimeUnit.SECONDS) {
- assertTrue zookeeper.getAttribute(Startable.SERVICE_UP)
- }
-
- broker.start([ testLocation ])
- executeUntilSucceeds(timeout:600*TimeUnit.SECONDS) {
- assertTrue broker.getAttribute(Startable.SERVICE_UP)
- }
- }
-
- /**
- * Test that we can start a cluster with zookeeper and one broker.
- *
- * Connects to the zookeeper controller and tests sending and receiving messages on a topic.
- */
- @Test(groups = "Integration")
- public void testSingleBrokerCluster() {
- KafkaCluster cluster = app.createAndManageChild(BasicEntitySpec.newInstance(KafkaCluster.class).configure(KafkaCluster.INITIAL_SIZE, 1));
-
- cluster.start([ testLocation ])
- executeUntilSucceeds(timeout:600*TimeUnit.SECONDS) {
- assertTrue cluster.getAttribute(Startable.SERVICE_UP)
- }
-
- Entities.dumpInfo(cluster);
-
- KafkaSupport support = new KafkaSupport(cluster.getZookeeper());
- support.sendMessage("brooklyn", "TEST_MESSAGE")
- String message = support.getMessage("brooklyn");
- assertEquals(message, "TEST_MESSAGE");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
new file mode 100644
index 0000000..54f698a
--- /dev/null
+++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.messaging.kafka;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import java.util.concurrent.Callable;
+
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.messaging.activemq.ActiveMQBroker;
+import brooklyn.entity.proxying.EntitySpecs;
+import brooklyn.entity.trait.Startable;
+import brooklyn.location.Location;
+import brooklyn.location.basic.LocalhostMachineProvisioningLocation;
+import brooklyn.test.Asserts;
+import brooklyn.test.entity.TestApplication;
+import brooklyn.util.MutableMap;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Test the operation of the {@link KafkaZookeeper}, {@link KafkaBroker} and {@link KafkaCluster} entities.
+ *
+ * TODO test that sensors update.
+ */
+public class KafkaIntegrationTest {
+
+ private TestApplication app;
+ private Location testLocation;
+
+ @BeforeMethod(alwaysRun = true)
+ public void setup() {
+ app = ApplicationBuilder.newManagedApp(TestApplication.class);
+ testLocation = new LocalhostMachineProvisioningLocation();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void shutdown() {
+ if (app != null) Entities.destroyAll(app);
+ }
+
+ /**
+ * Test that we can start a zookeeper.
+ */
+ @Test(groups = "Integration")
+ public void testZookeeper() {
+ final KafkaZookeeper zookeeper = app.createAndManageChild(EntitySpecs.spec(KafkaZookeeper.class));
+
+ zookeeper.start(ImmutableList.of(testLocation));
+ Asserts.succeedsEventually(MutableMap.of("timeout", 60000l), new Callable<Void>() {
+ @Override
+ public Void call() {
+ assertTrue(zookeeper.getAttribute(Startable.SERVICE_UP));
+ return null;
+ }
+ });
+
+ zookeeper.stop();
+ assertFalse(zookeeper.getAttribute(Startable.SERVICE_UP));
+ }
+
+ /**
+ * Test that we can start a broker and zookeeper together.
+ */
+ @Test(groups = "Integration")
+ public void testBrokerPlusZookeeper() {
+ final KafkaZookeeper zookeeper = app.createAndManageChild(EntitySpecs.spec(KafkaZookeeper.class));
+ final KafkaBroker broker = app.createAndManageChild(EntitySpecs.spec(KafkaBroker.class).configure(KafkaBroker.ZOOKEEPER, zookeeper));
+
+ zookeeper.start(ImmutableList.of(testLocation));
+ Asserts.succeedsEventually(MutableMap.of("timeout", 60000l), new Callable<Void>() {
+ @Override
+ public Void call() {
+ assertTrue(zookeeper.getAttribute(Startable.SERVICE_UP));
+ return null;
+ }
+ });
+
+ broker.start(ImmutableList.of(testLocation));
+ Asserts.succeedsEventually(MutableMap.of("timeout", 60000l), new Callable<Void>() {
+ @Override
+ public Void call() {
+ assertTrue(broker.getAttribute(Startable.SERVICE_UP));
+ return null;
+ }
+ });
+
+ zookeeper.stop();
+ assertFalse(zookeeper.getAttribute(Startable.SERVICE_UP));
+
+ broker.stop();
+ assertFalse(broker.getAttribute(Startable.SERVICE_UP));
+ }
+
+ /**
+ * Test that we can start a cluster with zookeeper and one broker.
+ *
+ * Connects to the zookeeper controller and tests sending and receiving messages on a topic.
+ */
+ @Test(groups = "Integration")
+ public void testSingleBrokerCluster() {
+ final KafkaCluster cluster = app.createAndManageChild(EntitySpecs.spec(KafkaCluster.class));
+
+ cluster.start(ImmutableList.of(testLocation));
+ Asserts.succeedsEventually(MutableMap.of("timeout", 60000l), new Callable<Void>() {
+ @Override
+ public Void call() {
+ assertTrue(cluster.getAttribute(Startable.SERVICE_UP));
+ assertTrue(cluster.getZookeeper().getAttribute(Startable.SERVICE_UP));
+ assertEquals(cluster.getCurrentSize().intValue(), 1);
+ return null;
+ }
+ });
+
+ Entities.dumpInfo(cluster);
+
+ KafkaSupport support = new KafkaSupport(cluster);
+
+ support.sendMessage("brooklyn", "TEST_MESSAGE");
+ String message = support.getMessage("brooklyn");
+ assertEquals(message, "TEST_MESSAGE");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
index d9372a9..019a65b 100644
--- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
+++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
@@ -31,40 +31,54 @@ import kafka.javaapi.producer.ProducerData;
import kafka.message.Message;
import kafka.producer.ProducerConfig;
import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.zookeeper.Zookeeper;
-import com.beust.jcommander.internal.Lists;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
+/**
+ * Kafka test framework for integration and live tests, using the Kafka Java API.
+ */
public class KafkaSupport {
- private final KafkaZookeeper zookeeper;
+ private final KafkaCluster cluster;
- public KafkaSupport(KafkaZookeeper zookeeper) {
- this.zookeeper = zookeeper;
+ public KafkaSupport(KafkaCluster cluster) {
+ this.cluster = cluster;
}
+ /**
+ * Send a message to the {@link KafkaCluster} on the given topic.
+ */
public void sendMessage(String topic, String message) {
+ Zookeeper zookeeper = cluster.getZookeeper();
Properties props = new Properties();
props.put("zk.connect", String.format("%s:%d", zookeeper.getAttribute(Attributes.HOSTNAME), zookeeper.getZookeeperPort()));
props.put("serializer.class", "kafka.serializer.StringEncoder");
ProducerConfig config = new ProducerConfig(props);
+
Producer<String, String> producer = new Producer<String, String>(config);
ProducerData<String, String> data = new ProducerData<String, String>(topic, message);
producer.send(data);
producer.close();
}
+ /**
+ * Retrieve the next message on the given topic from the {@link KafkaCluster}.
+ */
public String getMessage(String topic) {
+ Zookeeper zookeeper = cluster.getZookeeper();
Properties props = new Properties();
props.put("zk.connect", String.format("%s:%d", zookeeper.getAttribute(Attributes.HOSTNAME), zookeeper.getZookeeperPort()));
- props.put("zk.connectiontimeout.ms", "1000000");
+ props.put("zk.connectiontimeout.ms", "120000"); // two minutes
props.put("groupid", "brooklyn");
ConsumerConfig consumerConfig = new ConsumerConfig(props);
+
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
List<KafkaMessageStream<Message>> streams = consumer.createMessageStreams(ImmutableMap.of(topic, 1)).get(topic);
ConsumerIterator<Message> iterator = Iterables.getOnlyElement(streams).iterator();
Message msg = iterator.next();
+
assertTrue(msg.isValid());
ByteBuffer buf = msg.payload();
byte[] data = new byte[buf.remaining()];