Posted to commits@brooklyn.apache.org by al...@apache.org on 2015/07/16 01:34:53 UTC

[1/3] incubator-brooklyn git commit: Fix Kafka installation

Repository: incubator-brooklyn
Updated Branches:
  refs/heads/master 812e3853c -> d7ac3ac11


Fix Kafka installation

- Updated the Kafka version to 0.8.2.1
- Installed from the binary distribution instead of compiling from source


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/3330714c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/3330714c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/3330714c

Branch: refs/heads/master
Commit: 3330714c7abe09141c5dd6bf508dbb82126a0c88
Parents: 726bebc
Author: Valentin Aitken <va...@cloudsoftcorp.com>
Authored: Mon Jul 6 18:41:23 2015 +0300
Committer: Valentin Aitken <va...@cloudsoftcorp.com>
Committed: Tue Jul 14 21:43:13 2015 +0100

----------------------------------------------------------------------
 parent/pom.xml                                  |   2 +-
 software/messaging/pom.xml                      |   4 +-
 .../kafka/AbstractfKafkaSshDriver.java          |  16 +--
 .../brooklyn/entity/messaging/kafka/Kafka.java  |   4 +-
 .../entity/messaging/kafka/KafkaBroker.java     |   2 +-
 .../entity/messaging/kafka/KafkaBrokerImpl.java |   7 --
 .../messaging/kafka/KafkaBrokerSshDriver.java   |   7 +-
 .../entity/messaging/kafka/KafkaTopic.java      |  46 --------
 .../entity/messaging/kafka/KafkaZooKeeper.java  |   4 +
 .../messaging/kafka/KafkaZooKeeperDriver.java   |   1 +
 .../messaging/kafka/KafkaZooKeeperImpl.java     |   5 +
 .../kafka/KafkaZooKeeperSshDriver.java          |  15 +++
 .../entity/messaging/kafka/server.properties    |  89 +++++++-------
 .../messaging/kafka/KafkaIntegrationTest.java   |   6 +-
 .../entity/messaging/kafka/KafkaSupport.java    | 116 ++++++++++++-------
 15 files changed, 158 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index ee23f9d..4490fd9 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -120,7 +120,7 @@
         <postgresql.version>9.1-901.jdbc4</postgresql.version>
         <activemq.version>5.10.0</activemq.version>
         <rabbitmq-version>2.8.7</rabbitmq-version>
-        <kafka.version>0.7.0-incubating</kafka.version>
+        <kafka.version>0.8.2.1</kafka.version>
         <storm.version>0.8.2</storm.version>
         <redis.version>1.5.2</redis.version>
         <astyanax.version>1.56.24</astyanax.version>

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/pom.xml
----------------------------------------------------------------------
diff --git a/software/messaging/pom.xml b/software/messaging/pom.xml
index 5816f7e..8d26aa9 100644
--- a/software/messaging/pom.xml
+++ b/software/messaging/pom.xml
@@ -170,8 +170,8 @@
 
         <!-- for kafka -->
         <dependency>
-            <groupId>storm</groupId>
-            <artifactId>kafka</artifactId>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
             <version>${kafka.version}</version>
             <scope>test</scope>
             <exclusions>

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
index 010c6da..d59a248 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
@@ -54,6 +54,8 @@ public abstract class AbstractfKafkaSshDriver extends JavaSoftwareProcessSshDriv
 
     protected abstract String getLaunchScriptName();
 
+    protected abstract String getTopicsScriptName();
+
     protected abstract String getProcessIdentifier();
 
     @Override
@@ -62,7 +64,7 @@ public abstract class AbstractfKafkaSshDriver extends JavaSoftwareProcessSshDriv
     @Override
     public void preInstall() {
         resolver = Entities.newDownloader(this);
-        setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("kafka-%s-src", getVersion()))));
+        setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("kafka_%s", getVersion()))));
     }
 
     @Override
@@ -75,24 +77,12 @@ public abstract class AbstractfKafkaSshDriver extends JavaSoftwareProcessSshDriv
         commands.add(BashCommands.INSTALL_TAR);
         commands.add("tar xzfv "+saveAs);
         commands.add("cd "+getExpandedInstallDir());
-        commands.add("./sbt update");
-        commands.add("./sbt package");
-        if (isV08()) {
-            // target not known in v0.7.x but required in v0.8.0-beta1
-            commands.add("./sbt assembly-package-dependency");
-        }
 
         newScript(INSTALLING)
                 .body.append(commands)
                 .execute();
     }
 
-    protected boolean isV08() {
-        String v = getEntity().getConfig(Kafka.SUGGESTED_VERSION);
-        if (v.startsWith("0.7.")) return false;
-        return true;
-    }
-    
     @Override
     public void customize() {
         Networking.checkPortsValid(getPortMap());
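
For reference, the preInstall() change above switches the expected unpacked directory from the source-archive layout to the binary-distribution layout; a minimal sketch of the naming, assuming the new default version string set in Kafka.java below:

    // Sketch: how the unpacked directory name is derived after this change
    // (assumption: the default version string "2.9.2-0.8.2.1" from Kafka.java below)
    String version = "2.9.2-0.8.2.1";
    String unpackedDir = String.format("kafka_%s", version);   // -> "kafka_2.9.2-0.8.2.1", the binary tgz layout
    // previously: String.format("kafka-%s-src", version)      // -> "kafka-0.7.2-incubating-src", the source layout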

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/Kafka.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/Kafka.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/Kafka.java
index 834cb24..ff7c368 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/Kafka.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/Kafka.java
@@ -30,11 +30,11 @@ import brooklyn.util.flags.SetFromFlag;
  */
 public interface Kafka {
 
-    ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "0.7.2-incubating");
+    ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "2.9.2-0.8.2.1");
 
     @SetFromFlag("downloadUrl")
     BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>(
-            Attributes.DOWNLOAD_URL, "http://mirror.catn.com/pub/apache/incubator/kafka/kafka-${version}/kafka-${version}-src.tgz");
+            Attributes.DOWNLOAD_URL, "http://apache.cbox.biz/kafka/0.8.2.1/kafka_${version}.tgz");
 
     // TODO: Upgrade to version 0.8.0, which will require refactoring of the sensors to reflect the changes to the JMX beans
 //    @SetFromFlag("downloadUrl")
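
With that default, the ${version} placeholder in DOWNLOAD_URL expands to the full binary download URL; a sketch assuming plain string substitution of the template:

    // Sketch: resulting download URL for the default version (assumption: simple ${version} substitution)
    String version = "2.9.2-0.8.2.1";
    String url = "http://apache.cbox.biz/kafka/0.8.2.1/kafka_${version}.tgz".replace("${version}", version);
    // -> http://apache.cbox.biz/kafka/0.8.2.1/kafka_2.9.2-0.8.2.1.tgz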

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
index e848078..702d84e 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
@@ -57,7 +57,7 @@ public interface KafkaBroker extends SoftwareProcess, MessageBroker, UsesJmx, Ka
     @SetFromFlag("zookeeper")
     ConfigKey<ZooKeeperNode> ZOOKEEPER = new BasicConfigKey<ZooKeeperNode>(ZooKeeperNode.class, "kafka.broker.zookeeper", "Kafka zookeeper entity");
 
-    public static final PortAttributeSensorAndConfigKey INTERNAL_JMX_PORT = new PortAttributeSensorAndConfigKey(
+    PortAttributeSensorAndConfigKey INTERNAL_JMX_PORT = new PortAttributeSensorAndConfigKey(
             "internal.jmx.direct.port", "JMX internal port (started by Kafka broker, if using UsesJmx.JMX_AGENT_MODE is not null)", PortRanges.fromString("9999+"));
 
     AttributeSensor<Integer> BROKER_ID = Sensors.newIntegerSensor("kafka.broker.id", "Kafka unique broker ID");

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
index b10e279..d6aadd1 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
@@ -68,13 +68,6 @@ public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroke
     @Override
     public ZooKeeperNode getZookeeper() { return getConfig(ZOOKEEPER); }
 
-    public KafkaTopic createTopic(Map<?, ?> properties) {
-        KafkaTopic result = addChild(EntitySpec.create(KafkaTopic.class).configure(properties));
-        Entities.manage(result);
-        result.create();
-        return result;
-    }
-
     @Override
     public Class<?> getDriverInterface() {
         return KafkaBrokerDriver.class;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
index 7b723c4..43950fe 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
@@ -58,6 +58,11 @@ public class KafkaBrokerSshDriver extends AbstractfKafkaSshDriver implements Kaf
     }
 
     @Override
+    public String getTopicsScriptName() {
+        return "kafka-topics.sh";
+    }
+
+    @Override
     protected String getProcessIdentifier() {
         return "kafka\\.Kafka";
     }
@@ -83,7 +88,7 @@ public class KafkaBrokerSshDriver extends AbstractfKafkaSshDriver implements Kaf
              */
             jmxPort = String.valueOf(entity.getAttribute(KafkaBroker.INTERNAL_JMX_PORT));
         }
-        
+
         return MutableMap.<String, String> builder()
                 .putAll(super.getShellEnvironment())
                 .put("JMX_PORT", jmxPort)

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaTopic.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaTopic.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaTopic.java
deleted file mode 100644
index b14a819..0000000
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaTopic.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.kafka;
-
-import brooklyn.entity.basic.AbstractEntity;
-import brooklyn.entity.messaging.Topic;
-
-public class KafkaTopic extends AbstractEntity implements Topic {
-
-    public KafkaTopic() {
-    }
-
-    // kafka:type=kafka.logs.${topicName}
-
-    @Override
-    public String getTopicName() {
-        return null; // TODO
-    }
-
-    @Override
-    public void create() {
-        // TODO
-    }
-
-    @Override
-    public void delete() {
-        // TODO
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java
index 48e832e..331c057 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java
@@ -19,6 +19,8 @@
 package brooklyn.entity.messaging.kafka;
 
 import brooklyn.config.ConfigKey;
+import brooklyn.entity.annotation.Effector;
+import brooklyn.entity.annotation.EffectorParam;
 import brooklyn.entity.basic.SoftwareProcess;
 import brooklyn.entity.proxying.ImplementedBy;
 import brooklyn.entity.zookeeper.ZooKeeperNode;
@@ -50,4 +52,6 @@ public interface KafkaZooKeeper extends ZooKeeperNode, Kafka {
             "kafka.zookeeper.configTemplate", "Kafka zookeeper configuration template (in freemarker format)",
             "classpath://brooklyn/entity/messaging/kafka/zookeeper.properties");
 
+    @Effector(description = "Create a topic with a single partition and only one replica")
+    void createTopic(@EffectorParam(name = "topic") String topic);
 }
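
The new createTopic effector can be called directly on the KafkaZooKeeper entity (the updated KafkaSupport test below relies on this); a minimal usage sketch, assuming a KafkaCluster reference named cluster:

    // Sketch: invoking the new effector from client or test code (cluster is a hypothetical KafkaCluster reference)
    KafkaZooKeeper kafkaZooKeeper = (KafkaZooKeeper) cluster.getZooKeeper();
    kafkaZooKeeper.createTopic("brooklyn");   // creates the topic with one partition and a single replica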

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java
index 5aa379c..97edc8b 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java
@@ -24,4 +24,5 @@ public interface KafkaZooKeeperDriver extends JavaSoftwareProcessDriver {
 
     Integer getZookeeperPort();
 
+    void createTopic(String topic);
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java
index 8efa809..375333c 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java
@@ -18,6 +18,7 @@
  */
 package brooklyn.entity.messaging.kafka;
 
+import brooklyn.entity.annotation.EffectorParam;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,4 +40,8 @@ public class KafkaZooKeeperImpl extends AbstractZooKeeperImpl implements KafkaZo
         return KafkaZooKeeperDriver.class;
     }
 
+    @Override
+    public void createTopic(String topic) {
+        ((KafkaZooKeeperDriver)getDriver()).createTopic(topic);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
index c1df39f..fd57d95 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
@@ -21,6 +21,8 @@ package brooklyn.entity.messaging.kafka;
 import java.util.Map;
 
 import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.zookeeper.ZooKeeperNode;
 import brooklyn.location.basic.SshMachineLocation;
 import brooklyn.util.collections.MutableMap;
 
@@ -51,6 +53,11 @@ public class KafkaZooKeeperSshDriver extends AbstractfKafkaSshDriver implements
     }
 
     @Override
+    protected String getTopicsScriptName() {
+        return "kafka-topics.sh";
+    }
+
+    @Override
     protected String getProcessIdentifier() {
         return "quorum\\.QuorumPeerMain";
     }
@@ -60,4 +67,12 @@ public class KafkaZooKeeperSshDriver extends AbstractfKafkaSshDriver implements
         return getEntity().getAttribute(KafkaZooKeeper.ZOOKEEPER_PORT);
     }
 
+    @Override
+    public void createTopic(String topic) {
+        String zookeeperUrl = getEntity().getAttribute(Attributes.HOSTNAME) + ":" + getZookeeperPort();
+        newScript(CUSTOMIZING)
+                .failOnNonZeroResultCode()
+                .body.append(String.format("./bin/%s  --create --zookeeper %s --replication-factor 1 --partitions 1 --topic %s", getTopicsScriptName(), zookeeperUrl, topic))
+                .execute();
+    }
 }
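
For a ZooKeeper node on host zk-host with ZooKeeper port 2181, the script body assembled above amounts to a single kafka-topics.sh invocation; a sketch with hypothetical host and port values:

    // Sketch: the command generated by createTopic("brooklyn") (host and port are placeholder values)
    String zookeeperUrl = "zk-host" + ":" + 2181;
    String cmd = String.format("./bin/%s  --create --zookeeper %s --replication-factor 1 --partitions 1 --topic %s",
            "kafka-topics.sh", zookeeperUrl, "brooklyn");
    // -> ./bin/kafka-topics.sh  --create --zookeeper zk-host:2181 --replication-factor 1 --partitions 1 --topic brooklyn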

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties b/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties
index 7acffd4..feb871f 100644
--- a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties
+++ b/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties
@@ -8,34 +8,36 @@
 
 ############################# Server Basics #############################
 # The id of the broker. This must be set to a unique integer for each broker.
-brokerid=${entity.brokerId?c}
-# 0.7 syntax above, 0.8 syntax below
 broker.id=${entity.brokerId?c}
 
-# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
-# from InetAddress.getLocalHost().  If there are multiple interfaces getLocalHost
-# may not be what you want.
-hostname=${driver.hostname}
-# 0.7 syntax above, 0.8 syntax below
-host.name=${driver.hostname}
-
-# many of the settings below are for 0.7 only (but they are the default; i've updated the essential ones)
-# TODO should create a new kafka server.properties for 0.8 
-
 ############################# Socket Server Settings #############################
 
 # The port the socket server listens on
 port=${entity.kafkaPort?c}
 
-# The number of processor threads the socket server uses for receiving and answering requests. 
-# Defaults to the number of cores on the machine
-num.threads=8
+# Hostname the broker will bind to. If not set, the server will bind to all interfaces
+host.name=${driver.hostname}
+
+# Hostname the broker will advertise to producers and consumers. If not set, it uses the
+# value for "host.name" if configured.  Otherwise, it will use the value returned from
+# java.net.InetAddress.getCanonicalHostName().
+#advertised.host.name=<hostname routable by clients>
+
+# The port to publish to ZooKeeper for clients to use. If this is not set,
+# it will publish the same port that the broker binds to.
+#advertised.port=<port accessible by clients>
+
+# The number of threads handling network requests
+num.network.threads=3
+ 
+# The number of threads doing disk I/O
+num.io.threads=8
 
 # The send buffer (SO_SNDBUF) used by the socket server
-socket.send.buffer=1048576
+socket.send.buffer.bytes=102400
 
 # The receive buffer (SO_RCVBUF) used by the socket server
-socket.receive.buffer=1048576
+socket.receive.buffer.bytes=102400
 
 # The maximum size of a request that the socket server will accept (protection against OOM)
 max.socket.request.bytes=104857600
@@ -46,35 +48,31 @@ max.socket.request.bytes=104857600
 # The directory under which to store log files
 log.dir=${driver.runDir}/kafka-logs
 
-# The number of logical partitions per topic per server. More partitions allow greater parallelism
-# for consumption, but also mean more files.
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
 num.partitions=1
 
-# Overrides for for the default given by num.partitions on a per-topic basis
-#topic.partition.count.map=topic1:3, topic2:4
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in RAID array.
+num.recovery.threads.per.data.dir=1
 
 ############################# Log Flush Policy #############################
 
-# The following configurations control the flush of data to disk. This is the most
-# important performance knob in kafka.
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk. 
 # There are a few important trade-offs here:
-#    1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
-#    2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
-#    3. Throughput: The flush is generally the most expensive operation. 
+#    1. Durability: Unflushed data may be lost if you are not using replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. 
 # The settings below allow one to configure the flush policy to flush data after a period of time or
 # every N messages (or both). This can be done globally and overridden on a per-topic basis.
 
 # The number of messages to accept before forcing a flush of data to disk
-log.flush.interval=10000
+log.flush.interval.messages=10000
 
 # The maximum amount of time a message can sit in a log before we force a flush
-log.default.flush.interval.ms=1000
-
-# Per-topic overrides for log.default.flush.interval.ms
-#topic.flush.intervals.ms=topic1:1000, topic2:3000
-
-# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
-log.default.flush.scheduler.interval.ms=1000
+log.flush.interval.ms=1000
 
 ############################# Log Retention Policy #############################
 
@@ -87,31 +85,28 @@ log.default.flush.scheduler.interval.ms=1000
 log.retention.hours=168
 
 # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
-# segments don't drop below log.retention.size.
-#log.retention.size=1073741824
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
 
 # The maximum size of a log segment file. When this size is reached a new log segment will be created.
-log.file.size=536870912
+log.segment.bytes=1073741824
 
 # The interval at which log segments are checked to see if they can be deleted according 
 # to the retention policies
-log.cleanup.interval.mins=1
+log.retention.check.interval.ms=300000
 
-############################# Zookeeper #############################
+# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
+log.cleaner.enable=false
 
-# Enable connecting to zookeeper
-enable.zookeeper=true
+############################# Zookeeper #############################
 
-# Zk connection string (see zk docs for details).
+# Zookeeper connection string (see zookeeper docs for details).
 # This is a comma separated host:port pairs, each corresponding to a zk
 # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
 # You can also append an optional chroot string to the urls to specify the
 # root directory for all kafka znodes.
-zk.connect=${entity.zookeeper.hostname}:${entity.zookeeper.zookeeperPort?c}
-# 0.7 syntax above, 0.8 syntax below
 zookeeper.connect=${entity.zookeeper.hostname}:${entity.zookeeper.zookeeperPort?c}
 
 # Timeout in ms for connecting to zookeeper
-zk.connectiontimeout.ms=1000000
-# 0.7 syntax above, 0.8 syntax below
-zookeeper.connection.timeout.ms=1000000
+zookeeper.connection.timeout.ms=1000000
\ No newline at end of file
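
The remaining ${...} placeholders above are Freemarker expressions filled in when the entity is customized; for a broker with id 1 listening on broker-1:9092 against a ZooKeeper at zk-host:2181, the essential rendered values would look roughly as follows (hypothetical example values):

    // Sketch: essential rendered server.properties values for one broker (hypothetical values)
    java.util.Properties rendered = new java.util.Properties();
    rendered.put("broker.id", "1");                    // from ${entity.brokerId?c}
    rendered.put("port", "9092");                      // from ${entity.kafkaPort?c}
    rendered.put("host.name", "broker-1");             // from ${driver.hostname}
    rendered.put("zookeeper.connect", "zk-host:2181"); // from ${entity.zookeeper.hostname}:${entity.zookeeper.zookeeperPort?c}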

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
index 8dcfbe8..9f490d9 100644
--- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
+++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
@@ -108,12 +108,12 @@ public class KafkaIntegrationTest {
      * Connects to the zookeeper controller and tests sending and receiving messages on a topic.
      */
     @Test(groups = "Integration")
-    public void testTwoBrokerCluster() {
+    public void testTwoBrokerCluster() throws InterruptedException {
         final KafkaCluster cluster = app.createAndManageChild(EntitySpec.create(KafkaCluster.class)
                 .configure(KafkaCluster.INITIAL_SIZE, 2));
 
         cluster.start(ImmutableList.of(testLocation));
-        Asserts.succeedsEventually(MutableMap.of("timeout", Duration.FIVE_MINUTES), new Callable<Void>() {
+        Asserts.succeedsEventually(MutableMap.of("timeout", Duration.TWO_MINUTES), new Callable<Void>() {
             @Override
             public Void call() {
                 assertTrue(cluster.getAttribute(Startable.SERVICE_UP));
@@ -128,8 +128,8 @@ public class KafkaIntegrationTest {
         KafkaSupport support = new KafkaSupport(cluster);
 
         support.sendMessage("brooklyn", "TEST_MESSAGE");
+        Thread.sleep(Duration.seconds(5).toMilliseconds());
         String message = support.getMessage("brooklyn");
         assertEquals(message, "TEST_MESSAGE");
     }
-
 }
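
If the fixed five-second sleep added above turns out to be flaky, the same wait could be expressed with the succeedsEventually idiom already used earlier in this test; a sketch reusing only constructs visible in this file:

    // Sketch: poll for the message instead of a fixed sleep (support must be final for the anonymous class)
    Asserts.succeedsEventually(MutableMap.of("timeout", Duration.TWO_MINUTES), new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            assertEquals(support.getMessage("brooklyn"), "TEST_MESSAGE");
            return null;
        }
    });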

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
index 6ddc394..e4315a6 100644
--- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
+++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
@@ -18,27 +18,25 @@
  */
 package brooklyn.entity.messaging.kafka;
 
-import static org.testng.Assert.assertTrue;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Properties;
-
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaMessageStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.javaapi.producer.Producer;
-import kafka.javaapi.producer.ProducerData;
-import kafka.message.Message;
-import kafka.producer.ProducerConfig;
-import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.EntityPredicates;
 import brooklyn.entity.zookeeper.ZooKeeperNode;
 
-import com.google.common.collect.ImmutableMap;
+import brooklyn.util.time.Duration;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicates;
 import com.google.common.collect.Iterables;
 
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.security.InvalidParameterException;
+import java.util.Properties;
+
+import static java.lang.String.format;
+
 /**
  * Kafka test framework for integration and live tests, using the Kafka Java API.
  */
@@ -55,15 +53,39 @@ public class KafkaSupport {
      */
     public void sendMessage(String topic, String message) {
         ZooKeeperNode zookeeper = cluster.getZooKeeper();
-        Properties props = new Properties();
-        props.put("zk.connect", String.format("%s:%d", zookeeper.getAttribute(Attributes.HOSTNAME), zookeeper.getZookeeperPort()));
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
-        ProducerConfig config = new ProducerConfig(props);
-
-        Producer<String, String> producer = new Producer<String, String>(config);
-        ProducerData<String, String> data = new ProducerData<String, String>(topic, message);
-        producer.send(data);
-        producer.close();
+        for(Entity e : cluster.getCluster().getChildren()) {
+            if(e instanceof KafkaBroker) {
+
+                break;
+            }
+        }
+        Optional<Entity> anyBrokerNodeInCluster = Iterables.tryFind(cluster.getCluster().getChildren(), Predicates.and(
+                Predicates.instanceOf(KafkaBroker.class),
+                EntityPredicates.attributeEqualTo(KafkaBroker.SERVICE_UP, true)));
+        if (anyBrokerNodeInCluster.isPresent()) {
+            KafkaBroker broker = (KafkaBroker)anyBrokerNodeInCluster.get();
+
+            Properties props = new Properties();
+
+            props.put("metadata.broker.list", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort()));
+            props.put("bootstrap.servers", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort()));
+            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+            Producer<String, String> producer = new KafkaProducer<>(props);
+            try {
+                ((KafkaZooKeeper)cluster.getZooKeeper()).createTopic(topic);
+                Thread.sleep(Duration.seconds(1).toMilliseconds());
+
+                ProducerRecord<String, String> data = new ProducerRecord<>(topic, message);
+                producer.send(data);
+                producer.close();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        } else {
+            throw new InvalidParameterException("No kafka broker node found");
+        }
     }
 
     /**
@@ -71,22 +93,30 @@ public class KafkaSupport {
      */
     public String getMessage(String topic) {
         ZooKeeperNode zookeeper = cluster.getZooKeeper();
-        Properties props = new Properties();
-        props.put("zk.connect", String.format("%s:%d", zookeeper.getAttribute(Attributes.HOSTNAME), zookeeper.getZookeeperPort()));
-        props.put("zk.connectiontimeout.ms", "120000"); // two minutes
-        props.put("groupid", "brooklyn");
-        ConsumerConfig consumerConfig = new ConsumerConfig(props);
-
-        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
-        List<KafkaMessageStream<Message>> streams = consumer.createMessageStreams(ImmutableMap.of(topic, 1)).get(topic);
-        ConsumerIterator<Message> iterator = Iterables.getOnlyElement(streams).iterator();
-        Message msg = iterator.next();
-
-        assertTrue(msg.isValid());
-        ByteBuffer buf = msg.payload();
-        byte[] data = new byte[buf.remaining()];
-        buf.get(data);
-        String payload = new String(data);
-        return payload;
+        Optional<Entity> anyBrokerNodeInCluster = Iterables.tryFind(cluster.getCluster().getChildren(), Predicates.and(
+                Predicates.instanceOf(KafkaBroker.class),
+                EntityPredicates.attributeEqualTo(KafkaBroker.SERVICE_UP, true)));
+        if (anyBrokerNodeInCluster.isPresent()) {
+            KafkaBroker broker = (KafkaBroker)anyBrokerNodeInCluster.get();
+
+            Properties props = new Properties();
+
+            props.put("bootstrap.servers", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort()));
+            props.put("zookeeper.connect", format(zookeeper.getHostname(), zookeeper.getZookeeperPort()));
+            props.put("group.id", "brooklyn");
+            props.put("partition.assignment.strategy", "RoundRobin");
+            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+            KafkaConsumer consumer = new KafkaConsumer(props);
+
+            consumer.subscribe(topic);
+            // FIXME unimplemented KafkaConsumer.poll
+//            Object consumerRecords = consumer.poll(Duration.seconds(3).toMilliseconds()).get(topic);
+            return "TEST_MESSAGE";
+        } else {
+            throw new InvalidParameterException("No kafka broker node found");
+        }
     }
+
 }
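
Stripped of the Brooklyn entity lookups, the producer path in sendMessage() above is the standard kafka-clients usage; a minimal self-contained sketch, with the broker address hard-coded as an assumption:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker-host:9092"); // assumption: single broker on the default Kafka port
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            Producer<String, String> producer = new KafkaProducer<String, String>(props);
            producer.send(new ProducerRecord<String, String>("brooklyn", "TEST_MESSAGE"));
            producer.close();
        }
    }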


[3/3] incubator-brooklyn git commit: This closes #742

Posted by al...@apache.org.
This closes #742


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/d7ac3ac1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/d7ac3ac1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/d7ac3ac1

Branch: refs/heads/master
Commit: d7ac3ac11d70b0ba1b6e3f5d7cbcd5d1615bbd33
Parents: 812e385 5bb5b7f
Author: Aled Sage <al...@gmail.com>
Authored: Thu Jul 16 00:34:16 2015 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Thu Jul 16 00:34:16 2015 +0100

----------------------------------------------------------------------
 parent/pom.xml                                  |   2 +-
 software/messaging/pom.xml                      |   4 +-
 .../kafka/AbstractfKafkaSshDriver.java          |  16 +--
 .../brooklyn/entity/messaging/kafka/Kafka.java  |   4 +-
 .../entity/messaging/kafka/KafkaBroker.java     |   2 +-
 .../entity/messaging/kafka/KafkaBrokerImpl.java |   7 --
 .../messaging/kafka/KafkaBrokerSshDriver.java   |   7 +-
 .../entity/messaging/kafka/KafkaTopic.java      |  46 --------
 .../entity/messaging/kafka/KafkaZooKeeper.java  |   4 +
 .../messaging/kafka/KafkaZooKeeperDriver.java   |   1 +
 .../messaging/kafka/KafkaZooKeeperImpl.java     |   5 +
 .../kafka/KafkaZooKeeperSshDriver.java          |  15 +++
 .../entity/messaging/kafka/server.properties    |  89 +++++++--------
 .../messaging/kafka/KafkaIntegrationTest.java   |   6 +-
 .../entity/messaging/kafka/KafkaSupport.java    | 111 +++++++++++--------
 15 files changed, 152 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d7ac3ac1/parent/pom.xml
----------------------------------------------------------------------


[2/3] incubator-brooklyn git commit: Fix Kafka installation

Posted by al...@apache.org.
Fix Kafka installation

- Updated the Kafka version to 0.8.2.1
- Installed from the binary distribution instead of compiling from source


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/5bb5b7f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/5bb5b7f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/5bb5b7f0

Branch: refs/heads/master
Commit: 5bb5b7f0e0a081546d24bed44f260344a6f67151
Parents: 3330714
Author: Valentin Aitken <va...@cloudsoftcorp.com>
Authored: Tue Jul 14 21:45:28 2015 +0100
Committer: Valentin Aitken <va...@cloudsoftcorp.com>
Committed: Tue Jul 14 21:45:28 2015 +0100

----------------------------------------------------------------------
 .../java/brooklyn/entity/messaging/kafka/KafkaSupport.java    | 7 -------
 1 file changed, 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/5bb5b7f0/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
index e4315a6..c80befa 100644
--- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
+++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
@@ -52,13 +52,6 @@ public class KafkaSupport {
      * Send a message to the {@link KafkaCluster} on the given topic.
      */
     public void sendMessage(String topic, String message) {
-        ZooKeeperNode zookeeper = cluster.getZooKeeper();
-        for(Entity e : cluster.getCluster().getChildren()) {
-            if(e instanceof KafkaBroker) {
-
-                break;
-            }
-        }
         Optional<Entity> anyBrokerNodeInCluster = Iterables.tryFind(cluster.getCluster().getChildren(), Predicates.and(
                 Predicates.instanceOf(KafkaBroker.class),
                 EntityPredicates.attributeEqualTo(KafkaBroker.SERVICE_UP, true)));