Posted to dev@brooklyn.apache.org by bostko <gi...@git.apache.org> on 2015/07/09 19:59:35 UTC

[GitHub] incubator-brooklyn pull request: Fix KafkaIntegrationTest tests

GitHub user bostko opened a pull request:

    https://github.com/apache/incubator-brooklyn/pull/742

    Fix KafkaIntegrationTest tests

    - updated kafka version
    - installing from binary instead of compiling from source
    
    Note on `KafkaIntegrationTest.testTwoBrokerCluster`:
    The `getMessage()` return value is hardcoded since the new kafka client is still [not implemented](https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L553).
    I tried other versions (kafka_2.10, 0.8.2-beta, etc.), but unfortunately it is still not supported; it is available only in trunk.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bostko/incubator-brooklyn fix_messaging_integration_tests

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-brooklyn/pull/742.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #742
    
----
commit e7e330df9e22979ef622ddf66298fe6f54c0b9e9
Author: Valentin Aitken <va...@cloudsoftcorp.com>
Date:   2015-06-26T10:13:21Z

    Fix java version check
    
    - if openjdk is used, `java -version` returns `openjdk version "1.8.0_45"`,
      which wasn't matched correctly previously

commit f3866bcbf6cb6959ef7f1cf9f0273df92524ec0c
Author: Valentin Aitken <va...@cloudsoftcorp.com>
Date:   2015-07-06T15:41:23Z

    Fix Kafka installation
    
    - updated kafka version
    - installing from binary instead of compiling from source

----
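
The first commit above concerns matching the banner printed by `java -version`. A minimal sketch of a pattern that accepts both the `java version "..."` and the `openjdk version "..."` forms follows. The class name `JavaVersionBanner`, the method name, and the regular expression are illustrative assumptions, not the code from the commit.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not taken from the Brooklyn code base.
final class JavaVersionBanner {
    // Accepts `java version "..."` as well as `openjdk version "..."`.
    private static final Pattern VERSION_LINE =
            Pattern.compile("(?:java|openjdk) version \"([^\"]+)\"");

    private JavaVersionBanner() {}

    /** Returns the quoted version string, or null when no banner line is found. */
    static String parseVersion(String javaVersionOutput) {
        Matcher m = VERSION_LINE.matcher(javaVersionOutput);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        System.out.println(parseVersion("openjdk version \"1.8.0_45\""));
    }
}
```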


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-brooklyn pull request: Fix KafkaIntegrationTest tests

Posted by neykov <gi...@git.apache.org>.
Github user neykov commented on a diff in the pull request:

    https://github.com/apache/incubator-brooklyn/pull/742#discussion_r34775372
  
    --- Diff: software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java ---
    @@ -60,4 +67,12 @@ public Integer getZookeeperPort() {
             return getEntity().getAttribute(KafkaZooKeeper.ZOOKEEPER_PORT);
         }
     
    +    @Override
    +    public void createTopic(String topic) {
    +        String zookeeperUrl = getEntity().getAttribute(Attributes.HOSTNAME) + ":" + getZookeeperPort();
    +        newScript(CUSTOMIZING)
    +                .failOnNonZeroResultCode()
    +                .body.append(String.format("./bin/%s  --create --zookeeper %s --replication-factor 1 --partitions 1 --topic %s", getTopicsScriptName(), zookeeperUrl, topic))
    --- End diff --
    
    `getTopicsScriptName()` is coming from the user, right? It should be bash-escaped; see `BashStringEscapes`. Otherwise it will let the user execute arbitrary commands (or fail when special characters are used).
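
To illustrate the concern, here is a minimal sketch of single-quote escaping applied to each interpolated value before the command string is built. `ShellQuote`, `quoteForBash`, and `createTopicCommand` are hypothetical names used only for this example; this is not the `BashStringEscapes` API, and the script name passed in `main` is an assumed value.

```java
// Hypothetical helper for illustration; Brooklyn's own BashStringEscapes may behave differently.
final class ShellQuote {
    private ShellQuote() {}

    /** Wraps the value in single quotes; each embedded single quote becomes '\'' . */
    static String quoteForBash(String value) {
        return "'" + value.replace("'", "'\\''") + "'";
    }

    /** Builds the topic-creation command with every user-influenced value quoted. */
    static String createTopicCommand(String script, String zookeeperUrl, String topic) {
        return String.format(
                "./bin/%s --create --zookeeper %s --replication-factor 1 --partitions 1 --topic %s",
                quoteForBash(script), quoteForBash(zookeeperUrl), quoteForBash(topic));
    }

    public static void main(String[] args) {
        // The injected "; rm -rf /" stays inside the quoted topic argument.
        System.out.println(createTopicCommand("kafka-topics.sh", "localhost:2181", "brooklyn; rm -rf /"));
    }
}
```

Inside single quotes bash performs no expansion, so the only character that needs special handling is the single quote itself. Quoting does not validate the value; a script name containing `../` would still need a separate check.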



[GitHub] incubator-brooklyn pull request: Fix KafkaIntegrationTest tests

Posted by mikezaccardo <gi...@git.apache.org>.
Github user mikezaccardo commented on a diff in the pull request:

    https://github.com/apache/incubator-brooklyn/pull/742#discussion_r34611082
  
    --- Diff: software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java ---
    @@ -55,15 +53,39 @@ public KafkaSupport(KafkaCluster cluster) {
          */
         public void sendMessage(String topic, String message) {
             ZooKeeperNode zookeeper = cluster.getZooKeeper();
    -        Properties props = new Properties();
    -        props.put("zk.connect", String.format("%s:%d", zookeeper.getAttribute(Attributes.HOSTNAME), zookeeper.getZookeeperPort()));
    -        props.put("serializer.class", "kafka.serializer.StringEncoder");
    -        ProducerConfig config = new ProducerConfig(props);
    -
    -        Producer<String, String> producer = new Producer<String, String>(config);
    -        ProducerData<String, String> data = new ProducerData<String, String>(topic, message);
    -        producer.send(data);
    -        producer.close();
    +        for(Entity e : cluster.getCluster().getChildren()) {
    +            if(e instanceof KafkaBroker) {
    +
    +                break;
    +            }
    +        }
    +        Optional<Entity> anyBrokerNodeInCluster = Iterables.tryFind(cluster.getCluster().getChildren(), Predicates.and(
    +                Predicates.instanceOf(KafkaBroker.class),
    +                EntityPredicates.attributeEqualTo(KafkaBroker.SERVICE_UP, true)));
    +        if (anyBrokerNodeInCluster.isPresent()) {
    +            KafkaBroker broker = (KafkaBroker)anyBrokerNodeInCluster.get();
    +
    +            Properties props = new Properties();
    +
    +            props.put("metadata.broker.list", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort()));
    +            props.put("bootstrap.servers", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort()));
    +            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    +            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    +
    +            Producer<String, String> producer = new KafkaProducer<>(props);
    +            try {
    +                ((KafkaZooKeeper)cluster.getZooKeeper()).createTopic(topic);
    +                Thread.sleep(Duration.seconds(1).toMilliseconds());
    --- End diff --
    
    Same comment as above wrt propagating.



[GitHub] incubator-brooklyn pull request: Fix KafkaIntegrationTest tests

Posted by mikezaccardo <gi...@git.apache.org>.
Github user mikezaccardo commented on a diff in the pull request:

    https://github.com/apache/incubator-brooklyn/pull/742#discussion_r34611540
  
    --- Diff: software/messaging/src/main/java/brooklyn/entity/messaging/kafka/Kafka.java ---
    @@ -30,11 +30,11 @@
      */
     public interface Kafka {
     
    -    ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "0.7.2-incubating");
    +    ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "2.9.2-0.8.2.1");
     
         @SetFromFlag("downloadUrl")
         BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>(
    -            Attributes.DOWNLOAD_URL, "http://mirror.catn.com/pub/apache/incubator/kafka/kafka-${version}/kafka-${version}-src.tgz");
    +            Attributes.DOWNLOAD_URL, "http://apache.cbox.biz/kafka/0.8.2.1/kafka_${version}.tgz");
    --- End diff --
    
    Can the hardcoded `0.8.2.1` be avoided?
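
One possible way to avoid the hardcoded directory is to derive it from the configured version. The sketch below assumes the version string always has the shape `<scala version>-<kafka version>`, as in `2.9.2-0.8.2.1`. `KafkaDownloadUrl` and its methods are hypothetical names for illustration, and the mirror base URL is passed in rather than fixed.

```java
// Hypothetical helper for illustration; assumes versions look like "<scala>-<kafka>".
final class KafkaDownloadUrl {
    private KafkaDownloadUrl() {}

    /** Returns the part after the first dash, e.g. "0.8.2.1" for "2.9.2-0.8.2.1". */
    static String kafkaVersion(String suggestedVersion) {
        int dash = suggestedVersion.indexOf('-');
        if (dash < 0 || dash == suggestedVersion.length() - 1) {
            throw new IllegalArgumentException(
                    "Expected <scala>-<kafka> version but got: " + suggestedVersion);
        }
        return suggestedVersion.substring(dash + 1);
    }

    /** Builds the binary download URL with the directory derived from the version. */
    static String downloadUrl(String mirrorBase, String suggestedVersion) {
        return String.format("%s/kafka/%s/kafka_%s.tgz",
                mirrorBase, kafkaVersion(suggestedVersion), suggestedVersion);
    }

    public static void main(String[] args) {
        System.out.println(downloadUrl("http://apache.cbox.biz", "2.9.2-0.8.2.1"));
    }
}
```

Splitting on the first dash keeps suffixes such as `-beta` attached to the Kafka part of the version.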



[GitHub] incubator-brooklyn pull request: Fix KafkaIntegrationTest tests

Posted by neykov <gi...@git.apache.org>.
Github user neykov commented on the pull request:

    https://github.com/apache/incubator-brooklyn/pull/742#issuecomment-121886628
  
    A bit late, but was this ready for merging @bostko? I am referring to the hard-coded message in `getMessage()`.



[GitHub] incubator-brooklyn pull request: Fix KafkaIntegrationTest tests

Posted by mikezaccardo <gi...@git.apache.org>.
Github user mikezaccardo commented on a diff in the pull request:

    https://github.com/apache/incubator-brooklyn/pull/742#discussion_r34610450
  
    --- Diff: software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java ---
    @@ -128,8 +128,13 @@ public Void call() {
             KafkaSupport support = new KafkaSupport(cluster);
     
             support.sendMessage("brooklyn", "TEST_MESSAGE");
    -        String message = support.getMessage("brooklyn");
    -        assertEquals(message, "TEST_MESSAGE");
    +        try {
    +            Thread.sleep(Duration.seconds(5).toMilliseconds());
    --- End diff --
    
    Probably should let this propagate with `Time.sleep()` since it's in a test?



[GitHub] incubator-brooklyn pull request: Fix KafkaIntegrationTest tests

Posted by neykov <gi...@git.apache.org>.
Github user neykov commented on a diff in the pull request:

    https://github.com/apache/incubator-brooklyn/pull/742#discussion_r34775495
  
    --- Diff: software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java ---
    @@ -54,16 +52,33 @@ public KafkaSupport(KafkaCluster cluster) {
          * Send a message to the {@link KafkaCluster} on the given topic.
          */
         public void sendMessage(String topic, String message) {
    -        ZooKeeperNode zookeeper = cluster.getZooKeeper();
    -        Properties props = new Properties();
    -        props.put("zk.connect", String.format("%s:%d", zookeeper.getAttribute(Attributes.HOSTNAME), zookeeper.getZookeeperPort()));
    -        props.put("serializer.class", "kafka.serializer.StringEncoder");
    -        ProducerConfig config = new ProducerConfig(props);
    -
    -        Producer<String, String> producer = new Producer<String, String>(config);
    -        ProducerData<String, String> data = new ProducerData<String, String>(topic, message);
    -        producer.send(data);
    -        producer.close();
    +        Optional<Entity> anyBrokerNodeInCluster = Iterables.tryFind(cluster.getCluster().getChildren(), Predicates.and(
    +                Predicates.instanceOf(KafkaBroker.class),
    +                EntityPredicates.attributeEqualTo(KafkaBroker.SERVICE_UP, true)));
    +        if (anyBrokerNodeInCluster.isPresent()) {
    +            KafkaBroker broker = (KafkaBroker)anyBrokerNodeInCluster.get();
    +
    +            Properties props = new Properties();
    +
    +            props.put("metadata.broker.list", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort()));
    +            props.put("bootstrap.servers", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort()));
    +            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    +            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    +
    +            Producer<String, String> producer = new KafkaProducer<>(props);
    +            try {
    +                ((KafkaZooKeeper)cluster.getZooKeeper()).createTopic(topic);
    +                Thread.sleep(Duration.seconds(1).toMilliseconds());
    --- End diff --
    
    Is the sleep needed?
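
If the sleep exists only to wait for the topic to become visible, one alternative is to poll a condition with a timeout instead of sleeping for a fixed period. The sketch below is a generic helper under that assumption. `Poll.waitFor` is a hypothetical name, the condition to check is left to the caller, and it uses Java 8's `BooleanSupplier`, which may not match the Java level of the project.

```java
import java.util.function.BooleanSupplier;

// Hypothetical polling helper for illustration; not part of Brooklyn or Kafka.
final class Poll {
    private Poll() {}

    /**
     * Re-checks the condition every pollMillis until it holds or timeoutMillis elapses.
     * Returns true if the condition held, false on timeout or interruption.
     */
    static boolean waitFor(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                return false;
            }
        }
        return true;
    }
}
```

A caller would pass whatever readiness check is available, for example a lambda that asks whether the topic is listed, and fail the test when `waitFor` returns false.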



[GitHub] incubator-brooklyn pull request: Fix KafkaIntegrationTest tests

Posted by mikezaccardo <gi...@git.apache.org>.
Github user mikezaccardo commented on a diff in the pull request:

    https://github.com/apache/incubator-brooklyn/pull/742#discussion_r34611029
  
    --- Diff: software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java ---
    @@ -55,15 +53,39 @@ public KafkaSupport(KafkaCluster cluster) {
          */
         public void sendMessage(String topic, String message) {
             ZooKeeperNode zookeeper = cluster.getZooKeeper();
    -        Properties props = new Properties();
    -        props.put("zk.connect", String.format("%s:%d", zookeeper.getAttribute(Attributes.HOSTNAME), zookeeper.getZookeeperPort()));
    -        props.put("serializer.class", "kafka.serializer.StringEncoder");
    -        ProducerConfig config = new ProducerConfig(props);
    -
    -        Producer<String, String> producer = new Producer<String, String>(config);
    -        ProducerData<String, String> data = new ProducerData<String, String>(topic, message);
    -        producer.send(data);
    -        producer.close();
    +        for(Entity e : cluster.getCluster().getChildren()) {
    --- End diff --
    
    Can this loop be omitted?



[GitHub] incubator-brooklyn pull request: Fix KafkaIntegrationTest tests

Posted by mikezaccardo <gi...@git.apache.org>.
Github user mikezaccardo commented on the pull request:

    https://github.com/apache/incubator-brooklyn/pull/742#issuecomment-121364025
  
    LGTM -- KafkaIntegrationTest passes for me.



[GitHub] incubator-brooklyn pull request: Fix KafkaIntegrationTest tests

Posted by neykov <gi...@git.apache.org>.
Github user neykov commented on a diff in the pull request:

    https://github.com/apache/incubator-brooklyn/pull/742#discussion_r34775865
  
    --- Diff: software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java ---
    @@ -54,16 +52,33 @@ public KafkaSupport(KafkaCluster cluster) {
          * Send a message to the {@link KafkaCluster} on the given topic.
          */
         public void sendMessage(String topic, String message) {
    -        ZooKeeperNode zookeeper = cluster.getZooKeeper();
    -        Properties props = new Properties();
    -        props.put("zk.connect", String.format("%s:%d", zookeeper.getAttribute(Attributes.HOSTNAME), zookeeper.getZookeeperPort()));
    -        props.put("serializer.class", "kafka.serializer.StringEncoder");
    -        ProducerConfig config = new ProducerConfig(props);
    -
    -        Producer<String, String> producer = new Producer<String, String>(config);
    -        ProducerData<String, String> data = new ProducerData<String, String>(topic, message);
    -        producer.send(data);
    -        producer.close();
    +        Optional<Entity> anyBrokerNodeInCluster = Iterables.tryFind(cluster.getCluster().getChildren(), Predicates.and(
    +                Predicates.instanceOf(KafkaBroker.class),
    +                EntityPredicates.attributeEqualTo(KafkaBroker.SERVICE_UP, true)));
    +        if (anyBrokerNodeInCluster.isPresent()) {
    +            KafkaBroker broker = (KafkaBroker)anyBrokerNodeInCluster.get();
    +
    +            Properties props = new Properties();
    +
    +            props.put("metadata.broker.list", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort()));
    +            props.put("bootstrap.servers", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort()));
    +            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    +            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    +
    +            Producer<String, String> producer = new KafkaProducer<>(props);
    +            try {
    +                ((KafkaZooKeeper)cluster.getZooKeeper()).createTopic(topic);
    +                Thread.sleep(Duration.seconds(1).toMilliseconds());
    +
    +                ProducerRecord<String, String> data = new ProducerRecord<>(topic, message);
    +                producer.send(data);
    +                producer.close();
    +            } catch (InterruptedException e) {
    +                e.printStackTrace();
    --- End diff --
    
    If the sleep really needs to stay, you can use `Time.sleep(Duration.ONE_SECOND)`, which doesn't declare `InterruptedException`, so you can remove the try-catch. At the very least, replace the `printStackTrace` with a call to a logger or `Exceptions.propagate`.
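
For readers unfamiliar with the pattern, the following is a minimal sketch of a sleep that does not declare `InterruptedException`: it restores the interrupt flag and rethrows as an unchecked exception rather than printing the stack trace. `Sleeps.sleepUnchecked` is a hypothetical name; this is not the implementation of Brooklyn's `Time.sleep` or `Exceptions.propagate`.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; Brooklyn's Time.sleep may differ in detail.
final class Sleeps {
    private Sleeps() {}

    /** Sleeps for the given time; on interruption, restores the flag and fails loudly. */
    static void sleepUnchecked(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            throw new IllegalStateException("Interrupted while sleeping", e);
        }
    }
}
```

With a helper like this the call site needs no try-catch, and an interrupted test fails instead of continuing silently.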



[GitHub] incubator-brooklyn pull request: Fix KafkaIntegrationTest tests

Posted by aledsage <gi...@git.apache.org>.
Github user aledsage commented on the pull request:

    https://github.com/apache/incubator-brooklyn/pull/742#issuecomment-121775966
  
    Thanks @bostko - merging.



[GitHub] incubator-brooklyn pull request: Fix KafkaIntegrationTest tests

Posted by neykov <gi...@git.apache.org>.
Github user neykov commented on the pull request:

    https://github.com/apache/incubator-brooklyn/pull/742#issuecomment-121925850
  
    Finished review (after the fact...). Worth addressing the bash escape in another PR.



[GitHub] incubator-brooklyn pull request: Fix KafkaIntegrationTest tests

Posted by bostko <gi...@git.apache.org>.
Github user bostko commented on the pull request:

    https://github.com/apache/incubator-brooklyn/pull/742#issuecomment-121922947
  
    Yes, it is just for the integration test. The Java client doesn't implement fetching messages (`poll()` is not implemented), so I hardcoded this in the integration test.



[GitHub] incubator-brooklyn pull request: Fix KafkaIntegrationTest tests

Posted by neykov <gi...@git.apache.org>.
Github user neykov commented on a diff in the pull request:

    https://github.com/apache/incubator-brooklyn/pull/742#discussion_r34775465
  
    --- Diff: software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java ---
    @@ -128,8 +128,8 @@ public Void call() {
             KafkaSupport support = new KafkaSupport(cluster);
     
             support.sendMessage("brooklyn", "TEST_MESSAGE");
    +        Thread.sleep(Duration.seconds(5).toMilliseconds());
    --- End diff --
    
    Is the sleep added on purpose or a leftover from testing?



[GitHub] incubator-brooklyn pull request: Fix KafkaIntegrationTest tests

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-brooklyn/pull/742

