You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Michael Ran <gr...@163.com> on 2021/06/17 06:34:56 UTC

如何使用kafka的KafkaDynamicTableFactory ,替换他的json format

dear all :
        目前有个小需求,由于binlog数据不同,不方便直接使用 format="json",想自定义format进行处理。
        但是根据 “implements DeserializationFormatFactory, SerializationFormatFactory” 这样自定义format之后,只能处理binlog的原始数据,我还想拿kafka的offset、partition等信息。我看kafka原生的DynamicKafkaDeserializationSchema    有方法 
        deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData> collector) 。
        包装了offset 的对象:ConsumerRecord   ,我也想使用这个东西,或者用自定义的format替换他这部分,能做到吗?还是要connector 进行从定义?
  

Re:Re: 如何使用kafka的KafkaDynamicTableFactory ,替换他的json format

Posted by Michael Ran <gr...@163.com>.
本来想从DeserializationFormat 拿到的,如果不能。后续SQL 能拿到也行
在 2021-06-17 14:41:55,"Jingsong Li" <ji...@gmail.com> 写道:
>不能,除非你自己创建一个新的kafka connector。
>
>不过,
>kafka的offset、partition等信息是可以通过metadata的方式拿到的。
>
>你是需要在DeserializationFormat里面拿到offset、partition?还是说后续的SQL拿到就行了?
>
>Best,
>Jingsong
>
>On Thu, Jun 17, 2021 at 2:35 PM Michael Ran <gr...@163.com> wrote:
>
>> dear all :
>>         目前有个小需求,由于binlog数据不同,不方便直接使用 format="json",想自定义format进行处理。
>>         但是根据 “implements DeserializationFormatFactory,
>> SerializationFormatFactory”
>> 这样自定义format之后,只能处理binlog的原始数据,我还想拿kafka的offset、partition等信息。我看kafka原生的DynamicKafkaDeserializationSchema
>>   有方法
>>         deserialize(ConsumerRecord<byte[], byte[]> record,
>> Collector<RowData> collector) 。
>>         包装了offset 的对象:ConsumerRecord
>>  ,我也想使用这个东西,或者用自定义的format替换他这部分,能做到吗?还是要connector 进行从定义?
>>
>
>
>
>-- 
>Best, Jingsong Lee

Re: 如何使用kafka的KafkaDynamicTableFactory ,替换他的json format

Posted by Jingsong Li <ji...@gmail.com>.
不能,除非你自己创建一个新的kafka connector。

不过,
kafka的offset、partition等信息是可以通过metadata的方式拿到的。

你是需要在DeserializationFormat里面拿到offset、partition?还是说后续的SQL拿到就行了?

Best,
Jingsong

On Thu, Jun 17, 2021 at 2:35 PM Michael Ran <gr...@163.com> wrote:

> dear all :
>         目前有个小需求,由于binlog数据不同,不方便直接使用 format="json",想自定义format进行处理。
>         但是根据 “implements DeserializationFormatFactory,
> SerializationFormatFactory”
> 这样自定义format之后,只能处理binlog的原始数据,我还想拿kafka的offset、partition等信息。我看kafka原生的DynamicKafkaDeserializationSchema
>   有方法
>         deserialize(ConsumerRecord<byte[], byte[]> record,
> Collector<RowData> collector) 。
>         包装了offset 的对象:ConsumerRecord
>  ,我也想使用这个东西,或者用自定义的format替换他这部分,能做到吗?还是要connector 进行从定义?
>



-- 
Best, Jingsong Lee