Posted to user@flink.apache.org by Steve Bistline <sr...@gmail.com> on 2018/11/09 14:54:07 UTC
Flink Question
I am having problems with the Flink Kinesis adapter. I have some native KCL
code that works fine. I want to replace the .addSource with the CSV String
data that is coming in from my KCL code. How can I do that?
// Consume the data streams from AWS Kinesis stream
DataStream<Event> dataStream = env.addSource(new FlinkKinesisConsumer<>(
        pt.getRequired("stream"),
        new EventSchema(),
        kinesisConsumerConfig))
    .name("Kinesis Stream Consumer");
Any help would be appreciated
Thanks,
Steve
Re: Flink Question
Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Steve,
I’m not sure what you mean by “replacing addSource with CSV String data”. Are your Kinesis records CSV strings that you want to parse into Event objects?
If so, you should be able to do that in the DeserializationSchema you pass to the FlinkKinesisConsumer (EventSchema in your snippet).
Cheers,
Gordon
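A minimal sketch of the parsing step Gordon describes, assuming each Kinesis record is one UTF-8 CSV line. The Event fields (deviceId, timestamp, value), the three-column layout, and the CsvEventParser helper are hypothetical, since the thread shows neither the Event class nor the CSV format. The sketch is plain Java so it runs without Flink on the classpath; in a real job the body of parse would most likely sit in the deserialize(byte[] message) method of a class such as EventSchema that implements Flink's DeserializationSchema<Event>.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical event type; the real Event class is not shown in the thread.
final class Event {
    final String deviceId;
    final long timestamp;
    final double value;

    Event(String deviceId, long timestamp, double value) {
        this.deviceId = deviceId;
        this.timestamp = timestamp;
        this.value = value;
    }
}

// Hypothetical helper. Its parse method has the same shape as
// DeserializationSchema#deserialize(byte[]): raw record bytes in, Event out.
final class CsvEventParser {
    private CsvEventParser() {}

    static Event parse(byte[] message) {
        // Assumption: each record is a single UTF-8 encoded CSV line.
        String line = new String(message, StandardCharsets.UTF_8).trim();

        // Assumption: plain comma-separated fields with no quoting or escaping.
        String[] fields = line.split(",", -1);
        if (fields.length != 3) {
            throw new IllegalArgumentException(
                "Expected 3 CSV fields but got " + fields.length + ": " + line);
        }

        return new Event(
            fields[0].trim(),
            Long.parseLong(fields[1].trim()),
            Double.parseDouble(fields[2].trim()));
    }
}
```

With the parsing inside the schema, the existing env.addSource(new FlinkKinesisConsumer<>(...)) call can stay as it is, and no separate KCL consumer is needed to feed the job. Quoted fields or embedded commas would call for a real CSV parser instead of split.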