Posted to users@kafka.apache.org by Jeff Klukas <jk...@simple.com> on 2016/04/15 16:56:05 UTC

How to explicitly apply TimestampExtractor?

The only hook I see for specifying a TimestampExtractor is in the
Properties that you pass when creating a KafkaStreams instance. Is it
possible to modify the timestamp while processing a stream, or does the
timestamp need to be extracted immediately upon entry into the topology?

I have a case where I'm creating a KStream from a topic with mostly
JSON-formatted messages. I need to deserialize as byte array, filter out
non-JSON messages, call .map on the stream to deserialize those objects
into desired POJOs, and only then reach into the objects to extract the
desired timestamp.

Workarounds I've imagined are either to define a TimestampExtractor that
attempts to do some partial deserialization of the payload to get at the
timestamp field; or, to create two separate topologies, with the second one
reading a topic that's already filtered.
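
A rough sketch of the first workaround, under stated assumptions: the
payload is UTF-8 text and the timestamp is a numeric epoch-milliseconds
field. The class name PayloadTimestamps, the method extractMillis, and the
field name "timestamp" are all hypothetical and do not come from this
thread. The helper uses only the JDK so that it can be called from a
TimestampExtractor's extract method without running a full JSON parser.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PayloadTimestamps {
    // Assumption: the field is named "timestamp" and holds an integer
    // number of epoch milliseconds. At most 18 digits are accepted so
    // that Long.parseLong cannot overflow.
    private static final Pattern TS_FIELD =
            Pattern.compile("\"timestamp\"\\s*:\\s*(\\d{1,18})(?!\\d)");

    // Returns the first matching timestamp found in the payload, or the
    // caller-supplied fallback when the payload is null or has no match.
    public static long extractMillis(byte[] payload, long fallback) {
        if (payload == null) {
            return fallback;
        }
        String text = new String(payload, StandardCharsets.UTF_8);
        Matcher m = TS_FIELD.matcher(text);
        return m.find() ? Long.parseLong(m.group(1)) : fallback;
    }
}
```

This is a textual match rather than real parsing, so it can be fooled by
a nested object or a string value that happens to contain the same field
name. Non-JSON messages simply fall through to the fallback value.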

Re: How to explicitly apply TimestampExtractor?

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Jeff,

We currently do not expose the TimestampExtractor, as it is always
applied automatically to all records polled from the consumer.

As for your case, do you have JSON-formatted messages along with non-JSON
messages on the same topic? In that case, I agree with you that you could
do the filtering in a first topology and pipe the results to another topic
containing only JSON-formatted messages.
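
As a minimal illustration of the filtering step in that first topology,
here is a cheap shape check on the raw byte array. The names
JsonShapeFilter and looksLikeJsonObject are hypothetical, and the check
is only a heuristic: it assumes UTF-8 payloads whose JSON messages are
objects, and it does not validate the JSON.

```java
import java.nio.charset.StandardCharsets;

public class JsonShapeFilter {
    // Heuristic only: true when the trimmed payload starts with '{' and
    // ends with '}'. Malformed JSON with that shape still passes.
    public static boolean looksLikeJsonObject(byte[] value) {
        if (value == null) {
            return false;
        }
        String text = new String(value, StandardCharsets.UTF_8).trim();
        return text.length() >= 2
                && text.charAt(0) == '{'
                && text.charAt(text.length() - 1) == '}';
    }
}
```

A predicate such as (key, value) -> JsonShapeFilter.looksLikeJsonObject(value)
could be passed to KStream#filter before the stream is written to the
filtered topic. The second topology would then read that topic with a
TimestampExtractor that can assume JSON input.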


Guozhang


On Fri, Apr 15, 2016 at 7:56 AM, Jeff Klukas <jk...@simple.com> wrote:

> The only hook I see for specifying a TimestampExtractor is in the
> Properties that you pass when creating a KafkaStreams instance. Is it
> possible to modify the timestamp while processing a stream, or does the
> timestamp need to be extracted immediately upon entry into the topology?
>
> I have a case where I'm creating a KStream from a topic with mostly
> JSON-formatted messages. I need to deserialize as byte array, filter out
> non-JSON messages, call .map on the stream to deserialize those objects
> into desired POJOs, and only then reach into the objects to extract the
> desired timestamp.
>
> Workarounds I've imagined are either to define a TimestampExtractor that
> attempts to do some partial deserialization of the payload to get at the
> timestamp field; or, to create two separate topologies, with the second one
> reading a topic that's already filtered.
>


