You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2015/11/23 12:06:10 UTC
[jira] [Commented] (STORM-826) As a storm developer I’d like to use the new kafka producer API to reduce dependencies and use long term supported kafka apis
[ https://issues.apache.org/jira/browse/STORM-826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15021957#comment-15021957 ]
ASF GitHub Bot commented on STORM-826:
--------------------------------------
Github user abhishekagarwal87 commented on a diff in the pull request:
https://github.com/apache/storm/pull/572#discussion_r45591441
--- Diff: external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java ---
@@ -102,12 +114,40 @@ public void execute(Tuple input) {
key = mapper.getKeyFromTuple(input);
message = mapper.getMessageFromTuple(input);
topic = topicSelector.getTopic(input);
- if(topic != null ) {
- producer.send(new KeyedMessage<K, V>(topic, key, message));
+ if (topic != null ) {
+ Callback callback = null;
+
+ if (!fireAndForget && async) {
+ callback = new Callback() {
+ @Override
+ public void onCompletion(RecordMetadata ignored, Exception e) {
+ synchronized (collector) {
+ if (e != null) {
+ collector.reportError(e);
+ collector.fail(input);
+ } else {
+ collector.ack(input);
+ }
+ }
+ }
+ };
+ }
+ Future<RecordMetadata> result = producer.send(new ProducerRecord<K, V>(topic, key, message), callback);
+ if (!async) {
+ try {
+ result.get();
+ collector.ack(input);
--- End diff --
does this call take place in the same executor thread?
> As a storm developer I’d like to use the new kafka producer API to reduce dependencies and use long term supported kafka apis
> ------------------------------------------------------------------------------------------------------------------------------
>
> Key: STORM-826
> URL: https://issues.apache.org/jira/browse/STORM-826
> Project: Apache Storm
> Issue Type: Story
> Components: storm-kafka
> Reporter: Thomas Becker
> Assignee: Zhuo Liu
> Fix For: 0.11.0
>
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)