You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/11/07 03:21:17 UTC
[1/8] storm git commit: use new kafka producer api
Repository: storm
Updated Branches:
refs/heads/master 14c7defd5 -> c12e28c82
use new kafka producer api
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/df417e0a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/df417e0a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/df417e0a
Branch: refs/heads/master
Commit: df417e0aa1439c0619e905ee0e624aa0a9717f1e
Parents: 4523e54
Author: Xin Wang <be...@163.com>
Authored: Fri Sep 18 13:58:06 2015 +0800
Committer: Xin Wang <be...@163.com>
Committed: Fri Sep 18 13:58:06 2015 +0800
----------------------------------------------------------------------
.../storm/kafka/trident/TridentKafkaState.java | 45 ++++++++++----------
1 file changed, 23 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/df417e0a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
index 402ffb1..098805c 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
@@ -19,10 +19,11 @@ package storm.kafka.trident;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.FailedException;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
import org.apache.commons.lang.Validate;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
@@ -40,7 +41,7 @@ public class TridentKafkaState implements State {
public static final String KAFKA_BROKER_PROPERTIES = "kafka.broker.properties";
- private Producer producer;
+ private KafkaProducer producer;
private OutputCollector collector;
private TridentTupleToKafkaMapper mapper;
@@ -72,27 +73,27 @@ public class TridentKafkaState implements State {
Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
Properties properties = new Properties();
properties.putAll(configMap);
- ProducerConfig config = new ProducerConfig(properties);
- producer = new Producer(config);
+ producer = new KafkaProducer(properties);
}
public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
- String topic = null;
- for (TridentTuple tuple : tuples) {
- try {
- topic = topicSelector.getTopic(tuple);
-
- if(topic != null) {
- producer.send(new KeyedMessage(topic, mapper.getKeyFromTuple(tuple),
- mapper.getMessageFromTuple(tuple)));
- } else {
- LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
- }
- } catch (Exception ex) {
- String errorMsg = "Could not send message with key = " + mapper.getKeyFromTuple(tuple)
- + " to topic = " + topic;
- LOG.warn(errorMsg, ex);
- throw new FailedException(errorMsg, ex);
+ for (final TridentTuple tuple : tuples) {
+ final String topic = topicSelector.getTopic(tuple);
+ if(topic != null) {
+ producer.send(new ProducerRecord(topic, mapper.getKeyFromTuple(tuple),
+ mapper.getMessageFromTuple(tuple)),new Callback() {
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception ex) {
+ if(ex != null){
+ String errorMsg = "Could not send message with key = "
+ + mapper.getKeyFromTuple(tuple) + " to topic = " + topic;
+ LOG.warn(errorMsg, ex);
+ throw new FailedException(errorMsg, ex);
+ }
+ }
+ });
+ } else {
+ LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
}
}
}
[7/8] storm git commit: Merge branch 'patch-6' of https://github.com/vesense/storm into STORM-1052
Posted by sr...@apache.org.
Merge branch 'patch-6' of https://github.com/vesense/storm into STORM-1052
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/90ef005c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/90ef005c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/90ef005c
Branch: refs/heads/master
Commit: 90ef005c11dc32caaa1549ad73a4b7c9195a917b
Parents: 14c7def 58f0994
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Fri Nov 6 18:10:29 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Fri Nov 6 18:10:29 2015 -0800
----------------------------------------------------------------------
.../storm/kafka/trident/TridentKafkaState.java | 25 +++++++++++++-------
.../src/test/storm/kafka/TestUtils.java | 7 +++---
.../test/storm/kafka/TridentKafkaTopology.java | 8 +++----
3 files changed, 25 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/90ef005c/external/storm-kafka/src/test/storm/kafka/TestUtils.java
----------------------------------------------------------------------
[5/8] storm git commit: Check exceptions for response
Posted by sr...@apache.org.
Check exceptions for response
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/faa33f92
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/faa33f92
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/faa33f92
Branch: refs/heads/master
Commit: faa33f922aa7deac0622de2d4d48c86d0cc61875
Parents: 2e1bedc
Author: Xin Wang <be...@163.com>
Authored: Thu Nov 5 10:57:56 2015 +0800
Committer: Xin Wang <be...@163.com>
Committed: Thu Nov 5 10:57:56 2015 +0800
----------------------------------------------------------------------
.../jvm/storm/kafka/trident/TridentKafkaState.java | 15 +++++++++++++--
1 file changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/faa33f92/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
index 05a862c..1ed61b1 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
@@ -22,6 +22,7 @@ import backtype.storm.topology.FailedException;
import org.apache.commons.lang.Validate;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
@@ -33,6 +34,8 @@ import storm.trident.tuple.TridentTuple;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
public class TridentKafkaState implements State {
private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaState.class);
@@ -81,8 +84,16 @@ public class TridentKafkaState implements State {
topic = topicSelector.getTopic(tuple);
if(topic != null) {
- producer.send(new ProducerRecord(topic, mapper.getKeyFromTuple(tuple),
- mapper.getMessageFromTuple(tuple))).get();
+ Future<RecordMetadata> result = producer.send(new ProducerRecord(topic,
+ mapper.getKeyFromTuple(tuple), mapper.getMessageFromTuple(tuple)));
+ try {
+ result.get();
+ } catch (ExecutionException e) {
+ String errorMsg = "Could not retrieve result for message with key = "
+ + mapper.getKeyFromTuple(tuple) + " from topic = " + topic;
+ LOG.error(errorMsg, e);
+ throw new FailedException(errorMsg, e);
+ }
} else {
LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
}
[6/8] storm git commit: update TestUtils kafka configs
Posted by sr...@apache.org.
update TestUtils kafka configs
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/58f09942
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/58f09942
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/58f09942
Branch: refs/heads/master
Commit: 58f099421eeed8286d1e7729dad45a60de38f0c8
Parents: faa33f9
Author: Xin Wang <be...@163.com>
Authored: Thu Nov 5 11:27:30 2015 +0800
Committer: Xin Wang <be...@163.com>
Committed: Thu Nov 5 11:27:30 2015 +0800
----------------------------------------------------------------------
external/storm-kafka/src/test/storm/kafka/TestUtils.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/58f09942/external/storm-kafka/src/test/storm/kafka/TestUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/TestUtils.java b/external/storm-kafka/src/test/storm/kafka/TestUtils.java
index fa29992..1ec5d71 100644
--- a/external/storm-kafka/src/test/storm/kafka/TestUtils.java
+++ b/external/storm-kafka/src/test/storm/kafka/TestUtils.java
@@ -71,9 +71,10 @@ public class TestUtils {
public static Config getConfig(String brokerConnectionString) {
Config config = new Config();
Properties props = new Properties();
- props.put("metadata.broker.list", brokerConnectionString);
- props.put("request.required.acks", "1");
- props.put("serializer.class", "kafka.serializer.StringEncoder");
+ props.put("bootstrap.servers", brokerConnectionString);
+ props.put("acks", "1");
+ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
config.put(KafkaBolt.TOPIC, TOPIC);
[4/8] storm git commit: fix travis-ci build error
Posted by sr...@apache.org.
fix travis-ci build error
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2e1bedcf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2e1bedcf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2e1bedcf
Branch: refs/heads/master
Commit: 2e1bedcf584b9fa46796567bc6ccaa0068b92cfd
Parents: 88f71b8
Author: Xin Wang <be...@163.com>
Authored: Thu Oct 29 13:26:10 2015 +0800
Committer: Xin Wang <be...@163.com>
Committed: Thu Oct 29 13:26:10 2015 +0800
----------------------------------------------------------------------
.../storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java | 1 -
1 file changed, 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/2e1bedcf/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
index 7f70ce4..05a862c 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
@@ -94,5 +94,4 @@ public class TridentKafkaState implements State {
}
}
}
- }
}
[3/8] storm git commit: use sync send method
Posted by sr...@apache.org.
use sync send method
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/88f71b85
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/88f71b85
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/88f71b85
Branch: refs/heads/master
Commit: 88f71b858e2589abbd477f560bfbb706225ea82c
Parents: 5c38b91
Author: Xin Wang <be...@163.com>
Authored: Thu Oct 29 13:09:20 2015 +0800
Committer: Xin Wang <be...@163.com>
Committed: Thu Oct 29 13:09:20 2015 +0800
----------------------------------------------------------------------
.../storm/kafka/trident/TridentKafkaState.java | 36 +++++++++-----------
1 file changed, 17 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/88f71b85/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
index 098805c..7f70ce4 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
@@ -20,10 +20,8 @@ package storm.kafka.trident;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.FailedException;
import org.apache.commons.lang.Validate;
-import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
@@ -77,24 +75,24 @@ public class TridentKafkaState implements State {
}
public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
- for (final TridentTuple tuple : tuples) {
- final String topic = topicSelector.getTopic(tuple);
- if(topic != null) {
- producer.send(new ProducerRecord(topic, mapper.getKeyFromTuple(tuple),
- mapper.getMessageFromTuple(tuple)),new Callback() {
- @Override
- public void onCompletion(RecordMetadata metadata, Exception ex) {
- if(ex != null){
- String errorMsg = "Could not send message with key = "
- + mapper.getKeyFromTuple(tuple) + " to topic = " + topic;
- LOG.warn(errorMsg, ex);
- throw new FailedException(errorMsg, ex);
- }
- }
- });
- } else {
- LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
+ String topic = null;
+ for (TridentTuple tuple : tuples) {
+ try {
+ topic = topicSelector.getTopic(tuple);
+
+ if(topic != null) {
+ producer.send(new ProducerRecord(topic, mapper.getKeyFromTuple(tuple),
+ mapper.getMessageFromTuple(tuple))).get();
+ } else {
+ LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
+ }
+ } catch (Exception ex) {
+ String errorMsg = "Could not send message with key = " + mapper.getKeyFromTuple(tuple)
+ + " to topic = " + topic;
+ LOG.warn(errorMsg, ex);
+ throw new FailedException(errorMsg, ex);
}
}
}
+ }
}
[2/8] storm git commit: use new kafka producer configs
Posted by sr...@apache.org.
use new kafka producer configs
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5c38b91a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5c38b91a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5c38b91a
Branch: refs/heads/master
Commit: 5c38b91accb9b9f5adfeea412bac0e457508f492
Parents: df417e0
Author: Xin Wang <be...@163.com>
Authored: Fri Sep 18 14:05:21 2015 +0800
Committer: Xin Wang <be...@163.com>
Committed: Fri Sep 18 14:05:21 2015 +0800
----------------------------------------------------------------------
.../src/test/storm/kafka/TridentKafkaTopology.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/5c38b91a/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java b/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java
index 9cb7bbf..cade6df 100644
--- a/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java
+++ b/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java
@@ -88,11 +88,11 @@ public class TridentKafkaTopology {
private static Config getConfig(String brokerConnectionString) {
Config conf = new Config();
- Map config = new HashMap();
Properties props = new Properties();
- props.put("metadata.broker.list", brokerConnectionString);
- props.put("request.required.acks", "1");
- props.put("serializer.class", "kafka.serializer.StringEncoder");
+ props.put("bootstrap.servers", brokerConnectionString);
+ props.put("acks", "1");
+ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
return conf;
}
[8/8] storm git commit: Added STORM-1052 to CHANGELOG.
Posted by sr...@apache.org.
Added STORM-1052 to CHANGELOG.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c12e28c8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c12e28c8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c12e28c8
Branch: refs/heads/master
Commit: c12e28c829fcfabc0a3a775fb9714968b7e3e349
Parents: 90ef005
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Fri Nov 6 18:20:40 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Fri Nov 6 18:20:40 2015 -0800
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c12e28c8/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a8cc1bf..e8c3de8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-1052: TridentKafkaState uses new Kafka Producer API.
* STORM-1182: Removing and wrapping some exceptions in ConfigValidation to make code cleaner
* STORM-1134. Windows: Fix log4j config.
* STORM-1127: allow for boolean arguments (Flux)