Posted to issues@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2020/10/01 13:50:00 UTC
[jira] [Commented] (FLINK-19332) Special characters issue using Kinesis Data Analytics for Apache Flink
[ https://issues.apache.org/jira/browse/FLINK-19332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17205532#comment-17205532 ]
Dian Fu commented on FLINK-19332:
---------------------------------
Hi [~zekuny], personally I don't think this should be a *Blocker* issue. Could you explain why you consider it a *Blocker*?
> Special characters issue using Kinesis Data Analytics for Apache Flink
> ----------------------------------------------------------------------
>
> Key: FLINK-19332
> URL: https://issues.apache.org/jira/browse/FLINK-19332
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kinesis
> Affects Versions: 1.8.2
> Reporter: Zekun Yu
> Priority: Blocker
> Fix For: 1.8.2
>
>
> Hi there,
>
> I am encountering one special character issue while using Kinesis Data Analytics for Apache Flink (KDA).
>
> Our KDA is built for processing data and outputting to a Kinesis stream. We have a Lambda function that subscribes to that stream and reads records from it.
> The library the KDA uses is "org.apache.flink.streaming.connectors.kinesis".
>
> Our KDA outputs only a single record to the Kinesis sink using "collector.collect()" for a single key (details below).
> Most of the time, the record received by the Lambda looks perfectly good.
> However, occasionally, when two records are sent to the Kinesis sink using "collector.collect()" at the same time, *we noticed that those two records are combined somehow and there are some special characters in the record received by the Lambda function*.
>
> Below are some technical details:
>
> The KDA is not using any "TimeWindow()" but uses "keyBy()" by some keys.
> ).returns(MatchedDataForAlarm.class)
> .keyBy(MatchedDataForAlarm::getStateKey)
> .connect(ruleBroadcastStream)
> .process(new MetricProcess())
> .addSink(kinesis);
>
>
> The "MetricProcess" class extends "KeyedBroadcastProcessFunction" and overrides the "processElement" function, which uses collector.collect() for output.
> @Override
> public void processElement(MatchedDataForAlarm input, ReadOnlyContext ctx, Collector<MatchedDataForAlarm> collector) throws Exception {
>
>
> We have our own AEMMatchedDataForAlarmSchemaSerialization, which implements KinesisSerializationSchema<MatchedDataForAlarm>. The serialization simply converts a "MatchedDataForAlarm" object to a String using Gson and then wraps the bytes in a ByteBuffer.
>
> @Override
> public ByteBuffer serialize(MatchedDataForAlarm matchedDataForAlarm) {
> Gson gson = new Gson();
> String result = gson.toJson(matchedDataForAlarm);
> log.info("Alarm record sent to Kinesis stream: {}", result);
> return ByteBuffer.wrap(result.getBytes());
> }
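One thing worth ruling out on the serialization side: `result.getBytes()` without an argument uses the JVM's default charset, which may differ between the KDA environment and the Lambda consumer, and a new `Gson` is allocated per record. A minimal standalone sketch of the same method with an explicit UTF-8 charset (the real schema would call `gson.toJson(matchedDataForAlarm)`; a plain JSON string stands in for `MatchedDataForAlarm` here so the example is self-contained):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SerializeSketch {
    // In the real schema this would be gson.toJson(matchedDataForAlarm);
    // the JSON string is passed in directly so the sketch runs standalone.
    static ByteBuffer serialize(String json) {
        // Explicit charset: getBytes() without one uses the platform default,
        // which is not guaranteed to be UTF-8 everywhere.
        return ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        ByteBuffer buf = serialize("{\"inAlarmState\":false}");
        // Decoding with the same charset round-trips cleanly.
        System.out.println(StandardCharsets.UTF_8.decode(buf).toString());
    }
}
```

This alone would not explain two records being merged, but it removes one source of charset-dependent "special characters".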
>
> *Here's the record shown in the Lambda logs when two records are combined somewhere somehow (most cases those two are received as two separate records):*
>
> ????
> 0??
> { "inAlarmState": false }
> ??
> { "inAlarmState": false}
> e????E????o?N9x
>
> I am not sure whether it's a serialization issue or some default behavior of the Kinesis sink library; it might also just be a common mistake on my side that I am not aware of.
> Could anyone help with this problem? I really appreciate it.
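The symptom described above (two JSON payloads framed by non-printable bytes inside a single received record) is consistent with KPL record aggregation in the FlinkKinesisProducer: when aggregation is enabled, multiple user records can be packed into one Kinesis record with binary framing, and a consumer that does not de-aggregate sees exactly this kind of combined record. A hedged sketch of disabling aggregation via the producer configuration, assuming FlinkKinesisProducer is the sink in use; the region and stream name are placeholders, and the connector-specific lines are shown as comments since they need the Kinesis connector on the classpath:

```java
import java.util.Properties;

public class ProducerConfigSketch {
    public static void main(String[] args) {
        Properties producerConfig = new Properties();
        producerConfig.setProperty("aws.region", "us-east-1"); // placeholder region
        // Disable KPL aggregation so each collect() becomes one Kinesis record.
        // Alternatively, keep aggregation on and de-aggregate in the Lambda
        // with the Kinesis record de-aggregation library.
        producerConfig.setProperty("AggregationEnabled", "false");

        // With the connector on the classpath, the sink would be built roughly as:
        // FlinkKinesisProducer<MatchedDataForAlarm> kinesis =
        //     new FlinkKinesisProducer<>(new AEMMatchedDataForAlarmSchemaSerialization(), producerConfig);
        // kinesis.setDefaultStream("output-stream"); // placeholder stream name

        System.out.println(producerConfig.getProperty("AggregationEnabled"));
    }
}
```

If disabling aggregation makes the combined records disappear, that would confirm the sink (not the serializer) as the source of the behavior.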
>
> Thanks,
> Zekun
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)