Posted to user@flink.apache.org by simone <si...@gmail.com> on 2017/05/16 08:44:17 UTC

Problem with Kafka Consumer

Hi to all,

I have a problem with Flink and Kafka queues.

I have a Producer that puts some Rows into a data sink represented by a
Kafka queue, and a Consumer that reads from this sink and processes the
Rows in buckets of N elements using a custom trigger function:

messageStream.keyBy(0)
        .windowAll(GlobalWindows.create())
        .trigger(CountWithTimeoutTrigger.of(Time.seconds(30), N))
        .apply(new RowToQuery());
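For reference, CountWithTimeoutTrigger is a custom trigger that fires a bucket
either when it has seen N elements or after a timeout expires. A minimal sketch
of such a trigger is shown below; this is illustrative rather than the exact
implementation, and the state name, serializer, and timer handling are assumptions:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

// Fires a window when it holds maxCount elements, or when timeoutMillis have
// passed (processing time) since the first element of the current bucket.
public class CountWithTimeoutTrigger<W extends Window> extends Trigger<Object, W> {

    private final long maxCount;
    private final long timeoutMillis;

    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    private CountWithTimeoutTrigger(long timeoutMillis, long maxCount) {
        this.timeoutMillis = timeoutMillis;
        this.maxCount = maxCount;
    }

    public static <W extends Window> CountWithTimeoutTrigger<W> of(Time timeout, long maxCount) {
        return new CountWithTimeoutTrigger<>(timeout.toMilliseconds(), maxCount);
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        if (count.get() == null) {
            // first element of the bucket: arm the timeout timer
            ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + timeoutMillis);
        }
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        // timeout reached: flush whatever has been collected so far
        ctx.getPartitionedState(countDesc).clear();
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countDesc).clear();
    }

    private static class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }
}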

The problem is that the Consumer stops consuming data once it has read
about 1000 rows.
With N = 20 the consumer processes 50 buckets, for a total of 1000 elements.
With N = 21 the consumer processes 48 buckets, for a total of 1008 elements.
With N = 68 the consumer processes 15 buckets, for a total of 1020
elements. And so on...

The same also happens without the custom trigger function, using a
simple CountTrigger instead:

messageStream.keyBy(0)
        .windowAll(GlobalWindows.create())
        .trigger(PurgingTrigger.of(CountTrigger.of(N)))
        .apply(new RowToQuery());

How is this possible? Are there any properties to set on the Consumer in
order to make it process more data?
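(For context, by "properties" I mean the ones passed when constructing the Kafka
source. A minimal sketch follows, with imports omitted; the topic name, bootstrap
servers, group id, and the choice of the 0.10 connector are placeholders, not my
actual configuration:)

// How consumer properties reach the Flink Kafka source (placeholders throughout).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder
props.setProperty("group.id", "row-consumer");              // placeholder

DataStream<String> stream = env.addSource(
        new FlinkKafkaConsumer010<>("rows", new SimpleStringSchema(), props));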

Thanks,

Simone.


Re: Problem with Kafka Consumer

Posted by simone <si...@gmail.com>.
Hi Kostas,

As suggested, I switched to version 1.3-SNAPSHOT and the project runs
without any problems. I will keep you informed if any other issues occur.
Thanks again for the help.

Cheers,
Simone.

On 16/05/2017 16:36, Kostas Kloudas wrote:
> Hi Simone,
>
> Glad I could help ;)
>
> Actually it would be great if you could also try out the upcoming (not 
> yet released) 1.3 version
> and let us know if you find something that does not work as expected.
>
> We are currently in the testing phase, as you may have noticed, and
> every contribution on that front is more than welcome.
>
> Cheers,
> Kostas
>
>> On May 16, 2017, at 4:30 PM, simone <simone.povoscania@gmail.com 
>> <ma...@gmail.com>> wrote:
>>
>> Hi Kostas,
>>
>> thanks for your suggestion. Indeed, replacing my custom sink with a
>> simpler one brought out that the cause of the problem was RowToQuery,
>> as you suggested. The sink was blocking the reads and making the Kafka
>> pipeline stall, due to a misconfiguration of an internal client that
>> calls an external service.
>>
>> Thanks for your help,
>> Simone.
>>
>>
>> On 16/05/2017 14:01, Kostas Kloudas wrote:
>>> Hi Simone,
>>>
>>> I suppose that you use messageStream.keyBy(…).window(…) right? 
>>> .windowAll() is not applicable to keyedStreams.
>>>
>>> Some follow up questions are:
>>>
>>> In your logs, do you see any error messages?
>>> What does your RowToQuery() sink do? Can it be that it blocks and 
>>> the back pressure makes all the pipeline stall?
>>> To check that, you can:
>>> 1) check the webui for backpressure metrics
>>> 2) replace your sink with a dummy one that just prints whatever it 
>>> receives
>>> 3) or even put a flatmap after reading from Kafka (before the 
>>> keyBy()) that prints the elements before sending
>>> them downstream, so that you know if the consumer keeps on reading.
>>>
>>> Let us know what is the result for the previous.
>>>
>>> Thanks,
>>> Kostas
>>>
>>>> On May 16, 2017, at 10:44 AM, simone <simone.povoscania@gmail.com 
>>>> <ma...@gmail.com>> wrote:
>>>>
>>>> Hi to all,
>>>>
>>>> I have a problem with Flink and Kafka queues.
>>>>
>>>> I have a Producer that puts some Rows into a data sink represented
>>>> by a Kafka queue, and a Consumer that reads from this sink and
>>>> processes the Rows in buckets of N elements using a custom trigger function:
>>>>
>>>> messageStream.keyBy(0)
>>>>         .windowAll(GlobalWindows.create())
>>>>         .trigger(CountWithTimeoutTrigger.of(Time.seconds(30), N))
>>>>         .apply(new RowToQuery());
>>>>
>>>> The problem is that the Consumer stops consuming data once it has
>>>> read about 1000 rows.
>>>> With N = 20 the consumer processes 50 buckets, for a total of 1000 elements.
>>>> With N = 21 the consumer processes 48 buckets, for a total of 1008 elements.
>>>> With N = 68 the consumer processes 15 buckets, for a total of 1020 elements. And so on...
>>>>
>>>> The same also happens without the custom trigger function, using a
>>>> simple CountTrigger instead:
>>>>
>>>> messageStream.keyBy(0)
>>>>         .windowAll(GlobalWindows.create())
>>>>         .trigger(PurgingTrigger.of(CountTrigger.of(N)))
>>>>         .apply(new RowToQuery());
>>>>
>>>> How is this possible? Are there any properties to set on the Consumer
>>>> in order to make it process more data?
>>>>
>>>> Thanks,
>>>>
>>>> Simone.
>>>>
>>>
>>
>


Re: Problem with Kafka Consumer

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Simone,

Glad I could help ;)

Actually it would be great if you could also try out the upcoming (not yet released) 1.3 version 
and let us know if you find something that does not work as expected.

We are currently in the testing phase, as you may have noticed, and every contribution
on that front is more than welcome.

Cheers,
Kostas

> On May 16, 2017, at 4:30 PM, simone <si...@gmail.com> wrote:
> 
> Hi Kostas,
> 
> thanks for your suggestion. Indeed, replacing my custom sink with a simpler one brought out that the cause of the problem was RowToQuery, as you suggested. The sink was blocking the reads and making the Kafka pipeline stall, due to a misconfiguration of an internal client that calls an external service.
> 
> Thanks for your help,
> Simone.
> 
> On 16/05/2017 14:01, Kostas Kloudas wrote:
>> Hi Simone,
>> 
>> I suppose that you use messageStream.keyBy(…).window(…) right? .windowAll() is not applicable to keyedStreams.
>> 
>> Some follow up questions are:
>> 
>> In your logs, do you see any error messages? 
>> What does your RowToQuery() sink do? Can it be that it blocks and the back pressure makes all the pipeline stall?
>> To check that, you can: 
>> 	1) check the webui for backpressure metrics
>>  	2) replace your sink with a dummy one that just prints whatever it receives
>> 	3) or even put a flatmap after reading from Kafka (before the keyBy()) that prints the elements before sending 
>> 		them downstream, so that you know if the consumer keeps on reading.
>> 
>> Let us know what is the result for the previous.
>> 
>> Thanks,
>> Kostas
>> 
>>> On May 16, 2017, at 10:44 AM, simone <simone.povoscania@gmail.com <ma...@gmail.com>> wrote:
>>> 
>>> Hi to all,
>>> 
>>> I have a problem with Flink and Kafka queues.
>>> 
>>> I have a Producer that puts some Rows into a data sink represented by a Kafka queue, and a Consumer that reads from this sink and processes the Rows in buckets of N elements using a custom trigger function:
>>> messageStream.keyBy(0)
>>>         .windowAll(GlobalWindows.create())
>>>         .trigger(CountWithTimeoutTrigger.of(Time.seconds(30), N))
>>>         .apply(new RowToQuery());
>>> 
>>> 
>>> The problem is that the Consumer stops consuming data once it has read about 1000 rows.
>>> With N = 20 the consumer processes 50 buckets, for a total of 1000 elements.
>>> With N = 21 the consumer processes 48 buckets, for a total of 1008 elements.
>>> With N = 68 the consumer processes 15 buckets, for a total of 1020 elements. And so on...
>>> The same also happens without the custom trigger function, using a simple CountTrigger instead:
>>> 
>>> messageStream.keyBy(0)
>>>         .windowAll(GlobalWindows.create())
>>>          .trigger(PurgingTrigger.of(CountTrigger.of(N)))
>>>          .apply(new RowToQuery());
>>> How is this possible? Are there any properties to set on the Consumer in order to make it process more data?
>>> 
>>> Thanks,
>>> 
>>> Simone.
>> 
> 


Re: Problem with Kafka Consumer

Posted by simone <si...@gmail.com>.
Hi Kostas,

thanks for your suggestion. Indeed, replacing my custom sink with a
simpler one brought out that the cause of the problem was RowToQuery,
as you suggested. The sink was blocking the reads and making the Kafka
pipeline stall, due to a misconfiguration of an internal client that
calls an external service.

Thanks for your help,
Simone.


On 16/05/2017 14:01, Kostas Kloudas wrote:
> Hi Simone,
>
> I suppose that you use messageStream.keyBy(…).window(…) right? 
> .windowAll() is not applicable to keyedStreams.
>
> Some follow up questions are:
>
> In your logs, do you see any error messages?
> What does your RowToQuery() sink do? Can it be that it blocks and the 
> back pressure makes all the pipeline stall?
> To check that, you can:
> 1) check the webui for backpressure metrics
> 2) replace your sink with a dummy one that just prints whatever it 
> receives
> 3) or even put a flatmap after reading from Kafka (before the keyBy()) 
> that prints the elements before sending
> them downstream, so that you know if the consumer keeps on reading.
>
> Let us know what is the result for the previous.
>
> Thanks,
> Kostas
>
>> On May 16, 2017, at 10:44 AM, simone <simone.povoscania@gmail.com 
>> <ma...@gmail.com>> wrote:
>>
>> Hi to all,
>>
>> I have a problem with Flink and Kafka queues.
>>
>> I have a Producer that puts some Rows into a data sink represented by
>> a Kafka queue, and a Consumer that reads from this sink and processes
>> the Rows in buckets of N elements using a custom trigger function:
>>
>> messageStream.keyBy(0)
>>         .windowAll(GlobalWindows.create())
>>         .trigger(CountWithTimeoutTrigger.of(Time.seconds(30), N))
>>         .apply(new RowToQuery());
>>
>> The problem is that the Consumer stops consuming data once it has read
>> about 1000 rows.
>> With N = 20 the consumer processes 50 buckets, for a total of 1000 elements.
>> With N = 21 the consumer processes 48 buckets, for a total of 1008 elements.
>> With N = 68 the consumer processes 15 buckets, for a total of 1020
>> elements. And so on...
>>
>> The same also happens without the custom trigger function, using a
>> simple CountTrigger instead:
>>
>> messageStream.keyBy(0)
>>         .windowAll(GlobalWindows.create())
>>         .trigger(PurgingTrigger.of(CountTrigger.of(N)))
>>         .apply(new RowToQuery());
>>
>> How is this possible? Are there any properties to set on the Consumer
>> in order to make it process more data?
>>
>> Thanks,
>>
>> Simone.
>>
>


Re: Problem with Kafka Consumer

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Simone,

I suppose you meant messageStream.keyBy(…).window(…), right? .windowAll() is not applicable to keyed streams.
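A sketch of the keyed form, reusing your trigger and window assigner (note that
RowToQuery would then have to implement WindowFunction rather than AllWindowFunction):

// .window() keeps the keyBy(0) and builds one bucket per key;
// .windowAll() ignores the keying and buckets the whole stream.
messageStream.keyBy(0)
        .window(GlobalWindows.create())
        .trigger(CountWithTimeoutTrigger.of(Time.seconds(30), N))
        .apply(new RowToQuery());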

Some follow up questions are:

In your logs, do you see any error messages?
What does your RowToQuery() sink do? Can it be that it blocks and the back pressure makes the whole pipeline stall?
To check that, you can:
	1) check the web UI for backpressure metrics
	2) replace your sink with a dummy one that just prints whatever it receives
	3) or even put a flatMap after reading from Kafka (before the keyBy()) that prints the elements before sending
		them downstream, so that you know if the consumer keeps on reading (see the sketch below for 2 and 3).
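Roughly, 2) and 3) could look like the following (just a sketch with imports
omitted; I am assuming the elements are of type Row and dropping the keyBy(),
since windowAll() ignores it anyway):

// 3) peek at every element right after the Kafka source, before any windowing,
//    to see whether the consumer itself keeps on reading
DataStream<Row> peeked = messageStream.flatMap(new FlatMapFunction<Row, Row>() {
    @Override
    public void flatMap(Row value, Collector<Row> out) {
        System.out.println("consumed: " + value);
        out.collect(value);
    }
});

// 2) keep the windowing, but swap RowToQuery for a dummy window function that
//    only forwards the rows, and print them, to rule out back pressure from the sink
peeked.windowAll(GlobalWindows.create())
        .trigger(PurgingTrigger.of(CountTrigger.of(N)))
        .apply(new AllWindowFunction<Row, Row, GlobalWindow>() {
            @Override
            public void apply(GlobalWindow window, Iterable<Row> rows, Collector<Row> out) {
                for (Row row : rows) {
                    out.collect(row);
                }
            }
        })
        .print();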

Let us know what the results of the above are.

Thanks,
Kostas

> On May 16, 2017, at 10:44 AM, simone <si...@gmail.com> wrote:
> 
> Hi to all,
> 
> I have a problem with Flink and Kafka queues.
> 
> I have a Producer that puts some Rows into a data sink represented by a Kafka queue, and a Consumer that reads from this sink and processes the Rows in buckets of N elements using a custom trigger function:
> messageStream.keyBy(0)
>         .windowAll(GlobalWindows.create())
>         .trigger(CountWithTimeoutTrigger.of(Time.seconds(30), N))
>         .apply(new RowToQuery());
> 
> 
> The problem is that the Consumer stops consuming data once it has read about 1000 rows.
> With N = 20 the consumer processes 50 buckets, for a total of 1000 elements.
> With N = 21 the consumer processes 48 buckets, for a total of 1008 elements.
> With N = 68 the consumer processes 15 buckets, for a total of 1020 elements. And so on...
> The same also happens without the custom trigger function, using a simple CountTrigger instead:
> 
> messageStream.keyBy(0)
>         .windowAll(GlobalWindows.create())
>          .trigger(PurgingTrigger.of(CountTrigger.of(N)))
>          .apply(new RowToQuery());
> How is this possible? Are there any properties to set on the Consumer in order to make it process more data?
> 
> Thanks,
> 
> Simone.