You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/11/07 03:21:21 UTC
[5/8] storm git commit: Check exceptions for response
Check exceptions for response
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/faa33f92
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/faa33f92
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/faa33f92
Branch: refs/heads/master
Commit: faa33f922aa7deac0622de2d4d48c86d0cc61875
Parents: 2e1bedc
Author: Xin Wang <be...@163.com>
Authored: Thu Nov 5 10:57:56 2015 +0800
Committer: Xin Wang <be...@163.com>
Committed: Thu Nov 5 10:57:56 2015 +0800
----------------------------------------------------------------------
.../jvm/storm/kafka/trident/TridentKafkaState.java | 15 +++++++++++++--
1 file changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/faa33f92/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
index 05a862c..1ed61b1 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
@@ -22,6 +22,7 @@ import backtype.storm.topology.FailedException;
import org.apache.commons.lang.Validate;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
@@ -33,6 +34,8 @@ import storm.trident.tuple.TridentTuple;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
public class TridentKafkaState implements State {
private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaState.class);
@@ -81,8 +84,16 @@ public class TridentKafkaState implements State {
topic = topicSelector.getTopic(tuple);
if(topic != null) {
- producer.send(new ProducerRecord(topic, mapper.getKeyFromTuple(tuple),
- mapper.getMessageFromTuple(tuple))).get();
+ Future<RecordMetadata> result = producer.send(new ProducerRecord(topic,
+ mapper.getKeyFromTuple(tuple), mapper.getMessageFromTuple(tuple)));
+ try {
+ result.get();
+ } catch (ExecutionException e) {
+ String errorMsg = "Could not retrieve result for message with key = "
+ + mapper.getKeyFromTuple(tuple) + " from topic = " + topic;
+ LOG.error(errorMsg, e);
+ throw new FailedException(errorMsg, e);
+ }
} else {
LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
}