You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/11/07 03:21:21 UTC

[5/8] storm git commit: Check exceptions for response

Check exceptions for response

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/faa33f92
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/faa33f92
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/faa33f92

Branch: refs/heads/master
Commit: faa33f922aa7deac0622de2d4d48c86d0cc61875
Parents: 2e1bedc
Author: Xin Wang <be...@163.com>
Authored: Thu Nov 5 10:57:56 2015 +0800
Committer: Xin Wang <be...@163.com>
Committed: Thu Nov 5 10:57:56 2015 +0800

----------------------------------------------------------------------
 .../jvm/storm/kafka/trident/TridentKafkaState.java   | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/faa33f92/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
index 05a862c..1ed61b1 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
@@ -22,6 +22,7 @@ import backtype.storm.topology.FailedException;
 import org.apache.commons.lang.Validate;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
@@ -33,6 +34,8 @@ import storm.trident.tuple.TridentTuple;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 public class TridentKafkaState implements State {
     private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaState.class);
@@ -81,8 +84,16 @@ public class TridentKafkaState implements State {
                 topic = topicSelector.getTopic(tuple);
 
                 if(topic != null) {
-                    producer.send(new ProducerRecord(topic, mapper.getKeyFromTuple(tuple),
-                            mapper.getMessageFromTuple(tuple))).get();
+                    Future<RecordMetadata> result = producer.send(new ProducerRecord(topic,
+                            mapper.getKeyFromTuple(tuple), mapper.getMessageFromTuple(tuple)));
+                    try {
+                        result.get();
+                    } catch (ExecutionException e) {
+                        String errorMsg = "Could not retrieve result for message with key = "
+                                + mapper.getKeyFromTuple(tuple) + " from topic = " + topic;
+                        LOG.error(errorMsg, e);
+                        throw new FailedException(errorMsg, e);
+                    }
                 } else {
                     LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
                 }