You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/06/26 20:06:40 UTC

[1/3] storm git commit: storm-826, update KafkaBolt to use the new kafka producer APIs

Repository: storm
Updated Branches:
  refs/heads/master 577e34e7e -> 5383ac375


storm-826, update KafkaBolt to use the new kafka producer APIs

Fix the issue where the producer could not be closed

Pass the unit tests and add an asynchronous send callback

Update the producer api in KafkaUtilsTest.

Address comments and add test options for async and fireAndForget


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9319ab13
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9319ab13
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9319ab13

Branch: refs/heads/master
Commit: 9319ab137efa9d606e96d1fc0504502bbaec87e6
Parents: 28e64c9
Author: zhuol <zh...@yahoo-inc.com>
Authored: Wed May 27 10:48:39 2015 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Fri Jun 26 10:56:57 2015 -0500

----------------------------------------------------------------------
 external/storm-kafka/pom.xml                    | 10 ++-
 .../src/jvm/storm/kafka/bolt/KafkaBolt.java     | 77 +++++++++++++++++---
 .../src/test/storm/kafka/KafkaUtilsTest.java    | 32 +++++---
 .../test/storm/kafka/bolt/KafkaBoltTest.java    | 74 ++++++++++++++++---
 4 files changed, 157 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9319ab13/external/storm-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml
index 0aba519..c731a0f 100644
--- a/external/storm-kafka/pom.xml
+++ b/external/storm-kafka/pom.xml
@@ -113,8 +113,8 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.9.2</artifactId>
-            <version>0.8.1.1</version>
+            <artifactId>kafka_2.10</artifactId>
+            <version>0.8.2.1</version>
             <!-- use provided scope, so users can pull in whichever scala version they choose -->
             <scope>provided</scope>
             <exclusions>
@@ -129,6 +129,12 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>0.8.2.1</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-core</artifactId>
             <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/storm/blob/9319ab13/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index 714ecd3..738b358 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -23,16 +23,18 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.utils.TupleUtils;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
 import storm.kafka.bolt.mapper.TupleToKafkaMapper;
 import storm.kafka.bolt.selector.DefaultTopicSelector;
 import storm.kafka.bolt.selector.KafkaTopicSelector;
-
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
 import java.util.Map;
 import java.util.Properties;
 
@@ -45,6 +47,10 @@ import java.util.Properties;
  * 'kafka.broker.properties' and 'topic'
  * <p/>
  * respectively.
+ * <p/>
+ * This bolt uses 0.8.2 Kafka Producer API.
+ * <p/>
+ * It works for sending tuples to older Kafka version (0.8.1).
  */
 public class KafkaBolt<K, V> extends BaseRichBolt {
 
@@ -53,10 +59,18 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
     public static final String TOPIC = "topic";
     public static final String KAFKA_BROKER_PROPERTIES = "kafka.broker.properties";
 
-    private Producer<K, V> producer;
+    private KafkaProducer<K, V> producer;
     private OutputCollector collector;
     private TupleToKafkaMapper<K,V> mapper;
     private KafkaTopicSelector topicSelector;
+    /** 
+     * With default setting for fireAndForget and async, the callback is called when the sending succeeds.
+     * By setting fireAndForget true, the send will not wait at all for kafka to ack.
+     * "acks" setting in 0.8.2 Producer API config doesn't matter if fireAndForget is set.
+     * By setting async false, synchronous sending is used. 
+     */
+    private boolean fireAndForget = false;
+    private boolean async = true;
 
     public KafkaBolt<K,V> withTupleToKafkaMapper(TupleToKafkaMapper<K,V> mapper) {
         this.mapper = mapper;
@@ -83,18 +97,16 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
         Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
         Properties properties = new Properties();
         properties.putAll(configMap);
-        ProducerConfig config = new ProducerConfig(properties);
-        producer = new Producer<K, V>(config);
+        producer = new KafkaProducer<K, V>(properties);
         this.collector = collector;
     }
 
     @Override
-    public void execute(Tuple input) {
+    public void execute(final Tuple input) {
         if (TupleUtils.isTick(input)) {
           collector.ack(input);
           return; // Do not try to send ticks to Kafka
         }
-
         K key = null;
         V message = null;
         String topic = null;
@@ -102,12 +114,40 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
             key = mapper.getKeyFromTuple(input);
             message = mapper.getMessageFromTuple(input);
             topic = topicSelector.getTopic(input);
-            if(topic != null ) {
-                producer.send(new KeyedMessage<K, V>(topic, key, message));
+            if (topic != null ) {
+                Callback callback = null;
+
+                if (!fireAndForget && async) {
+                    callback = new Callback() {
+                        @Override
+                        public void onCompletion(RecordMetadata ignored, Exception e) {
+                            synchronized (collector) {
+                                if (e != null) {
+                                    collector.reportError(e);
+                                    collector.fail(input);
+                                } else {
+                                    collector.ack(input);
+                                }
+                            }
+                        }
+                    };
+                }
+                Future<RecordMetadata> result = producer.send(new ProducerRecord<K, V>(topic, key, message), callback);
+                if (!async) {
+                    try {
+                        result.get();
+                        collector.ack(input);
+                    } catch (ExecutionException err) {
+                        collector.reportError(err);
+                        collector.fail(input);
+                    }
+                } else if (fireAndForget) {
+                    collector.ack(input);
+                }
             } else {
                 LOG.warn("skipping key = " + key + ", topic selector returned null.");
+                collector.ack(input);
             }
-            collector.ack(input);
         } catch (Exception ex) {
             collector.reportError(ex);
             collector.fail(input);
@@ -118,4 +158,17 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
 
     }
+
+    @Override
+    public void cleanup() {
+        producer.close();
+    }
+
+    public void setFireAndForget(boolean fireAndForget) {
+        this.fireAndForget = fireAndForget;
+    }
+
+    public void setAsync(boolean async) {
+        this.async = async;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/9319ab13/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
index 965eaea..a2d2af8 100644
--- a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
@@ -23,13 +23,14 @@ import com.google.common.collect.ImmutableMap;
 import kafka.api.OffsetRequest;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.javaapi.producer.Producer;
 import kafka.message.MessageAndOffset;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.Assert;
 import storm.kafka.trident.GlobalPartitionInformation;
 
 import java.util.List;
@@ -39,9 +40,11 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 public class KafkaUtilsTest {
 
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaUtilsTest.class);
     private KafkaTestBroker broker;
     private SimpleConsumer simpleConsumer;
     private KafkaConfig config;
@@ -182,7 +185,6 @@ public class KafkaUtilsTest {
         }
     }
 
-
     private void createTopicAndSendMessage() {
         createTopicAndSendMessage(null, "someValue");
     }
@@ -193,14 +195,22 @@ public class KafkaUtilsTest {
 
     private void createTopicAndSendMessage(String key, String value) {
         Properties p = new Properties();
-        p.setProperty("metadata.broker.list", broker.getBrokerConnectionString());
-        p.setProperty("serializer.class", "kafka.serializer.StringEncoder");
-        ProducerConfig producerConfig = new ProducerConfig(p);
-        Producer<String, String> producer = new Producer<String, String>(producerConfig);
-        producer.send(new KeyedMessage<String, String>(config.topic, key, value));
+        p.put("serializer.class", "kafka.serializer.StringEncoder");
+        p.put("bootstrap.servers", broker.getBrokerConnectionString());
+        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        p.put("metadata.fetch.timeout.ms", 1000);
+        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(p);
+        try {
+            producer.send(new ProducerRecord<String, String>(config.topic, key, value)).get();
+        } catch (Exception e) {
+            Assert.fail(e.getMessage());
+            LOG.error("Failed to do synchronous sending due to " + e, e);
+        } finally {
+            producer.close();
+        }
     }
 
-
     @Test
     public void assignOnePartitionPerTask() {
         runPartitionToTaskMappingTest(16, 1);

http://git-wip-us.apache.org/repos/asf/storm/blob/9319ab13/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
index 576cc12..05d138b 100644
--- a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
@@ -34,9 +34,7 @@ import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
 import kafka.message.Message;
 import kafka.message.MessageAndOffset;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.*;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import storm.kafka.*;
@@ -77,9 +75,9 @@ public class KafkaBoltTest {
     public void shutdown() {
         simpleConsumer.close();
         broker.shutdown();
+        bolt.cleanup();
     }
 
-
     private void setupKafkaConsumer() {
         GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
         globalPartitionInformation.addPartition(0, Broker.fromString(broker.getBrokerConnectionString()));
@@ -110,15 +108,60 @@ public class KafkaBoltTest {
         verifyMessage(key, message);
     }
 
+    /* test synchronous sending */
+    @Test
+    public void executeWithByteArrayKeyAndMessageSync() {
+        boolean async = false;
+        boolean fireAndForget = false;
+        bolt = generateDefaultSerializerBolt(async, fireAndForget);
+        String keyString = "test-key";
+        String messageString = "test-message";
+        byte[] key = keyString.getBytes();
+        byte[] message = messageString.getBytes();
+        Tuple tuple = generateTestTuple(key, message);
+        bolt.execute(tuple);
+        verify(collector).ack(tuple);
+        verifyMessage(keyString, messageString);
+    }
+
+    /* test asynchronous sending (default) */
+    @Test
+    public void executeWithByteArrayKeyAndMessageAsync() {
+        boolean async = true;
+        boolean fireAndForget = false;
+        bolt = generateDefaultSerializerBolt(async, fireAndForget);
+        String keyString = "test-key";
+        String messageString = "test-message";
+        byte[] key = keyString.getBytes();
+        byte[] message = messageString.getBytes();
+        Tuple tuple = generateTestTuple(key, message);
+        bolt.execute(tuple);
+        try {
+            Thread.sleep(1000);                 
+        } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+        }
+        verify(collector).ack(tuple);
+        verifyMessage(keyString, messageString);
+    }
+
+    /* test with fireAndForget option enabled */
     @Test
-    public void executeWithByteArrayKeyAndMessage() {
-        bolt = generateDefaultSerializerBolt();
+    public void executeWithByteArrayKeyAndMessageFire() {
+        boolean async = true;
+        boolean fireAndForget = true;
+        bolt = generateDefaultSerializerBolt(async, fireAndForget);
         String keyString = "test-key";
         String messageString = "test-message";
         byte[] key = keyString.getBytes();
         byte[] message = messageString.getBytes();
         Tuple tuple = generateTestTuple(key, message);
         bolt.execute(tuple);
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+        }
         verify(collector).ack(tuple);
         verifyMessage(keyString, messageString);
     }
@@ -126,21 +169,32 @@ public class KafkaBoltTest {
     private KafkaBolt generateStringSerializerBolt() {
         KafkaBolt bolt = new KafkaBolt();
         Properties props = new Properties();
-        props.put("metadata.broker.list", broker.getBrokerConnectionString());
         props.put("request.required.acks", "1");
         props.put("serializer.class", "kafka.serializer.StringEncoder");
+        props.put("bootstrap.servers", broker.getBrokerConnectionString());
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("metadata.fetch.timeout.ms", 1000);
         config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
         bolt.prepare(config, null, new OutputCollector(collector));
+        bolt.setAsync(false);
         return bolt;
     }
 
-    private KafkaBolt generateDefaultSerializerBolt() {
+    private KafkaBolt generateDefaultSerializerBolt(boolean async, boolean fireAndForget) {
         KafkaBolt bolt = new KafkaBolt();
         Properties props = new Properties();
-        props.put("metadata.broker.list", broker.getBrokerConnectionString());
         props.put("request.required.acks", "1");
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        props.put("bootstrap.servers", broker.getBrokerConnectionString());
+        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        props.put("metadata.fetch.timeout.ms", 1000);
+        props.put("linger.ms", 0);
         config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
         bolt.prepare(config, null, new OutputCollector(collector));
+        bolt.setAsync(async);
+        bolt.setFireAndForget(fireAndForget);
         return bolt;
     }
 
@@ -163,7 +217,6 @@ public class KafkaBoltTest {
         verify(collector).fail(tuple);
     }
 
-
     private boolean verifyMessage(String key, String message) {
         long lastMessageOffset = KafkaUtils.getOffset(simpleConsumer, kafkaConfig.topic, 0, OffsetRequest.LatestTime()) - 1;
         ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(kafkaConfig, simpleConsumer,
@@ -211,5 +264,4 @@ public class KafkaBoltTest {
         assertTrue(TupleUtils.isTick(tuple));
         return tuple;
     }
-
 }


[2/3] storm git commit: Merge branch 'storm826' of https://github.com/zhuoliu/storm into STORM-826

Posted by bo...@apache.org.
Merge branch 'storm826' of https://github.com/zhuoliu/storm into STORM-826

STORM-826: Update KafkaBolt to use the new kafka producer API


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/081af1cf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/081af1cf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/081af1cf

Branch: refs/heads/master
Commit: 081af1cf18b397af5e77c8c83400c8c7dffda1d0
Parents: 577e34e 9319ab1
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Jun 26 12:58:01 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri Jun 26 12:58:01 2015 -0500

----------------------------------------------------------------------
 external/storm-kafka/pom.xml                    | 10 ++-
 .../src/jvm/storm/kafka/bolt/KafkaBolt.java     | 77 +++++++++++++++++---
 .../src/test/storm/kafka/KafkaUtilsTest.java    | 32 +++++---
 .../test/storm/kafka/bolt/KafkaBoltTest.java    | 74 ++++++++++++++++---
 4 files changed, 157 insertions(+), 36 deletions(-)
----------------------------------------------------------------------



[3/3] storm git commit: Added STORM-826 to Changelog

Posted by bo...@apache.org.
Added STORM-826 to Changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5383ac37
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5383ac37
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5383ac37

Branch: refs/heads/master
Commit: 5383ac375cb2955e3247d485e46f1f58bff62810
Parents: 081af1c
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Jun 26 13:05:57 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri Jun 26 13:05:57 2015 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5383ac37/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 09995a5..94b9ce6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@
  * STORM-864: Exclude storm-kafka tests from Travis CI build
  * STORM-860: UI: while topology is transitioned to killed, "Activate" button is enabled but not functioning
  * STORM-477: Add warning for invalid JAVA_HOME
+ * STORM-826: Update KafkaBolt to use the new kafka producer API
 
 ## 0.10.0-beta
  * STORM-867: fix bug with mk-ssl-connector