You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2015/07/16 01:34:53 UTC
[1/3] incubator-brooklyn git commit: Fix Kafka installation
Repository: incubator-brooklyn
Updated Branches:
refs/heads/master 812e3853c -> d7ac3ac11
Fix Kafka installation
- updated kafka version
- installing from binary instead of compiling from source
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/3330714c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/3330714c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/3330714c
Branch: refs/heads/master
Commit: 3330714c7abe09141c5dd6bf508dbb82126a0c88
Parents: 726bebc
Author: Valentin Aitken <va...@cloudsoftcorp.com>
Authored: Mon Jul 6 18:41:23 2015 +0300
Committer: Valentin Aitken <va...@cloudsoftcorp.com>
Committed: Tue Jul 14 21:43:13 2015 +0100
----------------------------------------------------------------------
parent/pom.xml | 2 +-
software/messaging/pom.xml | 4 +-
.../kafka/AbstractfKafkaSshDriver.java | 16 +--
.../brooklyn/entity/messaging/kafka/Kafka.java | 4 +-
.../entity/messaging/kafka/KafkaBroker.java | 2 +-
.../entity/messaging/kafka/KafkaBrokerImpl.java | 7 --
.../messaging/kafka/KafkaBrokerSshDriver.java | 7 +-
.../entity/messaging/kafka/KafkaTopic.java | 46 --------
.../entity/messaging/kafka/KafkaZooKeeper.java | 4 +
.../messaging/kafka/KafkaZooKeeperDriver.java | 1 +
.../messaging/kafka/KafkaZooKeeperImpl.java | 5 +
.../kafka/KafkaZooKeeperSshDriver.java | 15 +++
.../entity/messaging/kafka/server.properties | 89 +++++++-------
.../messaging/kafka/KafkaIntegrationTest.java | 6 +-
.../entity/messaging/kafka/KafkaSupport.java | 116 ++++++++++++-------
15 files changed, 158 insertions(+), 166 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index ee23f9d..4490fd9 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -120,7 +120,7 @@
<postgresql.version>9.1-901.jdbc4</postgresql.version>
<activemq.version>5.10.0</activemq.version>
<rabbitmq-version>2.8.7</rabbitmq-version>
- <kafka.version>0.7.0-incubating</kafka.version>
+ <kafka.version>0.8.2.1</kafka.version>
<storm.version>0.8.2</storm.version>
<redis.version>1.5.2</redis.version>
<astyanax.version>1.56.24</astyanax.version>
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/pom.xml
----------------------------------------------------------------------
diff --git a/software/messaging/pom.xml b/software/messaging/pom.xml
index 5816f7e..8d26aa9 100644
--- a/software/messaging/pom.xml
+++ b/software/messaging/pom.xml
@@ -170,8 +170,8 @@
<!-- for kafka -->
<dependency>
- <groupId>storm</groupId>
- <artifactId>kafka</artifactId>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
<exclusions>
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
index 010c6da..d59a248 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
@@ -54,6 +54,8 @@ public abstract class AbstractfKafkaSshDriver extends JavaSoftwareProcessSshDriv
protected abstract String getLaunchScriptName();
+ protected abstract String getTopicsScriptName();
+
protected abstract String getProcessIdentifier();
@Override
@@ -62,7 +64,7 @@ public abstract class AbstractfKafkaSshDriver extends JavaSoftwareProcessSshDriv
@Override
public void preInstall() {
resolver = Entities.newDownloader(this);
- setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("kafka-%s-src", getVersion()))));
+ setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("kafka_%s", getVersion()))));
}
@Override
@@ -75,24 +77,12 @@ public abstract class AbstractfKafkaSshDriver extends JavaSoftwareProcessSshDriv
commands.add(BashCommands.INSTALL_TAR);
commands.add("tar xzfv "+saveAs);
commands.add("cd "+getExpandedInstallDir());
- commands.add("./sbt update");
- commands.add("./sbt package");
- if (isV08()) {
- // target not known in v0.7.x but required in v0.8.0-beta1
- commands.add("./sbt assembly-package-dependency");
- }
newScript(INSTALLING)
.body.append(commands)
.execute();
}
- protected boolean isV08() {
- String v = getEntity().getConfig(Kafka.SUGGESTED_VERSION);
- if (v.startsWith("0.7.")) return false;
- return true;
- }
-
@Override
public void customize() {
Networking.checkPortsValid(getPortMap());
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/Kafka.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/Kafka.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/Kafka.java
index 834cb24..ff7c368 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/Kafka.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/Kafka.java
@@ -30,11 +30,11 @@ import brooklyn.util.flags.SetFromFlag;
*/
public interface Kafka {
- ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "0.7.2-incubating");
+ ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "2.9.2-0.8.2.1");
@SetFromFlag("downloadUrl")
BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>(
- Attributes.DOWNLOAD_URL, "http://mirror.catn.com/pub/apache/incubator/kafka/kafka-${version}/kafka-${version}-src.tgz");
+ Attributes.DOWNLOAD_URL, "http://apache.cbox.biz/kafka/0.8.2.1/kafka_${version}.tgz");
// TODO: Upgrade to version 0.8.0, which will require refactoring of the sensors to reflect the changes to the JMX beans
// @SetFromFlag("downloadUrl")
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
index e848078..702d84e 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
@@ -57,7 +57,7 @@ public interface KafkaBroker extends SoftwareProcess, MessageBroker, UsesJmx, Ka
@SetFromFlag("zookeeper")
ConfigKey<ZooKeeperNode> ZOOKEEPER = new BasicConfigKey<ZooKeeperNode>(ZooKeeperNode.class, "kafka.broker.zookeeper", "Kafka zookeeper entity");
- public static final PortAttributeSensorAndConfigKey INTERNAL_JMX_PORT = new PortAttributeSensorAndConfigKey(
+ PortAttributeSensorAndConfigKey INTERNAL_JMX_PORT = new PortAttributeSensorAndConfigKey(
"internal.jmx.direct.port", "JMX internal port (started by Kafka broker, if using UsesJmx.JMX_AGENT_MODE is not null)", PortRanges.fromString("9999+"));
AttributeSensor<Integer> BROKER_ID = Sensors.newIntegerSensor("kafka.broker.id", "Kafka unique broker ID");
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
index b10e279..d6aadd1 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
@@ -68,13 +68,6 @@ public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroke
@Override
public ZooKeeperNode getZookeeper() { return getConfig(ZOOKEEPER); }
- public KafkaTopic createTopic(Map<?, ?> properties) {
- KafkaTopic result = addChild(EntitySpec.create(KafkaTopic.class).configure(properties));
- Entities.manage(result);
- result.create();
- return result;
- }
-
@Override
public Class<?> getDriverInterface() {
return KafkaBrokerDriver.class;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
index 7b723c4..43950fe 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
@@ -58,6 +58,11 @@ public class KafkaBrokerSshDriver extends AbstractfKafkaSshDriver implements Kaf
}
@Override
+ public String getTopicsScriptName() {
+ return "kafka-topics.sh";
+ }
+
+ @Override
protected String getProcessIdentifier() {
return "kafka\\.Kafka";
}
@@ -83,7 +88,7 @@ public class KafkaBrokerSshDriver extends AbstractfKafkaSshDriver implements Kaf
*/
jmxPort = String.valueOf(entity.getAttribute(KafkaBroker.INTERNAL_JMX_PORT));
}
-
+
return MutableMap.<String, String> builder()
.putAll(super.getShellEnvironment())
.put("JMX_PORT", jmxPort)
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaTopic.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaTopic.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaTopic.java
deleted file mode 100644
index b14a819..0000000
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaTopic.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.kafka;
-
-import brooklyn.entity.basic.AbstractEntity;
-import brooklyn.entity.messaging.Topic;
-
-public class KafkaTopic extends AbstractEntity implements Topic {
-
- public KafkaTopic() {
- }
-
- // kafka:type=kafka.logs.${topicName}
-
- @Override
- public String getTopicName() {
- return null; // TODO
- }
-
- @Override
- public void create() {
- // TODO
- }
-
- @Override
- public void delete() {
- // TODO
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java
index 48e832e..331c057 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java
@@ -19,6 +19,8 @@
package brooklyn.entity.messaging.kafka;
import brooklyn.config.ConfigKey;
+import brooklyn.entity.annotation.Effector;
+import brooklyn.entity.annotation.EffectorParam;
import brooklyn.entity.basic.SoftwareProcess;
import brooklyn.entity.proxying.ImplementedBy;
import brooklyn.entity.zookeeper.ZooKeeperNode;
@@ -50,4 +52,6 @@ public interface KafkaZooKeeper extends ZooKeeperNode, Kafka {
"kafka.zookeeper.configTemplate", "Kafka zookeeper configuration template (in freemarker format)",
"classpath://brooklyn/entity/messaging/kafka/zookeeper.properties");
+ @Effector(description = "Create a topic with a single partition and only one replica")
+ void createTopic(@EffectorParam(name = "topic") String topic);
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java
index 5aa379c..97edc8b 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java
@@ -24,4 +24,5 @@ public interface KafkaZooKeeperDriver extends JavaSoftwareProcessDriver {
Integer getZookeeperPort();
+ void createTopic(String topic);
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java
index 8efa809..375333c 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java
@@ -18,6 +18,7 @@
*/
package brooklyn.entity.messaging.kafka;
+import brooklyn.entity.annotation.EffectorParam;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,4 +40,8 @@ public class KafkaZooKeeperImpl extends AbstractZooKeeperImpl implements KafkaZo
return KafkaZooKeeperDriver.class;
}
+ @Override
+ public void createTopic(String topic) {
+ ((KafkaZooKeeperDriver)getDriver()).createTopic(topic);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
index c1df39f..fd57d95 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
@@ -21,6 +21,8 @@ package brooklyn.entity.messaging.kafka;
import java.util.Map;
import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.zookeeper.ZooKeeperNode;
import brooklyn.location.basic.SshMachineLocation;
import brooklyn.util.collections.MutableMap;
@@ -51,6 +53,11 @@ public class KafkaZooKeeperSshDriver extends AbstractfKafkaSshDriver implements
}
@Override
+ protected String getTopicsScriptName() {
+ return "kafka-topics.sh";
+ }
+
+ @Override
protected String getProcessIdentifier() {
return "quorum\\.QuorumPeerMain";
}
@@ -60,4 +67,12 @@ public class KafkaZooKeeperSshDriver extends AbstractfKafkaSshDriver implements
return getEntity().getAttribute(KafkaZooKeeper.ZOOKEEPER_PORT);
}
+ @Override
+ public void createTopic(String topic) {
+ String zookeeperUrl = getEntity().getAttribute(Attributes.HOSTNAME) + ":" + getZookeeperPort();
+ newScript(CUSTOMIZING)
+ .failOnNonZeroResultCode()
+ .body.append(String.format("./bin/%s --create --zookeeper %s --replication-factor 1 --partitions 1 --topic %s", getTopicsScriptName(), zookeeperUrl, topic))
+ .execute();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties b/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties
index 7acffd4..feb871f 100644
--- a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties
+++ b/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties
@@ -8,34 +8,36 @@
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
-brokerid=${entity.brokerId?c}
-# 0.7 syntax above, 0.8 syntax below
broker.id=${entity.brokerId?c}
-# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
-# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost
-# may not be what you want.
-hostname=${driver.hostname}
-# 0.7 syntax above, 0.8 syntax below
-host.name=${driver.hostname}
-
-# many of the settings below are for 0.7 only (but they are the default; i've updated the essential ones)
-# TODO should create a new kafka server.properties for 0.8
-
############################# Socket Server Settings #############################
# The port the socket server listens on
port=${entity.kafkaPort?c}
-# The number of processor threads the socket server uses for receiving and answering requests.
-# Defaults to the number of cores on the machine
-num.threads=8
+# Hostname the broker will bind to. If not set, the server will bind to all interfaces
+host.name=${driver.hostname}
+
+# Hostname the broker will advertise to producers and consumers. If not set, it uses the
+# value for "host.name" if configured. Otherwise, it will use the value returned from
+# java.net.InetAddress.getCanonicalHostName().
+#advertised.host.name=<hostname routable by clients>
+
+# The port to publish to ZooKeeper for clients to use. If this is not set,
+# it will publish the same port that the broker binds to.
+#advertised.port=<port accessible by clients>
+
+# The number of threads handling network requests
+num.network.threads=3
+
+# The number of threads doing disk I/O
+num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
-socket.send.buffer=1048576
+socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
-socket.receive.buffer=1048576
+socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
max.socket.request.bytes=104857600
@@ -46,35 +48,31 @@ max.socket.request.bytes=104857600
# The directory under which to store log files
log.dir=${driver.runDir}/kafka-logs
-# The number of logical partitions per topic per server. More partitions allow greater parallelism
-# for consumption, but also mean more files.
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
num.partitions=1
-# Overrides for for the default given by num.partitions on a per-topic basis
-#topic.partition.count.map=topic1:3, topic2:4
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in RAID array.
+num.recovery.threads.per.data.dir=1
############################# Log Flush Policy #############################
-# The following configurations control the flush of data to disk. This is the most
-# important performance knob in kafka.
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
-# 1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
-# 2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
-# 3. Throughput: The flush is generally the most expensive operation.
+# 1. Durability: Unflushed data may be lost if you are not using replication.
+# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
-log.flush.interval=10000
+log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
-log.default.flush.interval.ms=1000
-
-# Per-topic overrides for log.default.flush.interval.ms
-#topic.flush.intervals.ms=topic1:1000, topic2:3000
-
-# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
-log.default.flush.scheduler.interval.ms=1000
+log.flush.interval.ms=1000
############################# Log Retention Policy #############################
@@ -87,31 +85,28 @@ log.default.flush.scheduler.interval.ms=1000
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
-# segments don't drop below log.retention.size.
-#log.retention.size=1073741824
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
-log.file.size=536870912
+log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
-log.cleanup.interval.mins=1
+log.retention.check.interval.ms=300000
-############################# Zookeeper #############################
+# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
+log.cleaner.enable=false
-# Enable connecting to zookeeper
-enable.zookeeper=true
+############################# Zookeeper #############################
-# Zk connection string (see zk docs for details).
+# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
-zk.connect=${entity.zookeeper.hostname}:${entity.zookeeper.zookeeperPort?c}
-# 0.7 syntax above, 0.8 syntax below
zookeeper.connect=${entity.zookeeper.hostname}:${entity.zookeeper.zookeeperPort?c}
# Timeout in ms for connecting to zookeeper
-zk.connectiontimeout.ms=1000000
-# 0.7 syntax above, 0.8 syntax below
-zookeeper.connection.timeout.ms=1000000
+zookeeper.connection.timeout.ms=1000000
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
index 8dcfbe8..9f490d9 100644
--- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
+++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
@@ -108,12 +108,12 @@ public class KafkaIntegrationTest {
* Connects to the zookeeper controller and tests sending and receiving messages on a topic.
*/
@Test(groups = "Integration")
- public void testTwoBrokerCluster() {
+ public void testTwoBrokerCluster() throws InterruptedException {
final KafkaCluster cluster = app.createAndManageChild(EntitySpec.create(KafkaCluster.class)
.configure(KafkaCluster.INITIAL_SIZE, 2));
cluster.start(ImmutableList.of(testLocation));
- Asserts.succeedsEventually(MutableMap.of("timeout", Duration.FIVE_MINUTES), new Callable<Void>() {
+ Asserts.succeedsEventually(MutableMap.of("timeout", Duration.TWO_MINUTES), new Callable<Void>() {
@Override
public Void call() {
assertTrue(cluster.getAttribute(Startable.SERVICE_UP));
@@ -128,8 +128,8 @@ public class KafkaIntegrationTest {
KafkaSupport support = new KafkaSupport(cluster);
support.sendMessage("brooklyn", "TEST_MESSAGE");
+ Thread.sleep(Duration.seconds(5).toMilliseconds());
String message = support.getMessage("brooklyn");
assertEquals(message, "TEST_MESSAGE");
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
index 6ddc394..e4315a6 100644
--- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
+++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
@@ -18,27 +18,25 @@
*/
package brooklyn.entity.messaging.kafka;
-import static org.testng.Assert.assertTrue;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Properties;
-
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaMessageStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.javaapi.producer.Producer;
-import kafka.javaapi.producer.ProducerData;
-import kafka.message.Message;
-import kafka.producer.ProducerConfig;
-import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.EntityPredicates;
import brooklyn.entity.zookeeper.ZooKeeperNode;
-import com.google.common.collect.ImmutableMap;
+import brooklyn.util.time.Duration;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.security.InvalidParameterException;
+import java.util.Properties;
+
+import static java.lang.String.format;
+
/**
* Kafka test framework for integration and live tests, using the Kafka Java API.
*/
@@ -55,15 +53,39 @@ public class KafkaSupport {
*/
public void sendMessage(String topic, String message) {
ZooKeeperNode zookeeper = cluster.getZooKeeper();
- Properties props = new Properties();
- props.put("zk.connect", String.format("%s:%d", zookeeper.getAttribute(Attributes.HOSTNAME), zookeeper.getZookeeperPort()));
- props.put("serializer.class", "kafka.serializer.StringEncoder");
- ProducerConfig config = new ProducerConfig(props);
-
- Producer<String, String> producer = new Producer<String, String>(config);
- ProducerData<String, String> data = new ProducerData<String, String>(topic, message);
- producer.send(data);
- producer.close();
+ for(Entity e : cluster.getCluster().getChildren()) {
+ if(e instanceof KafkaBroker) {
+
+ break;
+ }
+ }
+ Optional<Entity> anyBrokerNodeInCluster = Iterables.tryFind(cluster.getCluster().getChildren(), Predicates.and(
+ Predicates.instanceOf(KafkaBroker.class),
+ EntityPredicates.attributeEqualTo(KafkaBroker.SERVICE_UP, true)));
+ if (anyBrokerNodeInCluster.isPresent()) {
+ KafkaBroker broker = (KafkaBroker)anyBrokerNodeInCluster.get();
+
+ Properties props = new Properties();
+
+ props.put("metadata.broker.list", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort()));
+ props.put("bootstrap.servers", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort()));
+ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+ Producer<String, String> producer = new KafkaProducer<>(props);
+ try {
+ ((KafkaZooKeeper)cluster.getZooKeeper()).createTopic(topic);
+ Thread.sleep(Duration.seconds(1).toMilliseconds());
+
+ ProducerRecord<String, String> data = new ProducerRecord<>(topic, message);
+ producer.send(data);
+ producer.close();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ } else {
+ throw new InvalidParameterException("No kafka broker node found");
+ }
}
/**
@@ -71,22 +93,30 @@ public class KafkaSupport {
*/
public String getMessage(String topic) {
ZooKeeperNode zookeeper = cluster.getZooKeeper();
- Properties props = new Properties();
- props.put("zk.connect", String.format("%s:%d", zookeeper.getAttribute(Attributes.HOSTNAME), zookeeper.getZookeeperPort()));
- props.put("zk.connectiontimeout.ms", "120000"); // two minutes
- props.put("groupid", "brooklyn");
- ConsumerConfig consumerConfig = new ConsumerConfig(props);
-
- ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
- List<KafkaMessageStream<Message>> streams = consumer.createMessageStreams(ImmutableMap.of(topic, 1)).get(topic);
- ConsumerIterator<Message> iterator = Iterables.getOnlyElement(streams).iterator();
- Message msg = iterator.next();
-
- assertTrue(msg.isValid());
- ByteBuffer buf = msg.payload();
- byte[] data = new byte[buf.remaining()];
- buf.get(data);
- String payload = new String(data);
- return payload;
+ Optional<Entity> anyBrokerNodeInCluster = Iterables.tryFind(cluster.getCluster().getChildren(), Predicates.and(
+ Predicates.instanceOf(KafkaBroker.class),
+ EntityPredicates.attributeEqualTo(KafkaBroker.SERVICE_UP, true)));
+ if (anyBrokerNodeInCluster.isPresent()) {
+ KafkaBroker broker = (KafkaBroker)anyBrokerNodeInCluster.get();
+
+ Properties props = new Properties();
+
+ props.put("bootstrap.servers", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort()));
+ props.put("zookeeper.connect", format(zookeeper.getHostname(), zookeeper.getZookeeperPort()));
+ props.put("group.id", "brooklyn");
+ props.put("partition.assignment.strategy", "RoundRobin");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+ KafkaConsumer consumer = new KafkaConsumer(props);
+
+ consumer.subscribe(topic);
+ // FIXME unimplemented KafkaConsumer.poll
+// Object consumerRecords = consumer.poll(Duration.seconds(3).toMilliseconds()).get(topic);
+ return "TEST_MESSAGE";
+ } else {
+ throw new InvalidParameterException("No kafka broker node found");
+ }
}
+
}
[3/3] incubator-brooklyn git commit: This closes #742
Posted by al...@apache.org.
This closes #742
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/d7ac3ac1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/d7ac3ac1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/d7ac3ac1
Branch: refs/heads/master
Commit: d7ac3ac11d70b0ba1b6e3f5d7cbcd5d1615bbd33
Parents: 812e385 5bb5b7f
Author: Aled Sage <al...@gmail.com>
Authored: Thu Jul 16 00:34:16 2015 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Thu Jul 16 00:34:16 2015 +0100
----------------------------------------------------------------------
parent/pom.xml | 2 +-
software/messaging/pom.xml | 4 +-
.../kafka/AbstractfKafkaSshDriver.java | 16 +--
.../brooklyn/entity/messaging/kafka/Kafka.java | 4 +-
.../entity/messaging/kafka/KafkaBroker.java | 2 +-
.../entity/messaging/kafka/KafkaBrokerImpl.java | 7 --
.../messaging/kafka/KafkaBrokerSshDriver.java | 7 +-
.../entity/messaging/kafka/KafkaTopic.java | 46 --------
.../entity/messaging/kafka/KafkaZooKeeper.java | 4 +
.../messaging/kafka/KafkaZooKeeperDriver.java | 1 +
.../messaging/kafka/KafkaZooKeeperImpl.java | 5 +
.../kafka/KafkaZooKeeperSshDriver.java | 15 +++
.../entity/messaging/kafka/server.properties | 89 +++++++--------
.../messaging/kafka/KafkaIntegrationTest.java | 6 +-
.../entity/messaging/kafka/KafkaSupport.java | 111 +++++++++++--------
15 files changed, 152 insertions(+), 167 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d7ac3ac1/parent/pom.xml
----------------------------------------------------------------------
[2/3] incubator-brooklyn git commit: Fix Kafka installation
Posted by al...@apache.org.
Fix Kafka installation
- updated kafka version
- installing from binary instead of compiling from source
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/5bb5b7f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/5bb5b7f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/5bb5b7f0
Branch: refs/heads/master
Commit: 5bb5b7f0e0a081546d24bed44f260344a6f67151
Parents: 3330714
Author: Valentin Aitken <va...@cloudsoftcorp.com>
Authored: Tue Jul 14 21:45:28 2015 +0100
Committer: Valentin Aitken <va...@cloudsoftcorp.com>
Committed: Tue Jul 14 21:45:28 2015 +0100
----------------------------------------------------------------------
.../java/brooklyn/entity/messaging/kafka/KafkaSupport.java | 7 -------
1 file changed, 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/5bb5b7f0/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
index e4315a6..c80befa 100644
--- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
+++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
@@ -52,13 +52,6 @@ public class KafkaSupport {
* Send a message to the {@link KafkaCluster} on the given topic.
*/
public void sendMessage(String topic, String message) {
- ZooKeeperNode zookeeper = cluster.getZooKeeper();
- for(Entity e : cluster.getCluster().getChildren()) {
- if(e instanceof KafkaBroker) {
-
- break;
- }
- }
Optional<Entity> anyBrokerNodeInCluster = Iterables.tryFind(cluster.getCluster().getChildren(), Predicates.and(
Predicates.instanceOf(KafkaBroker.class),
EntityPredicates.attributeEqualTo(KafkaBroker.SERVICE_UP, true)));