You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Abhishek Rai <ab...@netspring.io> on 2021/02/04 22:28:41 UTC

Re: Re: Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

I had a similar need recently and ended up using
KafkaDeserializationSchemaWrapper to wrap a given
DeserializationSchema.  The resulting
KafkaDeserializationSchema[Wrapper] can be passed directly to the
`FlinkKafkaConsumer` constructor.

```
class BoundingDeserializationSchema
    extends KafkaDeserializationSchemaWrapper<Row> {
 private static final long serialVersionUID = 1858204203663583785L;
 private long maxRecords_;
 private long numRecords_ = 0;

 public BoundingDeserializationSchema(
     DeserializationSchema<Row> deserializationSchema,
     long maxRecords) {
  super(deserializationSchema);
  maxRecords_ = maxRecords;
 }

 @Override
 public void deserialize(
     ConsumerRecord<byte[], byte[]> message, Collector<Row> out)
     throws Exception {
  super.deserialize(message, out);
  numRecords_++;
 }

 @Override
 public boolean isEndOfStream(Row nextElement) {
  return numRecords_ >= maxRecords_;
 }
}

```

On Thu, Jan 14, 2021 at 6:15 AM sagar <sa...@gmail.com> wrote:
>
> Thanks Yun
>
>
>
> On Thu, Jan 14, 2021 at 1:58 PM Yun Gao <yu...@aliyun.com> wrote:
>>
>> Hi Sagar,
>>
>>   I rechecked and found that the new kafka source is not formally publish yet, and a stable method I think may be try adding the FlinkKafkaConsumer as a BOUNDED source first. Sorry for the inconvient.
>>
>> Best,
>>  Yun
>>
>> ------------------------------------------------------------------
>> Sender:Yun Gao<yu...@aliyun.com>
>> Date:2021/01/14 15:26:54
>> Recipient:Ardhani Narasimha<ar...@razorpay.com>; sagar<sa...@gmail.com>
>> Cc:Flink User Mail List<us...@flink.apache.org>
>> Theme:Re: Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)
>>
>> Hi Sagar,
>>
>>       I think the problem is that the legacy source implemented by extending SourceFunction are all defined as CONTINOUS_UNBOUNDED when use env.addSource(). Although there is hacky way to add the legacy sources as BOUNDED source [1], I think you may first have a try of new version of KafkaSource [2] ? The new version of KafkaSource is implemented with the new Source API [3], which provides unfied support for the streaming and batch mode.
>>
>> Best,
>>  Yun
>>
>>
>>
>>
>> [1] https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java#L64
>> [2]  https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java#L69
>> [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>
>>
>>
>> ------------------Original Mail ------------------
>> Sender:Ardhani Narasimha <ar...@razorpay.com>
>> Send Date:Thu Jan 14 15:11:35 2021
>> Recipients:sagar <sa...@gmail.com>
>> CC:Flink User Mail List <us...@flink.apache.org>
>> Subject:Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)
>>>
>>> Interesting use case.
>>>
>>> Can you please elaborate more on this.
>>> On what criteria do you want to batch? Time? Count? Or Size?
>>>
>>> On Thu, 14 Jan 2021 at 12:15 PM, sagar <sa...@gmail.com> wrote:
>>>>
>>>> Hi Team,
>>>>
>>>> I am getting the following error while running DataStream API in with batch mode with kafka source.
>>>> I am using FlinkKafkaConsumer to consume the data.
>>>>
>>>> Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source with the 'execution.runtime-mode' set to 'BATCH'. This combination is not allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
>>>> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198) ~[flink-core-1.12.0.jar:1.12.0]
>>>>
>>>> In my batch program I wanted to work with four to five different stream in batch mode as data source is bounded
>>>>
>>>> I don't find any clear example of how to do it with kafka souce with Flink 1.12
>>>>
>>>> I don't want to use JDBC source as underlying database table may change. please give me some example on how to achieve the above use case.
>>>>
>>>> Also for any large bounded source are there any alternatives to achieve this?
>>>>
>>>>
>>>>
>>>> --
>>>> ---Regards---
>>>>
>>>>   Sagar Bandal
>>>>
>>>> This is confidential mail ,All Rights are Reserved.If you are not intended receipiant please ignore this email.
>>>
>>>
>>> -----------------------------------------------------------------------------------------------------------------------------------------------------------------------
>>> IMPORTANT: The contents of this email and any attachments are confidential and protected by applicable laws. If you have received this email by mistake, please (i) notify the sender immediately; (ii) delete it from your database; and (iii) do not disclose the contents to anyone or make copies thereof. Razorpay accepts no liability caused due to any inadvertent/ unintentional data transmitted through this email.
>>> -----------------------------------------------------------------------------------------------------------------------------------------------------------------------
>>
>>
>
>
> --
> ---Regards---
>
>   Sagar Bandal
>
> This is confidential mail ,All Rights are Reserved.If you are not intended receipiant please ignore this email.