You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@storm.apache.org by "Stig Rohde Døssing (JIRA)" <ji...@apache.org> on 2018/04/27 20:37:00 UTC

[jira] [Commented] (STORM-2915) How could I to get the fail Number in Bolt When I use Kafka Spout

    [ https://issues.apache.org/jira/browse/STORM-2915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16457014#comment-16457014 ] 

Stig Rohde Døssing commented on STORM-2915:
-------------------------------------------

[~Gergo Hong] Sorry, this was missed. Could you try to elaborate what you're asking? Are you asking about getting the failure count for a specific tuple (e.g. "tuple 1 has failed 3 times") from a bolt, or does your bolt want to know the total failure count for the spout?

> How could I to get the fail Number   in Bolt When I use  Kafka Spout
> --------------------------------------------------------------------
>
>                 Key: STORM-2915
>                 URL: https://issues.apache.org/jira/browse/STORM-2915
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-kafka-client
>    Affects Versions: 1.0.2, 1.1.0, 1.0.3, 1.0.4, 1.1.1, 1.0.5
>            Reporter: Gergo Hong
>            Priority: Minor
>
> I want to get fail num in bolt , how could  I  to get it? 
> if  fail it  retry, I see This 
> if (!isScheduled || retryService.isReady(msgId)) {
>  final String stream = tuple instanceof KafkaTuple ? ((KafkaTuple) tuple).getStream() : Utils.DEFAULT_STREAM_ID;
>  if (!isAtLeastOnceProcessing()) {
>  if (kafkaSpoutConfig.isTupleTrackingEnforced()) {
>  collector.emit(stream, tuple, msgId);
>  LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
>  } else {
>  collector.emit(stream, tuple);
>  LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
>  }
>  } else {
>  emitted.add(msgId);
>  offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
>  if (isScheduled) { // Was scheduled for retry and re-emitted, so remove from schedule.
>  retryService.remove(msgId);
>  }
>  collector.emit(stream, tuple, msgId);
>  tupleListener.onEmit(tuple, msgId);
>  LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
>  }
>  return true;
> }



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)