Posted to dev@flink.apache.org by "Zekun Yu (Jira)" <ji...@apache.org> on 2020/09/22 00:47:00 UTC

[jira] [Created] (FLINK-19332) Special characters issue using Kinesis Data Analytics for Apache Flink

Zekun Yu created FLINK-19332:
--------------------------------

             Summary: Special characters issue using Kinesis Data Analytics for Apache Flink
                 Key: FLINK-19332
                 URL: https://issues.apache.org/jira/browse/FLINK-19332
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kinesis
    Affects Versions: 1.8.2
            Reporter: Zekun Yu
             Fix For: 1.8.2


Hi there,

 

I am encountering a special-character issue while using Kinesis Data Analytics for Apache Flink (KDA).

 

Our KDA application processes data and writes its output to a Kinesis stream. A Lambda function subscribes to that Kinesis stream and reads the records from it.

The connector library used in the KDA application is "org.apache.flink.streaming.connectors.kinesis".

 

The KDA application outputs only a single record per key to the Kinesis sink using "collector.collect()" (details below).

Most of the time, the record received by the Lambda function looks perfectly fine.

However, occasionally, when two records are sent to the Kinesis sink via "collector.collect()" at the same time, *those two records arrive combined into a single record, and the payload received by the Lambda function contains special characters*.

Below are some technical details:

 

The KDA application does not use any time window ("TimeWindow()"); it only keys the stream with "keyBy()":

).returns(MatchedDataForAlarm.class)
    .keyBy(MatchedDataForAlarm::getStateKey)
    .connect(ruleBroadcastStream)
    .process(new MetricProcess())
    .addSink(kinesis);

"MetricProcess" extends "KeyedBroadcastProcessFunction" and overrides the "processElement" function, which emits its outputs via "collector.collect()":

@Override
public void processElement(MatchedDataForAlarm input, ReadOnlyContext ctx, Collector<MatchedDataForAlarm> collector) throws Exception {
    // ... (body elided here; outputs are emitted via collector.collect())
}

We have our own AEMMatchedDataForAlarmSchemaSerialization, which implements KinesisSerializationSchema<MatchedDataForAlarm>. The serialization simply converts a "MatchedDataForAlarm" object to a String using Gson and then wraps the resulting bytes in a ByteBuffer.

 

@Override
public ByteBuffer serialize(MatchedDataForAlarm matchedDataForAlarm) {
    Gson gson = new Gson();
    String result = gson.toJson(matchedDataForAlarm);
    log.info("Alarm record sent to Kinesis stream: {}", result);
    return ByteBuffer.wrap(result.getBytes());
}
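One detail worth noting about the serializer above: "result.getBytes()" with no argument uses the JVM's platform-default charset, so the produced bytes can differ between environments. Below is a stdlib-only sketch of a charset-explicit variant; "toJson" here is a hypothetical stand-in for the Gson call so the snippet compiles on its own, and this is a defensive tweak, not a claimed diagnosis of the combined-record issue.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SerializeSketch {
    // Hypothetical stand-in for gson.toJson(matchedDataForAlarm), so the
    // sketch needs no Gson dependency; the real schema class is unchanged.
    static String toJson(boolean inAlarmState) {
        return "{ \"inAlarmState\": " + inAlarmState + " }";
    }

    // Charset-explicit variant of serialize(): the byte content no longer
    // depends on the JVM's default file.encoding.
    static ByteBuffer serialize(boolean inAlarmState) {
        String result = toJson(inAlarmState);
        return ByteBuffer.wrap(result.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // Decoding back with the same charset round-trips the JSON exactly.
        String roundTrip = StandardCharsets.UTF_8.decode(serialize(false)).toString();
        System.out.println(roundTrip);
    }
}
```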

 

 

 

*Here is the record shown in the Lambda logs when the two records are combined (in most cases they are received as two separate records):*

 

????
0??

{ "inAlarmState": false }

??

{ "inAlarmState": false}

e????E????o?N9x
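For illustration only (this is not a claim about the producer's actual wire format): the payload above has the shape one would see if two serialized JSON records arrived inside a single payload with non-text framing bytes between them, and the consumer decoded the whole payload as text. A minimal stdlib-only sketch, with invented framing bytes:

```java
import java.nio.charset.StandardCharsets;

public class CombinedPayloadSketch {
    // Concatenates byte arrays into one payload.
    static byte[] concat(byte[]... parts) {
        int len = 0;
        for (byte[] p : parts) len += p.length;
        byte[] out = new byte[len];
        int pos = 0;
        for (byte[] p : parts) {
            System.arraycopy(p, 0, out, pos, p.length);
            pos += p.length;
        }
        return out;
    }

    // One payload holding two JSON records separated by bytes that are
    // never valid UTF-8 (0xFE/0xFF); the framing bytes are invented purely
    // to mimic the symptom, not taken from any real protocol.
    static byte[] combinedPayload() {
        byte[] framing = {(byte) 0xFE, (byte) 0x82, 0x30, (byte) 0xFF};
        byte[] record = "{ \"inAlarmState\": false }".getBytes(StandardCharsets.UTF_8);
        return concat(framing, record, framing, record);
    }

    public static void main(String[] args) {
        // Decoding the whole payload as text turns the framing bytes into
        // replacement characters, so both records appear "combined" with
        // special characters between them, much like the Lambda log above.
        String seen = new String(combinedPayload(), StandardCharsets.UTF_8);
        System.out.println(seen);
    }
}
```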

I am not sure whether this is a serialization issue or some default behavior of the Kinesis sink library; it might also just be a common mistake on my part that I am not aware of.

Could anyone help with this problem? I really appreciate it.

Thanks,

Zekun

--
This message was sent by Atlassian Jira
(v8.3.4#803005)