Posted to user@flink.apache.org by Yunus Olgun <yu...@gmail.com> on 2017/09/27 14:35:28 UTC

CustomPartitioner that simulates ForwardPartitioner and watermarks

Hi,

I have a simple streaming job like the following:

source.process(taskA)
           .process(taskB)

I want taskB to access the minimum watermark of all parallel taskA instances, but the data is ordered and should not be shuffled. With the ForwardPartitioner, each taskB subtask only sees the watermark of a single predecessor. So I have used a custom partitioner.

source.process(taskA)
           .map(AssignPartitionID)
           .partitionCustom(IdPartitioner)
           .map(StripPartitionID)
           .process(taskB)

In the AssignPartitionID function, I attach getRuntimeContext().getIndexOfThisSubtask() to the object as a partitionId. In the IdPartitioner, I return this partitionId.
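For reference, here is a minimal sketch of what these helpers could look like. The Tuple2 wrapper, the generic element type T, the class names, and the MyEvent type in the wiring are illustrative assumptions, not the exact code of the job:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Wraps each element with the index of the taskA subtask that produced it.
class AssignPartitionID<T> extends RichMapFunction<T, Tuple2<Integer, T>> {
    @Override
    public Tuple2<Integer, T> map(T value) {
        return Tuple2.of(getRuntimeContext().getIndexOfThisSubtask(), value);
    }
}

// Routes every element to the channel given by the attached subtask index,
// so everything produced by taskA subtask n is sent towards taskB subtask n.
class IdPartitioner implements Partitioner<Integer> {
    @Override
    public int partition(Integer key, int numPartitions) {
        return key;
    }
}

// Removes the wrapper again before taskB.
class StripPartitionID<T> implements MapFunction<Tuple2<Integer, T>, T> {
    @Override
    public T map(Tuple2<Integer, T> value) {
        return value.f1;
    }
}

Wired up, roughly (0 is the position of the partition id within the tuple):

source.process(taskA)
           .map(new AssignPartitionID<MyEvent>())
           .partitionCustom(new IdPartitioner(), 0)
           .map(new StripPartitionID<MyEvent>())
           .process(taskB);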

This solved the main requirement, but I have another concern now:

Network shuffle: I don’t need a network shuffle. I thought that within a TaskManager the subtask index of a taskA subtask would be the same as the subtask index of the corresponding taskB subtask, but unfortunately it is not. Is there a way to make partitionCustom distribute data like the ForwardPartitioner, i.e. to the next local operator?

As far as I know, this would still require object serialization/deserialization since the operators can no longer be chained. Is there a way to get the minimum watermark from upstream operators without a network shuffle and object serialization/deserialization?

Regards,

Re: CustomPartitioner that simulates ForwardPartitioner and watermarks

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

As you noticed, Flink currently does not put Source-X and Throttler-X (for some X) in the same task slot (TaskManager). In the low-level execution system there are two connection patterns: ALL_TO_ALL and POINTWISE. Flink will only schedule Source-X and Throttler-X in the same slot when the POINTWISE pattern is used. At the API level, the POINTWISE pattern is only used when there is a simple "forward" connection between operations in the streaming API; for all other partitioning schemes, ALL_TO_ALL is used with a custom (API-level) partitioner.
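In DataStream API terms this means roughly the following (a hedged illustration using the operator names from this thread):

// POINTWISE: a plain forward connection (also the default when no repartitioning is
// requested and the parallelism matches), so upstream and downstream subtask X can be
// scheduled into the same slot.
source.process(taskA)
           .forward()
           .process(taskB);

// ALL_TO_ALL: any explicit repartitioning (rebalance, keyBy, broadcast, partitionCustom, ...)
// creates n-to-m channels and loses the local-forwarding guarantee.
source.process(taskA)
           .rebalance()
           .process(taskB);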

It might be possible to change this; I'll get back to you once I've investigated more.

Best,
Aljoscha


Re: CustomPartitioner that simulates ForwardPartitioner and watermarks

Posted by Yunus Olgun <yu...@gmail.com>.
Hi Kostas, Aljoscha,

To answer Kostas’s concern, the algorithm works this way:

Let’s say we have two sources, Source-0 and Source-1. Source-0 is slow and Source-1 is fast; they read from Kafka at different paces. The threshold is 10 time units.

1st cycle: Source-0 sends records with timestamps 1, 2 and emits watermark 2. Throttler-0 has WM 2.
                Source-1 sends records with timestamps 1, 2, 3 and emits watermark 3. Throttler-1 also has WM 2.
.
.
.
10th cycle: Source-0 sends records with timestamps 19, 20 and emits watermark 20. Throttler-0 has WM 20.
                  Source-1 sends records with timestamps 28, 29, 30 and emits watermark 30. Throttler-1 also has WM 20.

11th cycle: Source-0 sends records with timestamps 21, 22 and emits watermark 22. Throttler-0 has WM 22.
                  Source-1 sends records with timestamps 31, 32, 33 and emits watermark 33. Since Throttler-1 has a WM of 20 at the beginning of the cycle, it starts sleeping for a short time for each incoming record (see the sketch below). This eventually creates backpressure on Source-1, and only on Source-1, so Source-1 starts polling less frequently from Kafka.
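A rough sketch of such a throttler, assuming event-time timestamps are assigned at the sources and that, because the custom partitioner broadcasts watermarks, the watermark visible inside the function is the global minimum. The class and field names are made up:

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

class Throttler<T> extends ProcessFunction<T, T> {

    private final long thresholdMillis;

    Throttler(long thresholdMillis) {
        this.thresholdMillis = thresholdMillis;
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) throws Exception {
        Long timestamp = ctx.timestamp();                        // event-time timestamp of this record
        long watermark = ctx.timerService().currentWatermark();  // minimum watermark over all input channels
        if (timestamp != null && timestamp - watermark > thresholdMillis) {
            // This subtask is running ahead of the slowest source: sleep briefly per record.
            // The sleep eventually back-pressures only the fast source feeding this subtask.
            Thread.sleep(1);
        }
        out.collect(value);
    }
}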

For this algorithm to work, each throttler should receive records from only one source; otherwise the backpressure would be applied to both sources. I achieve that using a custom partitioner and subtask indices: everything that comes from Source-n goes to Throttler-n. And since it is a custom partitioner, watermarks get broadcast to all throttlers.

The problem is that I thought Source-0 and Throttler-0 would be colocated in the same TaskManager. Unfortunately this is not the case: Source-0 and Throttler-1 can end up on TM-0, and Source-1 and Throttler-0 on TM-1. This causes a network shuffle and one more round of data serialization/deserialization. I want to avoid that if possible, since the stream is big.

Regards,  
 


Re: CustomPartitioner that simulates ForwardPartitioner and watermarks

Posted by Aljoscha Krettek <al...@apache.org>.
To quickly make Kostas' intuition concrete: it is currently not possible to have watermarks broadcast while the data is only locally forwarded. The reason is that watermarks and data travel in the same channels, so if the watermark needs to be broadcast, there has to be an n-to-m (in this case m == n) connection pattern between the operations (tasks).

I think your algorithm should work if you take the correct difference, i.e. throttle when timestamp - "global watermark" > threshold. The inverted diff would be "global watermark" - timestamp. I think you're already doing the correct thing; I just wanted to clarify for others who might be reading.

Did you check on which TaskManagers the taskA and taskB operators run? I think they should still be running on the same TM if resources permit.

Best,
Aljoscha


Re: CustomPartitioner that simulates ForwardPartitioner and watermarks

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Yunus,

I see. Currently I am not sure that you can broadcast only the watermark without having a shuffle.

But one thing to note about your algorithm: I am not sure that it solves the problem you are encountering.

Your algorithm seems to prioritize the stream whose elements have the smallest timestamps, rather than throttling fast streams so that slow ones can catch up.

Example: a task reading a Kafka partition that has elements with timestamps 1, 2, 3 will emit watermark 3 (assuming an ascending-timestamp watermark extractor), while another task that reads another partition with elements with timestamps 5, 6, 7 will emit watermark 7. With your algorithm, if I get it right, you will throttle the second partition/task while allowing the first one to advance, although both read at the same pace (e.g. 3 elements per unit of time).

I will think a bit more on the solution. 

The sketches I can come up with all introduce some latency, e.g. measuring throughput in taskA and sending it to a side output together with a taskId, then broadcasting that side output to a downstream operator (taskB) which is something like a co-process function: it receives the original stream plus the side output, and it is the one that checks whether "my task" is slow (a rough sketch of this idea follows below).
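Very roughly, and with every name here ("Event", the stats tuple, the reporting interval) being a placeholder assumption rather than a worked-out design, such a sketch could look like this:

// Assumes the usual flink-streaming imports (OutputTag, Tuple2, ProcessFunction,
// CoProcessFunction, HashMap, ...) and a DataStream<Event> called "source".
final OutputTag<Tuple2<Integer, Long>> statsTag =
        new OutputTag<Tuple2<Integer, Long>>("throughput-stats") {};

// taskA forwards its data unchanged and periodically reports (subtaskIndex, count)
// on a side output.
SingleOutputStreamOperator<Event> mainStream = source
        .process(new ProcessFunction<Event, Event>() {
            private long count;

            @Override
            public void processElement(Event value, Context ctx, Collector<Event> out) {
                count++;
                out.collect(value);
                if (count % 1000 == 0) {
                    ctx.output(statsTag, Tuple2.of(getRuntimeContext().getIndexOfThisSubtask(), count));
                }
            }
        });

// Broadcast the stats so every downstream subtask sees the progress of every taskA subtask,
// then compare "my" progress against the slowest one inside the co-process function.
mainStream
        .connect(mainStream.getSideOutput(statsTag).broadcast())
        .process(new CoProcessFunction<Event, Tuple2<Integer, Long>, Event>() {
            private final Map<Integer, Long> progress = new HashMap<>();

            @Override
            public void processElement1(Event value, Context ctx, Collector<Event> out) throws Exception {
                // Decide here whether this subtask is too far ahead of the slowest one
                // (the actual rule is left out of this sketch) and sleep briefly if it is.
                out.collect(value);
            }

            @Override
            public void processElement2(Tuple2<Integer, Long> stat, Context ctx, Collector<Event> out) {
                progress.put(stat.f0, stat.f1);
            }
        });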

As I said, I will think about it a bit more,
Kostas



Re: CustomPartitioner that simulates ForwardPartitioner and watermarks

Posted by Yunus Olgun <yu...@gmail.com>.
Hi Kostas,

Yes, you have summarized it well. I want to forward the data only to the next local operator, but broadcast the watermark through the cluster.

- I can’t set the parallelism of taskB to 1; the stream is too big for that. Also, the data is ordered within each partition and I don’t want to change that order.

- I don’t need a KeyedStream. Also, taskA and taskB will always have the same parallelism as each other, although that parallelism may be increased in the future.

The use case is: the source is Kafka. At our peak hours, or when we run the streaming job on old data from Kafka, the same thing always happens, even in trivial jobs. Some consumers consume faster than others. They push too much data downstream, but the watermark advances slowly, at the speed of the slowest consumer. This extra data piles up at the downstream operators. When the downstream operator is an aggregation, that is OK, but when it is an in-Flink join, the state size gets too big, checkpoints take much longer, and overall the job becomes slower or fails. It also affects other jobs in the cluster.

So basically I want to implement a throttler. It compares the timestamp of a record with the global watermark; if the difference is larger than a constant threshold, it sleeps 1 ms for each incoming record. This way, fast operators wait for the slowest one.

The only problem is that this solution comes at the cost of one network shuffle and data serialization/deserialization. Since the stream is large, I want to avoid at least the network shuffle.

I thought operator instances within a TaskManager would get the same subtask index, but apparently this is not the case.

Thanks,



Re: CustomPartitioner that simulates ForwardPartitioner and watermarks

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Yunus,

I am not sure if I understand the question correctly.

Am I correct to assume that you want the following?

				———————————> time

		ProcessA						ProcessB

Task1: W(3) E(1) E(2) E(5)			W(3) W(7) E(1) E(2) E(5)

Task2: W(7) E(3) E(10) E(6)			W(3) W(7) E(3) E(10) E(6)


In the above, elements flow from left to right, W() stands for watermark, and E() stands for element.
In other words, between Process(TaskA) and Process(TaskB) you want to only forward the elements, but broadcast the watermarks, right?

If this is the case, a trivial solution would be to set the parallelism of TaskB to 1, so that all elements go through the same node.
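For what it's worth, a hedged one-liner of what that would look like with the names used in this thread:

// All elements funnel through a single taskB subtask, which therefore sees the
// minimum watermark of all taskA subtasks (at the cost of parallelism).
source.process(taskA)
           .process(taskB)
           .setParallelism(1);   // applies to the taskB operator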

Another solution is what you did, BUT with a custom partitioner you cannot use keyed state in your process function B, because the stream is no longer keyed.

A similar approach to what you did, but without the limitation above, is to append the taskId to the elements themselves in the first process function (TaskA) and then do a keyBy(taskId) between the first and the second process function (a sketch follows below).
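A hedged sketch of that variant, again with illustrative names only (MyEvent, the Tuple2 wrapper, and the anonymous mapper are assumptions):

// In, or right after, taskA: wrap each element with the producing subtask's index.
DataStream<Tuple2<Integer, MyEvent>> tagged = source
        .process(taskA)
        .map(new RichMapFunction<MyEvent, Tuple2<Integer, MyEvent>>() {
            @Override
            public Tuple2<Integer, MyEvent> map(MyEvent value) {
                return Tuple2.of(getRuntimeContext().getIndexOfThisSubtask(), value);
            }
        });

// Key by the attached index: data from taskA subtask n stays grouped together, the
// stream is keyed (so taskB may use keyed state), and the watermark is still broadcast
// across the keyBy exchange, so taskB sees the minimum watermark of all taskA subtasks.
tagged.keyBy(0)
           .process(taskB);   // taskB now operates on the wrapped Tuple2 elements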

These are the solutions that I can come up with, assuming that you want to do what I described.

But in general, could you please describe your use case a bit more? That way we may figure out another approach to achieve your goal. In fact, I am not sure you gain anything by broadcasting the watermark, other than re-implementing (to some extent) Flink’s windowing mechanism.

Thanks,
Kostas
