Posted to dev@kafka.apache.org by "Warren, Brad" <Br...@aa.com> on 2018/03/13 19:51:43 UTC

Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

Hi devs,

It's a bit difficult to put all of the pieces together regarding the status and API changes around the KIPs dealing with exposing the record metadata in the Processor and DSL APIs. This is a feature that my team here at American Airlines is keenly interested in, and I'd like to provide a real-world use case to help move the discussion along:

I have a source topic whose text values include datetimes without a year. We want to order the records in a stream by a timestamp extracted from the record value, and we plan to use the record timestamp from the source topic to supply the year. We're hoping to use the DSL. Something like:

val streamOrderedByMyValueTime = Builder.stream("sourceTopic").map( K,V -> KeyValue(KR, VR, timestamp) )

so then I can do

groupBy(), aggregate(), etc.

Inside the mapper, my timestamp would be something like LocalDateTime.of(yearFromIncomingConsumerRecordTimestamp, monthFromValue, dayFromValue, ....)
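
As a rough illustration of that date arithmetic only (not of any Streams API), the year can be taken from the record's epoch-millisecond timestamp and combined with the fields parsed from the value. This is a minimal sketch: the class name, method name, parameter list, and the use of UTC are assumptions made for the example, not anything from the KIP.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class YearlessTimestamp {

    /**
     * Hypothetical helper: builds an event time from year-less fields parsed
     * out of the record value, borrowing the year from the record's own
     * timestamp (epoch milliseconds). Returns epoch milliseconds.
     */
    public static long toEpochMillis(long recordTimestampMs, int month, int day,
                                     int hour, int minute, ZoneId zone) {
        // Year of the record timestamp, interpreted in the given zone.
        int year = Instant.ofEpochMilli(recordTimestampMs).atZone(zone).getYear();
        // Combine the borrowed year with the fields taken from the value.
        LocalDateTime eventTime = LocalDateTime.of(year, month, day, hour, minute);
        return eventTime.atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        // Record timestamp as it might arrive from the source topic.
        long recordTs = Instant.parse("2018-03-13T19:51:43Z").toEpochMilli();
        // The value carried "March 12, 23:15" with no year.
        long eventTs = toEpochMillis(recordTs, 3, 12, 23, 15, ZoneOffset.UTC);
        System.out.println(Instant.ofEpochMilli(eventTs));
    }
}
```

Note that the sketch ignores the year boundary: a value dated December 31 that is stamped by the broker on January 1 would be assigned the wrong year unless that case is handled separately.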

Looking at the wiki here https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=73637757, what is the proposed implementation of RichValueMapper?  Is it going to support what I want to do here?


Thanks,
Brad

Brad Warren
Principal Application Architect
Airport Technology

brad.warren@aa.com


Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Brad,

Thanks for following up on this KIP, and sorry for the "messy" discussion
thread. Adding this feature is a little tricky. We still hope to get it
into the 1.2 release, but at the moment there is not much progress.

However, for your use case, you can replace .map() with .transform(),
which gives you access to the record's timestamp (via the provided
`context` object) as extracted by the TimestampExtractor. See the docs
for more details:
https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration


-Matthias
