Posted to commits@samza.apache.org by "Jake Maes (JIRA)" <ji...@apache.org> on 2016/10/17 18:26:58 UTC

[jira] [Commented] (SAMZA-1035) Return offset of produced message

    [ https://issues.apache.org/jira/browse/SAMZA-1035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15583013#comment-15583013 ] 

Jake Maes commented on SAMZA-1035:
----------------------------------

Hey [~vlad_sergeev], thanks for logging this ticket.

I totally agree that it would be useful to have the offset for each produced message. 

I think there are a few technical challenges that make this non-trivial, so it would require some design:
1. If I remember correctly, the producer only returns a real offset if "acks" is not 0 (with acks=0 the returned offset is always -1). So we wouldn't have consistent behavior across configurations.
2. While this feature makes sense for Kafka, it won't apply to all SystemProducers, and that asymmetry could be confusing for users.
3. The producer is async and there is currently no mechanism to collect output from the callback threads. The solution will have to take both thread safety and performance into consideration.

To address #3, it might be reasonable to just track the latest offset in each flush().
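
As a rough illustration of that idea, here is a minimal sketch in plain Java. It is not Samza or Kafka code: LatestOffsetTracker, onAck() and snapshot() are hypothetical names made up for this example. It assumes the send callbacks can hand over a partition and an offset, and that an offset of -1 means the broker did not acknowledge the write (the acks=0 case from #1).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical helper (not part of Samza or Kafka): remembers the highest
 * acknowledged offset per partition, so it can be read after a flush.
 */
final class LatestOffsetTracker {
    // Assumption: -1 is what the producer reports when no ack was requested.
    static final long NO_OFFSET = -1L;

    // ConcurrentHashMap because callbacks may run on producer I/O threads.
    private final Map<Integer, Long> latest = new ConcurrentHashMap<>();

    /** Meant to be called from the send callback of each produced message. */
    void onAck(int partition, long offset) {
        if (offset == NO_OFFSET) {
            return; // nothing useful to record for unacknowledged sends
        }
        // merge() is atomic per key; keep the larger of the old and new offset.
        latest.merge(partition, offset, Long::max);
    }

    /** Meant to be called once flush() has returned; returns a stable copy. */
    Map<Integer, Long> snapshot() {
        return new HashMap<>(latest);
    }
}
```

With the Kafka producer, the callback would presumably pass in metadata.partition() and metadata.offset() from the RecordMetadata it receives. Tracking only the maximum keeps the per-message cost to one map update, at the price of not knowing the offset of each individual message.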

In any case, the comments above are not suggesting this feature shouldn't be implemented. Rather, they are intended to seed the discussion. 

-Jake

> Return offset of produced message
> ---------------------------------
>
>                 Key: SAMZA-1035
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1035
>             Project: Samza
>          Issue Type: Wish
>            Reporter: Vladislav Sergeev
>            Priority: Minor
>
> Hello.
> We use Apache Kafka as a databus and Apache Samza as a router.
> I think a good feature for Samza would be returning the offset of a produced message, for the case where we transfer messages from topic A to topic B.
> In our case we have SystemA, which produces messages to input_topic, and we route those messages, with some transformations, to topicA and topicB.
> Sometimes it is useful to collect metrics on processed messages.
> We can do it when they arrive in the method:
> process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector, TaskCoordinator taskCoordinator)
> But when we route them to another topic like this:
> messageCollector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", topic), keyOut, message));
> we don't know what offset the message has in the routed topic.
> As I saw in your code:
> https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
> You use the Kafka producer, which returns offsets for sent messages. So it would be great to get it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)