Posted to user@flink.apache.org by Paul Joireman <pa...@physiq.com> on 2017/01/12 20:53:52 UTC
Getting key from keyed stream
Hi all,
Is there a simple way to read the key from a KeyedStream? Very simply, I'm trying to read a message from Kafka, separate the incoming messages by a field in the message, and write the original message back to Kafka using that field as a new topic. I chose to partition the incoming stream by creating a KeyedStream and using the field from the message as the key. The only thing left is to write the message to Kafka with a producer, but I need to know the topic to write to, and for that I need to be able to read the key. Is there a way to do this?
Is there a better way to do this than using a KeyedStream?
Paul
Re: Getting key from keyed stream
Posted by Paul Joireman <pa...@physiq.com>.
Thanks Jamie,
I just figured that out after some digging and a little trial and error. That works great.
Paul
________________________________
From: Jamie Grier <ja...@data-artisans.com>
Sent: Thursday, January 12, 2017 4:59:43 PM
To: user@flink.apache.org
Subject: Re: Getting key from keyed stream
A simpler and more efficient approach would simply be the following:
val stream = env.addSource(new FlinkKafkaConsumer(...))
stream
  .addSink(new FlinkKafkaProducer(new MyKeyedSerializationSchema(...)))
env.execute()
In MyKeyedSerializationSchema just override the getTargetTopic() method.
That should do it :)
-Jamie
--
Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
jamie@data-artisans.com
Re: Getting key from keyed stream
Posted by Jamie Grier <ja...@data-artisans.com>.
A simpler and more efficient approach would simply be the following:
val stream = env.addSource(new FlinkKafkaConsumer(...))
stream
  .addSink(new FlinkKafkaProducer(new MyKeyedSerializationSchema(...)))
env.execute()
In MyKeyedSerializationSchema just override the getTargetTopic() method.
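As a rough illustration of that suggestion, here is a minimal sketch of what such a schema might look like. It is not code from this thread: the Message case class, its field and payload members, and the defaultTopic fallback are hypothetical, and the trait below is only a local stand-in that mirrors the three methods of Flink's KeyedSerializationSchema (serializeKey, serializeValue, getTargetTopic) so that the sketch compiles without Flink on the classpath.

```scala
import java.nio.charset.StandardCharsets

// Local stand-in mirroring the shape of Flink's KeyedSerializationSchema[T].
// In a real job you would implement Flink's own interface instead.
trait KeyedSerializationSchema[T] extends Serializable {
  def serializeKey(element: T): Array[Byte]
  def serializeValue(element: T): Array[Byte]
  def getTargetTopic(element: T): String
}

// Hypothetical message type: `field` is the value the topic is derived from,
// `payload` is the original message body.
final case class Message(field: String, payload: String)

// Picks the output topic per record from the record itself, so no keyBy
// is needed just to learn which topic a record belongs to.
class MyKeyedSerializationSchema(defaultTopic: String)
    extends KeyedSerializationSchema[Message] {

  // Use the routing field as the Kafka record key as well (an assumption).
  override def serializeKey(element: Message): Array[Byte] =
    element.field.getBytes(StandardCharsets.UTF_8)

  // Write the original message body unchanged.
  override def serializeValue(element: Message): Array[Byte] =
    element.payload.getBytes(StandardCharsets.UTF_8)

  // Route to a topic named after the field; fall back when it is empty.
  override def getTargetTopic(element: Message): String =
    if (element.field.isEmpty) defaultTopic else element.field
}
```

With something along these lines, a record such as Message("orders", "...") would be routed to a topic named "orders". The topic is chosen per record inside the sink's schema, which is why the KeyedStream step can be dropped.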
That should do it :)
-Jamie
--
Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
jamie@data-artisans.com