You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@storm.apache.org by "Gergo Hong (JIRA)" <ji...@apache.org> on 2018/01/28 09:23:00 UTC
[jira] [Created] (STORM-2915) How could I to get the fail Numer
in Bolt When I use Kafka Spout
Gergo Hong created STORM-2915:
---------------------------------
Summary: How could I to get the fail Numer in Bolt When I use Kafka Spout
Key: STORM-2915
URL: https://issues.apache.org/jira/browse/STORM-2915
Project: Apache Storm
Issue Type: New Feature
Components: storm-kafka-client
Affects Versions: 1.0.5, 1.1.1, 1.0.4, 1.0.3, 1.1.0, 1.0.2
Reporter: Gergo Hong
I want to get fail num in bolt , how could I to get it?
if fail it retry, I see This
if (!isScheduled || retryService.isReady(msgId)) {
final String stream = tuple instanceof KafkaTuple ? ((KafkaTuple) tuple).getStream() : Utils.DEFAULT_STREAM_ID;
if (!isAtLeastOnceProcessing()) {
if (kafkaSpoutConfig.isTupleTrackingEnforced()) {
collector.emit(stream, tuple, msgId);
LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
} else {
collector.emit(stream, tuple);
LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
}
} else {
emitted.add(msgId);
offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
if (isScheduled) { // Was scheduled for retry and re-emitted, so remove from schedule.
retryService.remove(msgId);
}
collector.emit(stream, tuple, msgId);
tupleListener.onEmit(tuple, msgId);
LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
}
return true;
}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)