Posted to user@flink.apache.org by Madhusudan Shastri <te...@gmail.com> on 2018/10/23 14:22:29 UTC

Get request header from Kinesis

Hi,
    I am using the code below to read data from an AWS Kinesis stream, but it
gives me only the request body and not the request header. How can I get the
request header from Kinesis? My Flink jar versions are:
flink-java - 1.6.1
flink-streaming-java_2.11 - 1.6.1
flink-connector-kinesis_2.11 - 1.6.1

My code is:
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "<Region>");
consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "<KEY>");
consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "<SECRET>");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> kinesis = env.addSource(
    new FlinkKinesisConsumer<String>("<STREAM>", new SimpleStringSchema(), consumerConfig));

kinesis.addSink(new SinkFunction<String>() {
    @Override
    public void invoke(String value) throws Exception {
        System.out.println("value= " + value);
    }
});
env.execute();

Thanks and Regards,
Madhusudan B. Shastri

Re: Get request header from Kinesis

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

Could you point to the AWS Kinesis Java API that exposes record headers?
As far as I can tell from the Javadoc, there is no way to retrieve headers from Kinesis records.

If there is a way to do that, then it might make sense to expose that from the Kinesis connector’s serialization / deserialization schema interfaces.
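For reference, here is a minimal sketch of the per-record metadata that should already be reachable without headers. It assumes the connector's KinesisDeserializationSchema interface passes the partition key, sequence number, approximate arrival timestamp, stream name and shard ID to its deserialize method alongside the payload bytes; please check that against the Javadoc for your Flink version. The class KinesisRecordDescriber and its describe method are hypothetical names used only for illustration, and the helper is plain Java so it can be tried without the connector on the classpath.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;

// Hypothetical helper for illustration; not part of Flink or the AWS SDK.
final class KinesisRecordDescriber {

    private KinesisRecordDescriber() {}

    // The parameter list mirrors what KinesisDeserializationSchema#deserialize
    // is assumed to receive (an assumption to verify for your Flink version).
    static String describe(byte[] recordValue, String partitionKey, String seqNum,
                           long approxArrivalTimestamp, String stream, String shardId) {
        // Decode the record payload (the "request body" in the question) as UTF-8.
        String payload = new String(recordValue, StandardCharsets.UTF_8);
        // Combine the record metadata and the payload into a single line.
        return "stream=" + stream
                + " shard=" + shardId
                + " partitionKey=" + partitionKey
                + " seqNum=" + seqNum
                + " arrival=" + Instant.ofEpochMilli(approxArrivalTimestamp)
                + " value=" + payload;
    }
}
```

If that assumption holds, a custom KinesisDeserializationSchema<String> could delegate to describe(...) from its deserialize method and be passed to FlinkKinesisConsumer in place of SimpleStringSchema. This only surfaces metadata that Kinesis stores with each record, not request headers.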

Cheers,
Gordon
