Posted to issues@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2020/10/01 13:50:00 UTC

[jira] [Commented] (FLINK-19332) Special characters issue using Kinesis Data Analytics for Apache Flink

    [ https://issues.apache.org/jira/browse/FLINK-19332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17205532#comment-17205532 ] 

Dian Fu commented on FLINK-19332:
---------------------------------

Hi [~zekuny], personally I don't think this should be a *Blocker* issue. Could you explain why you consider it a *Blocker*?

> Special characters issue using Kinesis Data Analytics for Apache Flink
> ----------------------------------------------------------------------
>
>                 Key: FLINK-19332
>                 URL: https://issues.apache.org/jira/browse/FLINK-19332
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: 1.8.2
>            Reporter: Zekun Yu
>            Priority: Blocker
>             Fix For: 1.8.2
>
>
> Hi there,
>  
> I am encountering a special-character issue while using Kinesis Data Analytics for Apache Flink (KDA).
>  
> Our KDA application processes data and outputs to a Kinesis stream. We have a Lambda function that subscribes to that Kinesis stream and reads records from it.
> The library I am using in the KDA application is "org.apache.flink.streaming.connectors.kinesis".
>  
> Our KDA application outputs only a single record to the Kinesis sink using "collector.collect()" for a single key (details can be found below).
> Most of the time, the record received by the Lambda looks perfectly fine.
> However, occasionally, when two records are sent to the Kinesis sink using "collector.collect()" at the same time, *we notice that those two records are somehow combined and there are special characters in the record received by the Lambda function*.
>
> Below are some technical details:
>  
> The KDA application does not use any "TimeWindow()" but uses "keyBy()" on some keys:
> ).returns(MatchedDataForAlarm.class)
>         .keyBy(MatchedDataForAlarm::getStateKey)
>         .connect(ruleBroadcastStream)
>         .process(new MetricProcess())
>         .addSink(kinesis);
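>
> For context, the "ruleBroadcastStream" referenced above is built by broadcasting a rule stream with a MapStateDescriptor. A minimal sketch (the "Rule" type and the variable names here are illustrative, not the exact application code):
>
> // Uses org.apache.flink.api.common.state.MapStateDescriptor and org.apache.flink.api.common.typeinfo.Types
> MapStateDescriptor<String, Rule> ruleStateDescriptor =
>         new MapStateDescriptor<>("rules", Types.STRING, TypeInformation.of(Rule.class));
>
> // Broadcast the rule stream so every parallel instance of MetricProcess sees every rule
> BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);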
>
> The "MetricProcess()" extends "KeyedBroadcastProcessFunction" which overrides the "processElement" function. It uses collector.collect() for outputs.
> @Override
> public void processElement(MatchedDataForAlarm input, ReadOnlyContext ctx, Collector<MatchedDataForAlarm> collector) throws Exception {
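>
> (The snippet above is truncated.) For illustration, a fuller skeleton of MetricProcess looks roughly like this (a sketch only; the String key type, the "Rule" broadcast type, and the method bodies are assumptions):
>
> public class MetricProcess
>         extends KeyedBroadcastProcessFunction<String, MatchedDataForAlarm, Rule, MatchedDataForAlarm> {
>
>     @Override
>     public void processElement(MatchedDataForAlarm input, ReadOnlyContext ctx,
>                                Collector<MatchedDataForAlarm> collector) throws Exception {
>         // evaluate the element against the broadcast rules, then emit a single record for the key
>         collector.collect(input);
>     }
>
>     @Override
>     public void processBroadcastElement(Rule rule, Context ctx,
>                                         Collector<MatchedDataForAlarm> collector) throws Exception {
>         // update the broadcast state with the new or changed rule
>     }
> }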
>
> We have our own AEMMatchedDataForAlarmSchemaSerialization, which implements KinesisSerializationSchema<MatchedDataForAlarm>. The serialization simply converts a "MatchedDataForAlarm" object to a String using Gson and then converts it to a ByteBuffer:
>  
> @Override
> public ByteBuffer serialize(MatchedDataForAlarm matchedDataForAlarm) {
>     Gson gson = new Gson();
>     String result = gson.toJson(matchedDataForAlarm);
>     log.info("Alarm record sent to Kinesis stream: {}", result);
>     return ByteBuffer.wrap(result.getBytes());
> }
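>
> For completeness, the full schema looks roughly like this (a sketch; the explicit UTF-8 charset and the null getTargetStream(), which falls back to the producer's default stream, are assumptions rather than the exact code):
>
> // Uses java.nio.charset.StandardCharsets and
> // org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema
> public class AEMMatchedDataForAlarmSchemaSerialization
>         implements KinesisSerializationSchema<MatchedDataForAlarm> {
>
>     private static final Gson GSON = new Gson();
>
>     @Override
>     public ByteBuffer serialize(MatchedDataForAlarm matchedDataForAlarm) {
>         String result = GSON.toJson(matchedDataForAlarm);
>         return ByteBuffer.wrap(result.getBytes(StandardCharsets.UTF_8));
>     }
>
>     @Override
>     public String getTargetStream(MatchedDataForAlarm matchedDataForAlarm) {
>         return null; // null means the record goes to the sink's default stream
>     }
> }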
>
> *Here's the record shown in the Lambda logs when two records are combined somewhere along the way (in most cases those two are received as two separate records):*
>  
> ????
> 0??
> { "inAlarmState": false }
> ??
> { "inAlarmState": false}
> e????E????o?N9x
>
> I am not sure whether this is a serialization issue or some default behavior in the Kinesis sink library. It might just be a common mistake on my part that I am not aware of.
> Could anyone help with this problem? I would really appreciate it.
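>
> For reference, the sink is presumably constructed as a FlinkKinesisProducer, roughly like this (a sketch; the region, the stream name, and whether the "AggregationEnabled" KPL property is relevant to this issue are assumptions on my side):
>
> Properties producerConfig = new Properties();
> producerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
> // The Kinesis Producer Library aggregates multiple records into one Kinesis record by default;
> // a consumer that does not de-aggregate would see several payloads inside one binary blob.
> producerConfig.setProperty("AggregationEnabled", "false");
>
> FlinkKinesisProducer<MatchedDataForAlarm> kinesis =
>         new FlinkKinesisProducer<>(new AEMMatchedDataForAlarmSchemaSerialization(), producerConfig);
> kinesis.setDefaultStream("output-stream");
> kinesis.setDefaultPartition("0");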
>
> Thanks,
> Zekun
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)