You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/11/07 03:21:17 UTC

[1/8] storm git commit: use new kafka producer api

Repository: storm
Updated Branches:
  refs/heads/master 14c7defd5 -> c12e28c82


use new kafka producer api

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/df417e0a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/df417e0a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/df417e0a

Branch: refs/heads/master
Commit: df417e0aa1439c0619e905ee0e624aa0a9717f1e
Parents: 4523e54
Author: Xin Wang <be...@163.com>
Authored: Fri Sep 18 13:58:06 2015 +0800
Committer: Xin Wang <be...@163.com>
Committed: Fri Sep 18 13:58:06 2015 +0800

----------------------------------------------------------------------
 .../storm/kafka/trident/TridentKafkaState.java  | 45 ++++++++++----------
 1 file changed, 23 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/df417e0a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
index 402ffb1..098805c 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
@@ -19,10 +19,11 @@ package storm.kafka.trident;
 
 import backtype.storm.task.OutputCollector;
 import backtype.storm.topology.FailedException;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
 import org.apache.commons.lang.Validate;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
@@ -40,7 +41,7 @@ public class TridentKafkaState implements State {
 
     public static final String KAFKA_BROKER_PROPERTIES = "kafka.broker.properties";
 
-    private Producer producer;
+    private KafkaProducer producer;
     private OutputCollector collector;
 
     private TridentTupleToKafkaMapper mapper;
@@ -72,27 +73,27 @@ public class TridentKafkaState implements State {
         Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
         Properties properties = new Properties();
         properties.putAll(configMap);
-        ProducerConfig config = new ProducerConfig(properties);
-        producer = new Producer(config);
+        producer = new KafkaProducer(properties);
     }
 
     public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
-        String topic = null;
-        for (TridentTuple tuple : tuples) {
-            try {
-                topic = topicSelector.getTopic(tuple);
-
-                if(topic != null) {
-                    producer.send(new KeyedMessage(topic, mapper.getKeyFromTuple(tuple),
-                            mapper.getMessageFromTuple(tuple)));
-                } else {
-                    LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
-                }
-            } catch (Exception ex) {
-                String errorMsg = "Could not send message with key = " + mapper.getKeyFromTuple(tuple)
-                        + " to topic = " + topic;
-                LOG.warn(errorMsg, ex);
-                throw new FailedException(errorMsg, ex);
+        for (final TridentTuple tuple : tuples) {
+            final String topic = topicSelector.getTopic(tuple);
+            if(topic != null) {
+                producer.send(new ProducerRecord(topic, mapper.getKeyFromTuple(tuple),
+                        mapper.getMessageFromTuple(tuple)),new Callback() {
+                            @Override
+                            public void onCompletion(RecordMetadata metadata, Exception ex) {
+                                if(ex != null){
+                                    String errorMsg = "Could not send message with key = "
+                                    + mapper.getKeyFromTuple(tuple) + " to topic = " + topic;
+                                    LOG.warn(errorMsg, ex);
+                                    throw new FailedException(errorMsg, ex);
+                                }
+                            }
+                        });
+            } else {
+                LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
             }
         }
     }


[7/8] storm git commit: Merge branch 'patch-6' of https://github.com/vesense/storm into STORM-1052

Posted by sr...@apache.org.
Merge branch 'patch-6' of https://github.com/vesense/storm into STORM-1052


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/90ef005c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/90ef005c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/90ef005c

Branch: refs/heads/master
Commit: 90ef005c11dc32caaa1549ad73a4b7c9195a917b
Parents: 14c7def 58f0994
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Fri Nov 6 18:10:29 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Fri Nov 6 18:10:29 2015 -0800

----------------------------------------------------------------------
 .../storm/kafka/trident/TridentKafkaState.java  | 25 +++++++++++++-------
 .../src/test/storm/kafka/TestUtils.java         |  7 +++---
 .../test/storm/kafka/TridentKafkaTopology.java  |  8 +++----
 3 files changed, 25 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/90ef005c/external/storm-kafka/src/test/storm/kafka/TestUtils.java
----------------------------------------------------------------------


[5/8] storm git commit: Check exceptions for response

Posted by sr...@apache.org.
Check exceptions for response

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/faa33f92
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/faa33f92
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/faa33f92

Branch: refs/heads/master
Commit: faa33f922aa7deac0622de2d4d48c86d0cc61875
Parents: 2e1bedc
Author: Xin Wang <be...@163.com>
Authored: Thu Nov 5 10:57:56 2015 +0800
Committer: Xin Wang <be...@163.com>
Committed: Thu Nov 5 10:57:56 2015 +0800

----------------------------------------------------------------------
 .../jvm/storm/kafka/trident/TridentKafkaState.java   | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/faa33f92/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
index 05a862c..1ed61b1 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
@@ -22,6 +22,7 @@ import backtype.storm.topology.FailedException;
 import org.apache.commons.lang.Validate;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
@@ -33,6 +34,8 @@ import storm.trident.tuple.TridentTuple;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 public class TridentKafkaState implements State {
     private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaState.class);
@@ -81,8 +84,16 @@ public class TridentKafkaState implements State {
                 topic = topicSelector.getTopic(tuple);
 
                 if(topic != null) {
-                    producer.send(new ProducerRecord(topic, mapper.getKeyFromTuple(tuple),
-                            mapper.getMessageFromTuple(tuple))).get();
+                    Future<RecordMetadata> result = producer.send(new ProducerRecord(topic,
+                            mapper.getKeyFromTuple(tuple), mapper.getMessageFromTuple(tuple)));
+                    try {
+                        result.get();
+                    } catch (ExecutionException e) {
+                        String errorMsg = "Could not retrieve result for message with key = "
+                                + mapper.getKeyFromTuple(tuple) + " from topic = " + topic;
+                        LOG.error(errorMsg, e);
+                        throw new FailedException(errorMsg, e);
+                    }
                 } else {
                     LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
                 }


[6/8] storm git commit: update TestUtils kafka configs

Posted by sr...@apache.org.
update TestUtils kafka configs

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/58f09942
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/58f09942
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/58f09942

Branch: refs/heads/master
Commit: 58f099421eeed8286d1e7729dad45a60de38f0c8
Parents: faa33f9
Author: Xin Wang <be...@163.com>
Authored: Thu Nov 5 11:27:30 2015 +0800
Committer: Xin Wang <be...@163.com>
Committed: Thu Nov 5 11:27:30 2015 +0800

----------------------------------------------------------------------
 external/storm-kafka/src/test/storm/kafka/TestUtils.java | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/58f09942/external/storm-kafka/src/test/storm/kafka/TestUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/TestUtils.java b/external/storm-kafka/src/test/storm/kafka/TestUtils.java
index fa29992..1ec5d71 100644
--- a/external/storm-kafka/src/test/storm/kafka/TestUtils.java
+++ b/external/storm-kafka/src/test/storm/kafka/TestUtils.java
@@ -71,9 +71,10 @@ public class TestUtils {
     public static Config getConfig(String brokerConnectionString) {
         Config config = new Config();
         Properties props = new Properties();
-        props.put("metadata.broker.list", brokerConnectionString);
-        props.put("request.required.acks", "1");
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        props.put("bootstrap.servers", brokerConnectionString);
+        props.put("acks", "1");
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
         config.put(KafkaBolt.TOPIC, TOPIC);
 


[4/8] storm git commit: fix travis-ci build error

Posted by sr...@apache.org.
fix travis-ci build error

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2e1bedcf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2e1bedcf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2e1bedcf

Branch: refs/heads/master
Commit: 2e1bedcf584b9fa46796567bc6ccaa0068b92cfd
Parents: 88f71b8
Author: Xin Wang <be...@163.com>
Authored: Thu Oct 29 13:26:10 2015 +0800
Committer: Xin Wang <be...@163.com>
Committed: Thu Oct 29 13:26:10 2015 +0800

----------------------------------------------------------------------
 .../storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java  | 1 -
 1 file changed, 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2e1bedcf/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
index 7f70ce4..05a862c 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
@@ -94,5 +94,4 @@ public class TridentKafkaState implements State {
             }
         }
     }
-    }
 }


[3/8] storm git commit: use sync send method

Posted by sr...@apache.org.
use sync send method

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/88f71b85
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/88f71b85
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/88f71b85

Branch: refs/heads/master
Commit: 88f71b858e2589abbd477f560bfbb706225ea82c
Parents: 5c38b91
Author: Xin Wang <be...@163.com>
Authored: Thu Oct 29 13:09:20 2015 +0800
Committer: Xin Wang <be...@163.com>
Committed: Thu Oct 29 13:09:20 2015 +0800

----------------------------------------------------------------------
 .../storm/kafka/trident/TridentKafkaState.java  | 36 +++++++++-----------
 1 file changed, 17 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/88f71b85/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
index 098805c..7f70ce4 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
@@ -20,10 +20,8 @@ package storm.kafka.trident;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.topology.FailedException;
 import org.apache.commons.lang.Validate;
-import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
@@ -77,24 +75,24 @@ public class TridentKafkaState implements State {
     }
 
     public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
-        for (final TridentTuple tuple : tuples) {
-            final String topic = topicSelector.getTopic(tuple);
-            if(topic != null) {
-                producer.send(new ProducerRecord(topic, mapper.getKeyFromTuple(tuple),
-                        mapper.getMessageFromTuple(tuple)),new Callback() {
-                            @Override
-                            public void onCompletion(RecordMetadata metadata, Exception ex) {
-                                if(ex != null){
-                                    String errorMsg = "Could not send message with key = "
-                                    + mapper.getKeyFromTuple(tuple) + " to topic = " + topic;
-                                    LOG.warn(errorMsg, ex);
-                                    throw new FailedException(errorMsg, ex);
-                                }
-                            }
-                        });
-            } else {
-                LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
+        String topic = null;
+        for (TridentTuple tuple : tuples) {
+            try {
+                topic = topicSelector.getTopic(tuple);
+
+                if(topic != null) {
+                    producer.send(new ProducerRecord(topic, mapper.getKeyFromTuple(tuple),
+                            mapper.getMessageFromTuple(tuple))).get();
+                } else {
+                    LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
+                }
+            } catch (Exception ex) {
+                String errorMsg = "Could not send message with key = " + mapper.getKeyFromTuple(tuple)
+                        + " to topic = " + topic;
+                LOG.warn(errorMsg, ex);
+                throw new FailedException(errorMsg, ex);
             }
         }
     }
+    }
 }


[2/8] storm git commit: use new kafka producer configs

Posted by sr...@apache.org.
use new kafka producer configs

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5c38b91a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5c38b91a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5c38b91a

Branch: refs/heads/master
Commit: 5c38b91accb9b9f5adfeea412bac0e457508f492
Parents: df417e0
Author: Xin Wang <be...@163.com>
Authored: Fri Sep 18 14:05:21 2015 +0800
Committer: Xin Wang <be...@163.com>
Committed: Fri Sep 18 14:05:21 2015 +0800

----------------------------------------------------------------------
 .../src/test/storm/kafka/TridentKafkaTopology.java           | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5c38b91a/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java b/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java
index 9cb7bbf..cade6df 100644
--- a/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java
+++ b/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java
@@ -88,11 +88,11 @@ public class TridentKafkaTopology {
 
     private  static Config getConfig(String brokerConnectionString) {
         Config conf = new Config();
-        Map config = new HashMap();
         Properties props = new Properties();
-        props.put("metadata.broker.list", brokerConnectionString);
-        props.put("request.required.acks", "1");
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        props.put("bootstrap.servers", brokerConnectionString);
+        props.put("acks", "1");
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
         return conf;
     }


[8/8] storm git commit: Added STORM-1052 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-1052 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c12e28c8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c12e28c8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c12e28c8

Branch: refs/heads/master
Commit: c12e28c829fcfabc0a3a775fb9714968b7e3e349
Parents: 90ef005
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Fri Nov 6 18:20:40 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Fri Nov 6 18:20:40 2015 -0800

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c12e28c8/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a8cc1bf..e8c3de8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-1052: TridentKafkaState uses new Kafka Producer API.
  * STORM-1182: Removing and wrapping some exceptions in ConfigValidation to make code cleaner
  * STORM-1134. Windows: Fix log4j config.
  * STORM-1127: allow for boolean arguments (Flux)