You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Zhijiang(wangzhijiang999)" <wa...@aliyun.com.INVALID> on 2018/10/15 10:02:22 UTC

回复:[DISCUSS] Improve broadcast serialization

Let's come back to this discussion again.

Thanks for @Nico Kruber and @Piotr Nowojski  reviewing the PR of proposed 2.1 for serialization only once below and it is already merged into branch.

For proposed 2.2 for copy only once, we verified it is also very important for batch job related with join operation in our benchmark, so we want to focus on improving it based on 2.1.
Considering the initial conclusion before, we want to finish current broadcasting shared BufferBuilder when triggering non-broadcast operation which will request new separate BufferBuilder. But I am just wondering if the switch is frequent between broadcasting and non-broadcasting operations, the BufferBuilder may be filled with few data resulting in low resource utilization which may cause regression in special cases. And as long as one channel has not consumed the data by network transfer, this shared BufferBuilder can not be recycled.

Another raw idea is if we support one BufferBuilder shared by all the channels has different data regions for differnet partition indexes, that means one subpartition has non-continuous data distribution in the BufferBuilder. We can use skip index list to identify which regions belong to sepecific subpartiiton which can avoid finishing BufferBuilder during mode switch. But it seems more complicated with current process and i am not sure whether it has other performance concerns.

Wish any feedbacks for this issue, then i can further focus on it. 

Best,
Zhijiang


------------------------------------------------------------------
发件人:Zhijiang(wangzhijiang999) <wa...@aliyun.com.INVALID>
发送时间:2018年7月20日(星期五) 13:21
收件人:Piotr Nowojski <pi...@data-artisans.com>
抄 送:Nico Kruber <ni...@data-artisans.com>; dev <de...@flink.apache.org>
主 题:回复:[DISCUSS] Improve broadcast serialization

Ok, that is fine. :)

I will create JIRA today and submit the PR next week.

Zhijiang
------------------------------------------------------------------
发件人:Piotr Nowojski <pi...@data-artisans.com>
发送时间:2018年7月19日(星期四) 17:52
收件人:Zhijiang(wangzhijiang999) <wa...@aliyun.com>
抄 送:Nico Kruber <ni...@data-artisans.com>; dev <de...@flink.apache.org>
主 题:Re: [DISCUSS] Improve broadcast serialization

Hi,

I have only noticed your second response after sending my email :) 

Ok, now I think we are on the same page :) I think you can work on 2.1 and later on 2.2 if you will think that 2.1 is not enough. Once you create a Jira issues/PRs please CC me.

Piotrek  

On 19 Jul 2018, at 04:51, Zhijiang(wangzhijiang999) <wa...@aliyun.com> wrote:
Hi Piotr

1. I agree with we should discuss higher level first and focus on implementation on jira/pr. As long as RecordSerializer does not maintain the BufferBuilder, it can become stateless, then it can get BufferBuilderfrom the RecordWriter at any time.  And I think it is the precondition to improve serializing only once for multi channels, otherwise we have to select serializer based on target channel index.

2. I already corrected this thought in last reply, maybe you have not seen it before you reply. :)  
We can break the broadcast improvement into two steps:
2.1 Serialize the record into temporary byte buffer only once for multi selected channels. (currently serialize many times)
2.2 Copy the temporary byte buffer into BufferBuilder only once and create different BufferConsumers based on the same BufferBuilder for each channel. (currently copy many times)
Regarding 2.1, just the same as your proposal[c], it is worth to do currently and can get good benefits I think.
Regarding 2.2, considering mixed broadcast/non-broadcast writes, it has to flush/finish last broadcast BufferBuilder for current non-broadcast writes and vice versa. I agree with your proposal[2] for this issue, and we can further consider it in future, maybe there are other better ways for avoiding it.

4. My previous thought is to realize both above 2.1 and 2.2. The 2.1 is your proposal[c] which has no problem for mixed write mode, so no need additional flush. The 2.2 is just as your proposal[2] which concerns additional flush. Maybe my last reply make you misunderstand.

I can submit jira for above 2.1 first if no other concerns, thanks for the helpful advice. :)

Best,

Zhijiang
------------------------------------------------------------------
发件人:Piotr Nowojski <pi...@data-artisans.com>
发送时间:2018年7月18日(星期三) 20:04
收件人:Zhijiang(wangzhijiang999) <wa...@aliyun.com>; Nico Kruber <ni...@data-artisans.com>
抄 送:dev <de...@flink.apache.org>
主 题:Re: [DISCUSS] Improve broadcast serialization


Hi 

1. I want to define a new AbstractRecordWriter as base class which defines some abstract methods and utility codes. The current RecordWriter used for other partitioner and new BroadcastRecordWriter used only for BroadcastPartitioner will both extend AbstractRecordWriter. The fields in BroadcastPartitioner are extactly as you showed below, but for current RecordWriter it also only needs one RecordSerializer if we make the RecordSerializer has no internal state.

Lets first discuss what we would like to have/implement on higher level and later focus on implementation details. Regarding making RecordSerializer stateless, there were some discussions about it previously and it was on our TODO list but I don’t remember what was holding us back. Maybe Nico will remember?


2. You pointed the key problem that how to handle `randomEmit` in BroadcastRecordWriter, and I think this process may resue the `emit` logic in current RecordWriter. Then the `emit` and `broadcastEmit` logics in BroadcastRecordWriter will serialize data only once and copy to BufferBuilder only once. So this improvement is deterministic for BroadcastPartitioner.


What logic to reuse do you have in mind? 
4. As for 'broadcastEmit` improvement in RecordWriter for non-broadcast partitioner, we can also do as you suggested in option [2], but it has to finish/flush the previous BufferBuilder generated by common `emit` operation. So it may bring bad impacts on buffer utility which was improved well in event-driven flush feature. So I am not sure whether it is worth doing `broadcastEmit` improvement in RecordWriter.


The whole point of my proposal [c] was to avoid the need to flush. Code would need a little bit more refactoring but it should look something like this:

void broadcastEmit(record):
 serializedRecord = serializer.serialize(record)
 for bufferBuilder in bufferBuilders:
 bufferBuilder.append(serializedRecord)
 // if we overfilled bufferBuilder, finish it, request new one and continue writing

void emit(record, channel)
 serializedRecord = serializer.serialize(record)
 bufferBuilders[channel].append(serializedRecord)
 // if we overfilled bufferBuilder, finish it, request new one and continue writing

I do not see here a need for additional flushes and it should be strict improvement over current code base.


I already realized the demo covering above 1,2,5 before. I can create jiras after we reach a final agreement, then maybe you can help review PR if have time. :)


Sure :)

Piotrek

Best,

Zhijiang
------------------------------------------------------------------
发件人:Piotr Nowojski <pi...@data-artisans.com>
发送时间:2018年7月18日(星期三) 16:37
收件人:dev <de...@flink.apache.org>; Zhijiang(wangzhijiang999) <wa...@aliyun.com>
主 题:Re: [DISCUSS] Improve broadcast serialization

Hi,

Couple of more thoughts

a) I’m not sure if you would have to modify current RecordWriter at all. You could extract interface from current RecordWriter and just provide two implementations: current one and BroadcastRecordWriter. I’m not sure, but it doesn’t seem like they would duplicate/share lots of code. BroadcastRecordWriter would have fields:

private final RecordSerializer<T> serializers;

private final Optional<BufferBuilder> bufferBuilder;

Compared to RecordWriter’s arrays.

b) One thing that I noticed now are latency markers and randomEmit method. It prevents us from implementing option [1]. BroadcastRecordWriter would have to flush all channels on randomEmit (as I proposed in option [2]).

c) Another option to optimise broadcast writes (or for that matter all multi channel writes), would be to serialise record only once to SpanningRecordSerializer#serializationBuffer, but copy it multiple times to separate BufferBuilders. That would save us much more then half of the overhead (serialisation is more costly compared to data copying), while we would avoid problems with uneven state of channels. There would be no problems with mixed broadcast/non broadcast writes, this option could support both of them at the same time - in other words, it would be as generic as the current one.

d) Regarding StreamRecordWriter, other option is, that it could be refactored to a class implementing extracted RecordWriter interface and being a proxy/wrapper around another RecordWriter instance:

Class StreamRecordWriter implements RecordWriter {
  private final RecordWrtier recordWriter; //either broadcast or non broadcast 
  public void foo() {
    recordWriter.foo();
  }
}

To be honest I’m not sure at the moment which one would be better [2] or [c]. In ideal world, we might want to replace current RecordWriter with [c] and after that (if that’s not enough) to implement [2] on top of [c]. 

Piotrek

> On 18 Jul 2018, at 05:36, Zhijiang(wangzhijiang999) <wa...@aliyun.com.INVALID> wrote:
> 
> Hi Piotr,
> 
> Thanks for your replies and professional suggestions!
> 
> My initial thought is just as you said in first suggestion. The current RecordWriter will emit StreamRecord to some subpartition via ChannelSelector or broadcast events/watermark to all subpartitions directly.
> If the ChannelSelector implementation is BroadcastPartitioner, then we can create a specialized BroadcastRecordWriter to handle the 'emit', 'broadcastEmit', 'broadcastEvent', etc.
> To make it seems not tricky, I want to abstract the RecordWriter as a plugin, then implement a BroadcastRecordWriter and NonBroadcastRecordWriter separately to extend abstract RecordWriter. That means we divide the RecordWriter by ChannelSelector, and also we may remove current StreamRecordWriter to uniform the RecordWriter criteria in both stream and batch mode.
> 
> Considering specific implementations, I think one RecordSerializer can work for both BroadcastRecordWriter and NonBroadcastRecordWriter, but the precondition is making the RecordSerializer has no internal state, so we have to remove the BufferBuilder variable from SpanningRecordSerializer and pass it via addRecord/continueWritingWithNextBufferBuilder
> methods from RecordWriter. BroadcastRecordWriter only needs maintain one BufferBuilder for all subpartitions, and NonBroadcastRecordWriter may need maintain one BufferBuilder per subpartition.
> 
> Another issue is whether this improvement is suitable for broadcastEmit(watermark) in NonBroadcastRecordWriter as you said in suggestion 2,3. I wonder it may decrease the buffer utilization if switch between broadcast and non-broadcast modes, even it may seem more tricky in implementation. I am still thinking of it.
> 
> Maybe we can implement the improvement for BroadcastPartitioner in first step and make sure one RecordSerializer for all subpartitions. That can reduce the memory overhead in RecordSerializer and the time cost in broadcast serialization scenarios.
> 
> Best,
> 
> Zhijiang
> 
> 
> ------------------------------------------------------------------
> 发件人:Piotr Nowojski <pi...@data-artisans.com>
> 发送时间:2018年7月17日(星期二) 23:31
> 收件人:dev <de...@flink.apache.org>; Zhijiang(wangzhijiang999) <wa...@aliyun.com>
> 主 题:Re: [DISCUSS] Improve broadcast serialization
> 
> Hi
> 
> Generally speaking this would be a nice optimisation, however it might be tricky to implement. The thing to keep in mind is that currently interface allow to interleave broadcasting and normal sending, because of that at any given time some serialisers can have more data then others. For example when we have two output channels and we are looping following writes:
> 
> Write sth to 1. Channel
> Broadcast to all channels
> Write sth to 1. Channel
> Broadcast to all channels
> Write sth to 1. Channel
> Broadcast to all channels
> (…)
> 
> Thus buffers of different channels can fill out with different rates.
> 
>> In theory every record can be serialized only once and referenced for all the subpartitions in broadcast mode.
> 
> The problem here is that after records serialising, the only unit that can be referenced afterwards is “Buffer”. So that would leave us now with couple of options:
> 
> 1. Create a specialised BroadcastRecordWriter that supports ONLY broadcasting, guaranteeing that all channels always receive the same data. Here you could serialise records only once, to one BufferBuilder that could be shared and referenced by multiple BufferConsumers from different channels. Any non broadcast write would have to fail.
> 
> 2. Similar as above, but specialised in MOSTLY broadcasting. Operate as in 1. for broadcasts, but for any non broadcast write: finish current broadcasting BufferBuilder, flush all data on all channels, serialise single record to single channel using newly create BufferBuilder and also immediately finish/flush it, so that any subsequent broadcasts will work again as in 1.:
> 
> 3. Similar as 2, but lazily switch between broadcasting and non-broadcasting modes. It would have two modes of operating that could be switched back and forth: the same as currently implemented for non-broadcasted and optimised broadcast mode
> 
> Broadcast to all channels
> Broadcast to all channels
> Broadcast to all channels
> Broadcast to all channels
> Write sth to X Channel // this flushes all channels and clears/finishes previous BufferBuilder 
> Write sth to Y Channel
> Write sth to Y Channel
> Write sth to Y Channel
> Write sth to X Channel 
> Broadcast to all channels // this flushes all channels and clears/finishes previous BufferBuilders, 
> Broadcast to all channels
> Broadcast to all channels
> (…)
> 
> However both in 2. and 3. there would be very big penalty for mixing broadcast with normal writes.  
> 
> Piotrek
> 
>> On 13 Jul 2018, at 09:44, Zhijiang(wangzhijiang999) <wa...@aliyun.com.INVALID> wrote:
>> 
>> Hi all,
>> 
>> In current implementation, the RecordSerializer is created separately for each subpartition in RecordWriter, that means the number of serializers equals to the number of subpartitions.
>> For broadcast partitioner, every record will be serialized many times in all the subpartitions, and this may bring bad performance to some extent.
>> In theory every record can be serialized only once and referenced for all the subpartitions in broadcast mode.
>> 
>> To do so, I propose the following changes:
>> 1. Create and maintain only one serializer in RecordWriter, and it will serialize the record for all the subpartitions. It makes sense for any partitioners, and the memory overhead can be also decreased, because every serializer will maintain some separate byte buffers internally.
>> 2. Maybe we can abstract the RecordWriter as a base class used for other partitioner mode and implement a BroadcastRecordWriter for BroadcastPartitioner. And this new implementation will add buffer references based on the number of subpartitions before adding into subpartition queue.
>> 3. Maybe we can remove StreamRecordWriter by migrating flusher from it to RecordWriter, then the uniform RecordWriter can be used for both stream and batch. The above BroadcastRecordWriter can aslo uniform for both stream and batch.
>> 
>> I am not sure whether this improvement is proposed before and what do you think of it?
>> If necessary I can create JIRAs to contirbute it, and may need one commiter cooperate with me.
>> 
>> Best,
>> 
>> Zhijiang
> 







Re: [DISCUSS] Improve broadcast serialization

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Sounds good to me :)

Piotrek

> On 19 Oct 2018, at 08:34, Zhijiang(wangzhijiang999) <wa...@aliyun.com.INVALID> wrote:
> 
> I agree with the additional thoughts of a), b) and c).
> 
> In all the current implementations of ChannelSelector, the selector channels are either one or all, so it makes sense for change the interface as you suggested if we will not extend other selectors for partial channels in future. And the single channel implementation would reduce some overheads in arrays and loop. For broadcast selector, it is no need to retrun channels from selector and we can make a shortcut process for this special implementation.
> 
> Comparing 3 vs 5, I still prefer 3 currently which can reuse the current network process. We only create one BufferBuilder for al thel channels and build separate BufferConsumer for every channel sharing the same BufferBuilder. To do so, we just need a few changes on RecordWriter side, do not touch the following components in network stack. And it will already gain most of the performance benefits by doing so, which copies serialization temporary buffer only once to one BufferBuilder.
> 
> I can first create the JIRA for single channel interface if you have not done that before, and then continue with copying step by step. :)
> 
> Best,
> Zhijiang
> ------------------------------------------------------------------
> 发件人:Piotr Nowojski <pi...@data-artisans.com>
> 发送时间:2018年10月18日(星期四) 17:47
> 收件人:Zhijiang(wangzhijiang999) <wa...@aliyun.com>
> 抄 送:Nico Kruber <ni...@data-artisans.com>; dev <de...@flink.apache.org>
> 主 题:Re: [DISCUSS] Improve broadcast serialization
> 
> Hey,
> 
> I also think that 3rd option is the most promising, however logic of “dirty” channels might be causing some overheads. I was also thinking about other option:
> 
> 5. In case of ‘emit’ called on BroadcastRecordWriter, we could write it to common/shared BufferBuilder, but somehow marked it as targeted to only one channel - we would send it over the network to all of the receivers, but all except of one would ignore it. This might be easier to implement in BroadcastRecordWriter, but would require extra logic on the receiver side. With respect to the performance it also might be better compared to 3.
> 
> Couple of more thoughts:
> 
> a) if we select BroadcastRecordWriter, literally the only way how it can be polluted by non broadcast writes are latency markers via `randomEmit`. When choosing 3 vs 5, mixing broadcast and non broadcast happens very rarely, so we shouldn’t optimise for it, but pick something that’s easiest to implement.
> b) there are no use cases where `ChannelSelector` returns anything else besides single channel or broadcast.
> 
> b) point brings me to one more thing. I was once playing with simplifying `ChannelSelector` interface by adding new one `SingleChannelSelector` with method:
> 
> `int selectChannel(T record, int numChannels);`
> 
> And it was resulting with ~10% performance speed up for network stack alone (overhead of creating singleton arrays and iterating over them). I didn’t follow up on this, because performance gain wasn’t super huge, while it complicated `RecordWriter`, since it had to handle both either `SingleChannelSelector` or `ChannelSelector`. Now that I realised that there are no use cases for selecting more then one, but not all of the channels and that anyway we go with broadcasting, we will have to special handle `BroadcastPartitioner`, that’s the perfect occasion to actually simplify the implementation and drop this multi channel ChannelSelector.
> 
> I think we should to this as a first step in a preparation before either 3. or 5. (changing ChannelSelector signature to:
> 
> int selectChannel(T record, int numChannels);
> 
> )
> 
> What do you think?
> 
> Piotrek
> 
> On 18 Oct 2018, at 06:12, Zhijiang(wangzhijiang999) <wa...@aliyun.com> wrote:
> Hi Piotr,
> 
> Thanks for your replies and suggestions!
> 
> For my rough idea of skip index list, I agree with your concerns of performance for non-broadcast case and complicated implementation. Although I think this idea seems more unified in semantics for "emit", "broadcastEmit" and "randomEmit" APIs, maybe it is not worth going deep into it currently for global changes.
> 
> Currently RecordWriter provides three main methods to write elements in different semantics:
> 
> "broadcastEmit" would write the element to all the channels, used for watermark currently.
> "randomEmit" would write the element to one random channel, used for latency marker currently.
> "emit" would write the element to some channels via ChannelSelector, used for normal records currectly. And the selected channels may be one, some or all.
> 
> If we want to retain these APIs for different requirements, then the RecordWriter should not be aware of which kind of elements would be written via APIs, so we should not make any assumings in the implementation. In details, I know the "randomEmit" in only used for latency marker currently, but we can not confirm whether this API would be used for other elements in future, so we can not estimate how frequency is used for this API for different possiable elements which is my above concerns. I do not want to limit any future possibilities for these APIs caused by this improvement.
> 
> Considering the below suggestions:
> 
> 1.  Inserting the elements via "randomEmit" in front of unfinished broadcast buffer will change the current sequence semantic. It may be not matter for latency marker currently, but may not be extented for future other elements.
> 
> 2. If we easily implement "randomEmit" as the way of broadcast, I am wondering the broadcast storm in special cases and we also change the semantics to send the unnecessary elements for some channels.
> 
> 3.  I prefer this way currently and it is similar with our previous discussion. And the implementation is more likely the way of current "broadcastEvent", which creates a new broadcast buffer for event, and finish the current buffer for all the channels before enqueuing this event buffer.
> 
> 4. Yes, your sayings is write for current mode. And I want to pass a boolean parameter "isBroadcast" in the constructor of RecordWriter for indicating broadcast writes in specific processes, because the RecordWriter can not check ChannelSelector instance based on module dependency.
> 
> In conclusion, I want to implement this improvement based on the third point from current thoughting, which keeps the same behavior like normal "emit" mixing with "broadcastEvent".
> 
> Best,
> Zhijiang
> ------------------------------------------------------------------
> 发件人:Piotr Nowojski <pi...@data-artisans.com>
> 发送时间:2018年10月17日(星期三) 19:25
> 收件人:Zhijiang(wangzhijiang999) <wa...@aliyun.com>
> 抄 送:Nico Kruber <ni...@data-artisans.com>; dev <de...@flink.apache.org>
> 主 题:Re: [DISCUSS] Improve broadcast serialization
> 
> Hi,
> 
> Regarding the second idea with skip index list, I would guess it might have bad performance impact in non broadcasting cases or would seriously complicate our Buffer implementation. Also it would make reading/data copying/slicing and other big chunk byte operations much more costly. Instead of memcpy whole buffer we would have to manually select the correct bits.
> 
>> But I am just wondering if the switch is frequent between broadcasting and non-broadcasting operations
> 
> I haven't researched this topic any further then before. However my first guess would be that this switch doesn’t happen at all EXCEPT of `randomEmit` which is used for the latency markers (this statement requires further research/validation). Assuming that’s true.
> 
> 1. Probably we can not/should not flush the broadcasted buffer, serialise randomEmit and flush it again, because this would prematurely emit latency marker - defeating it purpose and skewing the measured time. LatencyMarkers are expected to travel through pipeline at the exact same speed as regular records would.
> 
> 2. Maybe we could just always broadcast the latency markers as well? This would be nice solution except of that at the level of RecordWriter we do not know whether this is latency marker or not - we no only that we were asked to “emit”, “randomEmit” or “broadcastEmit” and we have to handle them somehow (throwing exception?)
> 
> 3. Assuming `randomEmit` or `emit` is rare, maybe we copy the broadcasted `BufferBuilder` into a new one, append there the record/latency marker so all except of one channel would share “broadcasted” BufferBuilder. Once we need to flush any of the buffers (either broadcasted or the “dirty” one) we flush them all and restart with all channels sharing a fresh new “broadcasted” BufferBuilder? 
> 
> 4. For streaming isn't Broadcast currently realised via passing `BroadcastPartitioner` to the RecordWriter and using standard `emit` method? That would need to be changed/handled in order to optimise broadcast writes.
> 
> Piotrek
> 
> On 15 Oct 2018, at 12:02, Zhijiang(wangzhijiang999) <wa...@aliyun.com> wrote:
> Let's come back to this discussion again.
> 
> Thanks for @Nico Kruber and @Piotr Nowojski  reviewing the PR of proposed 2.1 for serialization only once below and it is already merged into branch.
> 
> For proposed 2.2 for copy only once, we verified it is also very important for batch job related with join operation in our benchmark, so we want to focus on improving it based on 2.1.
> Considering the initial conclusion before, we want to finish current broadcasting shared BufferBuilder when triggering non-broadcast operation which will request new separate BufferBuilder. But I am just wondering if the switch is frequent between broadcasting and non-broadcasting operations, the BufferBuilder may be filled with few data resulting in low resource utilization which may cause regression in special cases. And as long as one channel has not consumed the data by network transfer, this shared BufferBuilder can not be recycled.
> 
> Another raw idea is if we support one BufferBuilder shared by all the channels has different data regions for differnet partition indexes, that means one subpartition has non-continuous data distribution in the BufferBuilder. We can use skip index list to identify which regions belong to sepecific subpartiiton which can avoid finishing BufferBuilder during mode switch. But it seems more complicated with current process and i am not sure whether it has other performance concerns.
> 
> Wish any feedbacks for this issue, then i can further focus on it. 
> 
> Best,
> Zhijiang
> 
> ------------------------------------------------------------------
> 发件人:Zhijiang(wangzhijiang999) <wa...@aliyun.com.INVALID>
> 发送时间:2018年7月20日(星期五) 13:21
> 收件人:Piotr Nowojski <pi...@data-artisans.com>
> 抄 送:Nico Kruber <ni...@data-artisans.com>; dev <de...@flink.apache.org>
> 主 题:回复:[DISCUSS] Improve broadcast serialization
> 
> Ok, that is fine. :)
> 
> I will create JIRA today and submit the PR next week.
> 
> Zhijiang
> ------------------------------------------------------------------
> 发件人:Piotr Nowojski <pi...@data-artisans.com>
> 发送时间:2018年7月19日(星期四) 17:52
> 收件人:Zhijiang(wangzhijiang999) <wa...@aliyun.com>
> 抄 送:Nico Kruber <ni...@data-artisans.com>; dev <de...@flink.apache.org>
> 主 题:Re: [DISCUSS] Improve broadcast serialization
> 
> Hi,
> 
> I have only noticed your second response after sending my email :) 
> 
> Ok, now I think we are on the same page :) I think you can work on 2.1 and later on 2.2 if you will think that 2.1 is not enough. Once you create a Jira issues/PRs please CC me.
> 
> Piotrek  
> 
> On 19 Jul 2018, at 04:51, Zhijiang(wangzhijiang999) <wa...@aliyun.com> wrote:
> Hi Piotr
> 
> 1. I agree with we should discuss higher level first and focus on implementation on jira/pr. As long as RecordSerializer does not maintain the BufferBuilder, it can become stateless, then it can get BufferBuilderfrom the RecordWriter at any time.  And I think it is the precondition to improve serializing only once for multi channels, otherwise we have to select serializer based on target channel index.
> 
> 2. I already corrected this thought in last reply, maybe you have not seen it before you reply. :)  
> We can break the broadcast improvement into two steps:
> 2.1 Serialize the record into temporary byte buffer only once for multi selected channels. (currently serialize many times)
> 2.2 Copy the temporary byte buffer into BufferBuilder only once and create different BufferConsumers based on the same BufferBuilder for each channel. (currently copy many times)
> Regarding 2.1, just the same as your proposal[c], it is worth to do currently and can get good benefits I think.
> Regarding 2.2, considering mixed broadcast/non-broadcast writes, it has to flush/finish last broadcast BufferBuilder for current non-broadcast writes and vice versa. I agree with your proposal[2] for this issue, and we can further consider it in future, maybe there are other better ways for avoiding it.
> 
> 4. My previous thought is to realize both above 2.1 and 2.2. The 2.1 is your proposal[c] which has no problem for mixed write mode, so no need additional flush. The 2.2 is just as your proposal[2] which concerns additional flush. Maybe my last reply make you misunderstand.
> 
> I can submit jira for above 2.1 first if no other concerns, thanks for the helpful advice. :)
> 
> Best,
> 
> Zhijiang
> ------------------------------------------------------------------
> 发件人:Piotr Nowojski <pi...@data-artisans.com>
> 发送时间:2018年7月18日(星期三) 20:04
> 收件人:Zhijiang(wangzhijiang999) <wa...@aliyun.com>; Nico Kruber <ni...@data-artisans.com>
> 抄 送:dev <de...@flink.apache.org>
> 主 题:Re: [DISCUSS] Improve broadcast serialization
> 
> 
> Hi 
> 
> 1. I want to define a new AbstractRecordWriter as base class which defines some abstract methods and utility codes. The current RecordWriter used for other partitioner and new BroadcastRecordWriter used only for BroadcastPartitioner will both extend AbstractRecordWriter. The fields in BroadcastPartitioner are extactly as you showed below, but for current RecordWriter it also only needs one RecordSerializer if we make the RecordSerializer has no internal state.
> 
> Lets first discuss what we would like to have/implement on higher level and later focus on implementation details. Regarding making RecordSerializer stateless, there were some discussions about it previously and it was on our TODO list but I don’t remember what was holding us back. Maybe Nico will remember?
> 
> 
> 2. You pointed the key problem that how to handle `randomEmit` in BroadcastRecordWriter, and I think this process may resue the `emit` logic in current RecordWriter. Then the `emit` and `broadcastEmit` logics in BroadcastRecordWriter will serialize data only once and copy to BufferBuilder only once. So this improvement is deterministic for BroadcastPartitioner.
> 
> 
> What logic to reuse do you have in mind? 
> 4. As for 'broadcastEmit` improvement in RecordWriter for non-broadcast partitioner, we can also do as you suggested in option [2], but it has to finish/flush the previous BufferBuilder generated by common `emit` operation. So it may bring bad impacts on buffer utility which was improved well in event-driven flush feature. So I am not sure whether it is worth doing `broadcastEmit` improvement in RecordWriter.
> 
> 
> The whole point of my proposal [c] was to avoid the need to flush. Code would need a little bit more refactoring but it should look something like this:
> 
> void broadcastEmit(record):
> serializedRecord = serializer.serialize(record)
> for bufferBuilder in bufferBuilders:
> bufferBuilder.append(serializedRecord)
> // if we overfilled bufferBuilder, finish it, request new one and continue writing
> 
> void emit(record, channel)
> serializedRecord = serializer.serialize(record)
> bufferBuilders[channel].append(serializedRecord)
> // if we overfilled bufferBuilder, finish it, request new one and continue writing
> 
> I do not see here a need for additional flushes and it should be strict improvement over current code base.
> 
> 
> I already realized the demo covering above 1,2,5 before. I can create jiras after we reach a final agreement, then maybe you can help review PR if have time. :)
> 
> 
> Sure :)
> 
> Piotrek
> 
> Best,
> 
> Zhijiang
> ------------------------------------------------------------------
> 发件人:Piotr Nowojski <pi...@data-artisans.com>
> 发送时间:2018年7月18日(星期三) 16:37
> 收件人:dev <de...@flink.apache.org>; Zhijiang(wangzhijiang999) <wa...@aliyun.com>
> 主 题:Re: [DISCUSS] Improve broadcast serialization
> 
> Hi,
> 
> Couple of more thoughts
> 
> a) I’m not sure if you would have to modify current RecordWriter at all. You could extract interface from current RecordWriter and just provide two implementations: current one and BroadcastRecordWriter. I’m not sure, but it doesn’t seem like they would duplicate/share lots of code. BroadcastRecordWriter would have fields:
> 
> private final RecordSerializer<T> serializers;
> 
> private final Optional<BufferBuilder> bufferBuilder;
> 
> Compared to RecordWriter’s arrays.
> 
> b) One thing that I noticed now are latency markers and randomEmit method. It prevents us from implementing option [1]. BroadcastRecordWriter would have to flush all channels on randomEmit (as I proposed in option [2]).
> 
> c) Another option to optimise broadcast writes (or for that matter all multi channel writes), would be to serialise record only once to SpanningRecordSerializer#serializationBuffer, but copy it multiple times to separate BufferBuilders. That would save us much more then half of the overhead (serialisation is more costly compared to data copying), while we would avoid problems with uneven state of channels. There would be no problems with mixed broadcast/non broadcast writes, this option could support both of them at the same time - in other words, it would be as generic as the current one.
> 
> d) Regarding StreamRecordWriter, other option is, that it could be refactored to a class implementing extracted RecordWriter interface and being a proxy/wrapper around another RecordWriter instance:
> 
> Class StreamRecordWriter implements RecordWriter {
>  private final RecordWrtier recordWriter; //either broadcast or non broadcast 
>  public void foo() {
>    recordWriter.foo();
>  }
> }
> 
> To be honest I’m not sure at the moment which one would be better [2] or [c]. In ideal world, we might want to replace current RecordWriter with [c] and after that (if that’s not enough) to implement [2] on top of [c]. 
> 
> Piotrek
> 
>> On 18 Jul 2018, at 05:36, Zhijiang(wangzhijiang999) <wa...@aliyun.com.INVALID> wrote:
>> 
>> Hi Piotr,
>> 
>> Thanks for your replies and professional suggestions!
>> 
>> My initial thought is just as you said in first suggestion. The current RecordWriter will emit StreamRecord to some subpartition via ChannelSelector or broadcast events/watermark to all subpartitions directly.
>> If the ChannelSelector implementation is BroadcastPartitioner, then we can create a specialized BroadcastRecordWriter to handle the 'emit', 'broadcastEmit', 'broadcastEvent', etc.
>> To make it seems not tricky, I want to abstract the RecordWriter as a plugin, then implement a BroadcastRecordWriter and NonBroadcastRecordWriter separately to extend abstract RecordWriter. That means we divide the RecordWriter by ChannelSelector, and also we may remove current StreamRecordWriter to uniform the RecordWriter criteria in both stream and batch mode.
>> 
>> Considering specific implementations, I think one RecordSerializer can work for both BroadcastRecordWriter and NonBroadcastRecordWriter, but the precondition is making the RecordSerializer has no internal state, so we have to remove the BufferBuilder variable from SpanningRecordSerializer and pass it via addRecord/continueWritingWithNextBufferBuilder
>> methods from RecordWriter. BroadcastRecordWriter only needs maintain one BufferBuilder for all subpartitions, and NonBroadcastRecordWriter may need maintain one BufferBuilder per subpartition.
>> 
>> Another issue is whether this improvement is suitable for broadcastEmit(watermark) in NonBroadcastRecordWriter as you said in suggestion 2,3. I wonder it may decrease the buffer utilization if switch between broadcast and non-broadcast modes, even it may seem more tricky in implementation. I am still thinking of it.
>> 
>> Maybe we can implement the improvement for BroadcastPartitioner in first step and make sure one RecordSerializer for all subpartitions. That can reduce the memory overhead in RecordSerializer and the time cost in broadcast serialization scenarios.
>> 
>> Best,
>> 
>> Zhijiang
>> 
>> 
>> ------------------------------------------------------------------
>> 发件人:Piotr Nowojski <pi...@data-artisans.com>
>> 发送时间:2018年7月17日(星期二) 23:31
>> 收件人:dev <de...@flink.apache.org>; Zhijiang(wangzhijiang999) <wa...@aliyun.com>
>> 主 题:Re: [DISCUSS] Improve broadcast serialization
>> 
>> Hi
>> 
>> Generally speaking this would be a nice optimisation, however it might be tricky to implement. The thing to keep in mind is that currently interface allow to interleave broadcasting and normal sending, because of that at any given time some serialisers can have more data then others. For example when we have two output channels and we are looping following writes:
>> 
>> Write sth to 1. Channel
>> Broadcast to all channels
>> Write sth to 1. Channel
>> Broadcast to all channels
>> Write sth to 1. Channel
>> Broadcast to all channels
>> (…)
>> 
>> Thus buffers of different channels can fill out with different rates.
>> 
>>> In theory every record can be serialized only once and referenced for all the subpartitions in broadcast mode.
>> 
>> The problem here is that after records serialising, the only unit that can be referenced afterwards is “Buffer”. So that would leave us now with couple of options:
>> 
>> 1. Create a specialised BroadcastRecordWriter that supports ONLY broadcasting, guaranteeing that all channels always receive the same data. Here you could serialise records only once, to one BufferBuilder that could be shared and referenced by multiple BufferConsumers from different channels. Any non broadcast write would have to fail.
>> 
>> 2. Similar as above, but specialised in MOSTLY broadcasting. Operate as in 1. for broadcasts, but for any non broadcast write: finish current broadcasting BufferBuilder, flush all data on all channels, serialise single record to single channel using newly create BufferBuilder and also immediately finish/flush it, so that any subsequent broadcasts will work again as in 1.:
>> 
>> 3. Similar as 2, but lazily switch between broadcasting and non-broadcasting modes. It would have two modes of operating that could be switched back and forth: the same as currently implemented for non-broadcasted and optimised broadcast mode
>> 
>> Broadcast to all channels
>> Broadcast to all channels
>> Broadcast to all channels
>> Broadcast to all channels
>> Write sth to X Channel // this flushes all channels and clears/finishes previous BufferBuilder 
>> Write sth to Y Channel
>> Write sth to Y Channel
>> Write sth to Y Channel
>> Write sth to X Channel 
>> Broadcast to all channels // this flushes all channels and clears/finishes previous BufferBuilders, 
>> Broadcast to all channels
>> Broadcast to all channels
>> (…)
>> 
>> However both in 2. and 3. there would be very big penalty for mixing broadcast with normal writes.  
>> 
>> Piotrek
>> 
>>> On 13 Jul 2018, at 09:44, Zhijiang(wangzhijiang999) <wa...@aliyun.com.INVALID> wrote:
>>> 
>>> Hi all,
>>> 
>>> In current implementation, the RecordSerializer is created separately for each subpartition in RecordWriter, that means the number of serializers equals to the number of subpartitions.
>>> For broadcast partitioner, every record will be serialized many times in all the subpartitions, and this may bring bad performance to some extent.
>>> In theory every record can be serialized only once and referenced for all the subpartitions in broadcast mode.
>>> 
>>> To do so, I propose the following changes:
>>> 1. Create and maintain only one serializer in RecordWriter, and it will serialize the record for all the subpartitions. It makes sense for any partitioners, and the memory overhead can be also decreased, because every serializer will maintain some separate byte buffers internally.
>>> 2. Maybe we can abstract the RecordWriter as a base class used for other partitioner mode and implement a BroadcastRecordWriter for BroadcastPartitioner. And this new implementation will add buffer references based on the number of subpartitions before adding into subpartition queue.
>>> 3. Maybe we can remove StreamRecordWriter by migrating flusher from it to RecordWriter, then the uniform RecordWriter can be used for both stream and batch. The above BroadcastRecordWriter can aslo uniform for both stream and batch.
>>> 
>>> I am not sure whether this improvement is proposed before and what do you think of it?
>>> If necessary I can create JIRAs to contirbute it, and may need one commiter cooperate with me.
>>> 
>>> Best,
>>> 
>>> Zhijiang
>> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 


回复:[DISCUSS] Improve broadcast serialization

Posted by "Zhijiang(wangzhijiang999)" <wa...@aliyun.com.INVALID>.
I agree with the additional thoughts of a), b) and c).

In all the current implementations of ChannelSelector, the selector channels are either one or all, so it makes sense for change the interface as you suggested if we will not extend other selectors for partial channels in future. And the single channel implementation would reduce some overheads in arrays and loop. For broadcast selector, it is no need to retrun channels from selector and we can make a shortcut process for this special implementation.

Comparing 3 vs 5, I still prefer 3 currently which can reuse the current network process. We only create one BufferBuilder for al thel channels and build separate BufferConsumer for every channel sharing the same BufferBuilder. To do so, we just need a few changes on RecordWriter side, do not touch the following components in network stack. And it will already gain most of the performance benefits by doing so, which copies serialization temporary buffer only once to one BufferBuilder.

I can first create the JIRA for single channel interface if you have not done that before, and then continue with copying step by step. :)

Best,
Zhijiang
------------------------------------------------------------------
发件人:Piotr Nowojski <pi...@data-artisans.com>
发送时间:2018年10月18日(星期四) 17:47
收件人:Zhijiang(wangzhijiang999) <wa...@aliyun.com>
抄 送:Nico Kruber <ni...@data-artisans.com>; dev <de...@flink.apache.org>
主 题:Re: [DISCUSS] Improve broadcast serialization

Hey,

I also think that 3rd option is the most promising, however logic of “dirty” channels might be causing some overheads. I was also thinking about other option:

5. In case of ‘emit’ called on BroadcastRecordWriter, we could write it to common/shared BufferBuilder, but somehow marked it as targeted to only one channel - we would send it over the network to all of the receivers, but all except of one would ignore it. This might be easier to implement in BroadcastRecordWriter, but would require extra logic on the receiver side. With respect to the performance it also might be better compared to 3.

Couple of more thoughts:

a) if we select BroadcastRecordWriter, literally the only way how it can be polluted by non broadcast writes are latency markers via `randomEmit`. When choosing 3 vs 5, mixing broadcast and non broadcast happens very rarely, so we shouldn’t optimise for it, but pick something that’s easiest to implement.
b) there are no use cases where `ChannelSelector` returns anything else besides single channel or broadcast.

b) point brings me to one more thing. I was once playing with simplifying `ChannelSelector` interface by adding new one `SingleChannelSelector` with method:

`int selectChannel(T record, int numChannels);`

And it was resulting with ~10% performance speed up for network stack alone (overhead of creating singleton arrays and iterating over them). I didn’t follow up on this, because performance gain wasn’t super huge, while it complicated `RecordWriter`, since it had to handle both either `SingleChannelSelector` or `ChannelSelector`. Now that I realised that there are no use cases for selecting more then one, but not all of the channels and that anyway we go with broadcasting, we will have to special handle `BroadcastPartitioner`, that’s the perfect occasion to actually simplify the implementation and drop this multi channel ChannelSelector.

I think we should to this as a first step in a preparation before either 3. or 5. (changing ChannelSelector signature to:

int selectChannel(T record, int numChannels);

)

What do you think?

Piotrek

On 18 Oct 2018, at 06:12, Zhijiang(wangzhijiang999) <wa...@aliyun.com> wrote:
Hi Piotr,

Thanks for your replies and suggestions!

For my rough idea of skip index list, I agree with your concerns of performance for non-broadcast case and complicated implementation. Although I think this idea seems more unified in semantics for "emit", "broadcastEmit" and "randomEmit" APIs, maybe it is not worth going deep into it currently for global changes.

Currently RecordWriter provides three main methods to write elements in different semantics:

"broadcastEmit" would write the element to all the channels, used for watermark currently.
"randomEmit" would write the element to one random channel, used for latency marker currently.
"emit" would write the element to some channels via ChannelSelector, used for normal records currectly. And the selected channels may be one, some or all.

If we want to retain these APIs for different requirements, then the RecordWriter should not be aware of which kind of elements would be written via APIs, so we should not make any assumings in the implementation. In details, I know the "randomEmit" in only used for latency marker currently, but we can not confirm whether this API would be used for other elements in future, so we can not estimate how frequency is used for this API for different possiable elements which is my above concerns. I do not want to limit any future possibilities for these APIs caused by this improvement.

Considering the below suggestions:

1.  Inserting the elements via "randomEmit" in front of unfinished broadcast buffer will change the current sequence semantic. It may be not matter for latency marker currently, but may not be extented for future other elements.

2. If we easily implement "randomEmit" as the way of broadcast, I am wondering the broadcast storm in special cases and we also change the semantics to send the unnecessary elements for some channels.

3.  I prefer this way currently and it is similar with our previous discussion. And the implementation is more likely the way of current "broadcastEvent", which creates a new broadcast buffer for event, and finish the current buffer for all the channels before enqueuing this event buffer.

4. Yes, your sayings is write for current mode. And I want to pass a boolean parameter "isBroadcast" in the constructor of RecordWriter for indicating broadcast writes in specific processes, because the RecordWriter can not check ChannelSelector instance based on module dependency.

In conclusion, I want to implement this improvement based on the third point from current thoughting, which keeps the same behavior like normal "emit" mixing with "broadcastEvent".

Best,
Zhijiang
------------------------------------------------------------------
发件人:Piotr Nowojski <pi...@data-artisans.com>
发送时间:2018年10月17日(星期三) 19:25
收件人:Zhijiang(wangzhijiang999) <wa...@aliyun.com>
抄 送:Nico Kruber <ni...@data-artisans.com>; dev <de...@flink.apache.org>
主 题:Re: [DISCUSS] Improve broadcast serialization

Hi,

Regarding the second idea with skip index list, I would guess it might have bad performance impact in non broadcasting cases or would seriously complicate our Buffer implementation. Also it would make reading/data copying/slicing and other big chunk byte operations much more costly. Instead of memcpy whole buffer we would have to manually select the correct bits.

 > But I am just wondering if the switch is frequent between broadcasting and non-broadcasting operations

I haven't researched this topic any further then before. However my first guess would be that this switch doesn’t happen at all EXCEPT of `randomEmit` which is used for the latency markers (this statement requires further research/validation). Assuming that’s true.

1. Probably we can not/should not flush the broadcasted buffer, serialise randomEmit and flush it again, because this would prematurely emit latency marker - defeating it purpose and skewing the measured time. LatencyMarkers are expected to travel through pipeline at the exact same speed as regular records would.

2. Maybe we could just always broadcast the latency markers as well? This would be nice solution except of that at the level of RecordWriter we do not know whether this is latency marker or not - we no only that we were asked to “emit”, “randomEmit” or “broadcastEmit” and we have to handle them somehow (throwing exception?)

3. Assuming `randomEmit` or `emit` is rare, maybe we copy the broadcasted `BufferBuilder` into a new one, append there the record/latency marker so all except of one channel would share “broadcasted” BufferBuilder. Once we need to flush any of the buffers (either broadcasted or the “dirty” one) we flush them all and restart with all channels sharing a fresh new “broadcasted” BufferBuilder? 

4. For streaming isn't Broadcast currently realised via passing `BroadcastPartitioner` to the RecordWriter and using standard `emit` method? That would need to be changed/handled in order to optimise broadcast writes.

Piotrek

On 15 Oct 2018, at 12:02, Zhijiang(wangzhijiang999) <wa...@aliyun.com> wrote:
Let's come back to this discussion again.

Thanks for @Nico Kruber and @Piotr Nowojski  reviewing the PR of proposed 2.1 for serialization only once below and it is already merged into branch.

For proposed 2.2 for copy only once, we verified it is also very important for batch job related with join operation in our benchmark, so we want to focus on improving it based on 2.1.
Considering the initial conclusion before, we want to finish current broadcasting shared BufferBuilder when triggering non-broadcast operation which will request new separate BufferBuilder. But I am just wondering if the switch is frequent between broadcasting and non-broadcasting operations, the BufferBuilder may be filled with few data resulting in low resource utilization which may cause regression in special cases. And as long as one channel has not consumed the data by network transfer, this shared BufferBuilder can not be recycled.

Another raw idea is if we support one BufferBuilder shared by all the channels has different data regions for differnet partition indexes, that means one subpartition has non-continuous data distribution in the BufferBuilder. We can use skip index list to identify which regions belong to sepecific subpartiiton which can avoid finishing BufferBuilder during mode switch. But it seems more complicated with current process and i am not sure whether it has other performance concerns.

Wish any feedbacks for this issue, then i can further focus on it. 

Best,
Zhijiang

------------------------------------------------------------------
发件人:Zhijiang(wangzhijiang999) <wa...@aliyun.com.INVALID>
发送时间:2018年7月20日(星期五) 13:21
收件人:Piotr Nowojski <pi...@data-artisans.com>
抄 送:Nico Kruber <ni...@data-artisans.com>; dev <de...@flink.apache.org>
主 题:回复:[DISCUSS] Improve broadcast serialization

Ok, that is fine. :)

I will create JIRA today and submit the PR next week.

Zhijiang
------------------------------------------------------------------
发件人:Piotr Nowojski <pi...@data-artisans.com>
发送时间:2018年7月19日(星期四) 17:52
收件人:Zhijiang(wangzhijiang999) <wa...@aliyun.com>
抄 送:Nico Kruber <ni...@data-artisans.com>; dev <de...@flink.apache.org>
主 题:Re: [DISCUSS] Improve broadcast serialization

Hi,

I have only noticed your second response after sending my email :) 

Ok, now I think we are on the same page :) I think you can work on 2.1 and later on 2.2 if you will think that 2.1 is not enough. Once you create a Jira issues/PRs please CC me.

Piotrek  

On 19 Jul 2018, at 04:51, Zhijiang(wangzhijiang999) <wa...@aliyun.com> wrote:
Hi Piotr

1. I agree with we should discuss higher level first and focus on implementation on jira/pr. As long as RecordSerializer does not maintain the BufferBuilder, it can become stateless, then it can get BufferBuilderfrom the RecordWriter at any time.  And I think it is the precondition to improve serializing only once for multi channels, otherwise we have to select serializer based on target channel index.

2. I already corrected this thought in last reply, maybe you have not seen it before you reply. :)  
We can break the broadcast improvement into two steps:
2.1 Serialize the record into temporary byte buffer only once for multi selected channels. (currently serialize many times)
2.2 Copy the temporary byte buffer into BufferBuilder only once and create different BufferConsumers based on the same BufferBuilder for each channel. (currently copy many times)
Regarding 2.1, just the same as your proposal[c], it is worth to do currently and can get good benefits I think.
Regarding 2.2, considering mixed broadcast/non-broadcast writes, it has to flush/finish last broadcast BufferBuilder for current non-broadcast writes and vice versa. I agree with your proposal[2] for this issue, and we can further consider it in future, maybe there are other better ways for avoiding it.

4. My previous thought is to realize both above 2.1 and 2.2. The 2.1 is your proposal[c] which has no problem for mixed write mode, so no need additional flush. The 2.2 is just as your proposal[2] which concerns additional flush. Maybe my last reply make you misunderstand.

I can submit jira for above 2.1 first if no other concerns, thanks for the helpful advice. :)

Best,

Zhijiang
------------------------------------------------------------------
发件人:Piotr Nowojski <pi...@data-artisans.com>
发送时间:2018年7月18日(星期三) 20:04
收件人:Zhijiang(wangzhijiang999) <wa...@aliyun.com>; Nico Kruber <ni...@data-artisans.com>
抄 送:dev <de...@flink.apache.org>
主 题:Re: [DISCUSS] Improve broadcast serialization


Hi 

1. I want to define a new AbstractRecordWriter as base class which defines some abstract methods and utility codes. The current RecordWriter used for other partitioner and new BroadcastRecordWriter used only for BroadcastPartitioner will both extend AbstractRecordWriter. The fields in BroadcastPartitioner are extactly as you showed below, but for current RecordWriter it also only needs one RecordSerializer if we make the RecordSerializer has no internal state.

Lets first discuss what we would like to have/implement on higher level and later focus on implementation details. Regarding making RecordSerializer stateless, there were some discussions about it previously and it was on our TODO list but I don’t remember what was holding us back. Maybe Nico will remember?


2. You pointed the key problem that how to handle `randomEmit` in BroadcastRecordWriter, and I think this process may resue the `emit` logic in current RecordWriter. Then the `emit` and `broadcastEmit` logics in BroadcastRecordWriter will serialize data only once and copy to BufferBuilder only once. So this improvement is deterministic for BroadcastPartitioner.


What logic to reuse do you have in mind? 
4. As for 'broadcastEmit` improvement in RecordWriter for non-broadcast partitioner, we can also do as you suggested in option [2], but it has to finish/flush the previous BufferBuilder generated by common `emit` operation. So it may bring bad impacts on buffer utility which was improved well in event-driven flush feature. So I am not sure whether it is worth doing `broadcastEmit` improvement in RecordWriter.


The whole point of my proposal [c] was to avoid the need to flush. Code would need a little bit more refactoring but it should look something like this:

void broadcastEmit(record):
 serializedRecord = serializer.serialize(record)
 for bufferBuilder in bufferBuilders:
 bufferBuilder.append(serializedRecord)
 // if we overfilled bufferBuilder, finish it, request new one and continue writing

void emit(record, channel)
 serializedRecord = serializer.serialize(record)
 bufferBuilders[channel].append(serializedRecord)
 // if we overfilled bufferBuilder, finish it, request new one and continue writing

I do not see here a need for additional flushes and it should be strict improvement over current code base.


I already realized the demo covering above 1,2,5 before. I can create jiras after we reach a final agreement, then maybe you can help review PR if have time. :)


Sure :)

Piotrek

Best,

Zhijiang
------------------------------------------------------------------
发件人:Piotr Nowojski <pi...@data-artisans.com>
发送时间:2018年7月18日(星期三) 16:37
收件人:dev <de...@flink.apache.org>; Zhijiang(wangzhijiang999) <wa...@aliyun.com>
主 题:Re: [DISCUSS] Improve broadcast serialization

Hi,

Couple of more thoughts

a) I’m not sure if you would have to modify current RecordWriter at all. You could extract interface from current RecordWriter and just provide two implementations: current one and BroadcastRecordWriter. I’m not sure, but it doesn’t seem like they would duplicate/share lots of code. BroadcastRecordWriter would have fields:

private final RecordSerializer<T> serializers;

private final Optional<BufferBuilder> bufferBuilder;

Compared to RecordWriter’s arrays.

b) One thing that I noticed now are latency markers and randomEmit method. It prevents us from implementing option [1]. BroadcastRecordWriter would have to flush all channels on randomEmit (as I proposed in option [2]).

c) Another option to optimise broadcast writes (or for that matter all multi channel writes), would be to serialise record only once to SpanningRecordSerializer#serializationBuffer, but copy it multiple times to separate BufferBuilders. That would save us much more then half of the overhead (serialisation is more costly compared to data copying), while we would avoid problems with uneven state of channels. There would be no problems with mixed broadcast/non broadcast writes, this option could support both of them at the same time - in other words, it would be as generic as the current one.

d) Regarding StreamRecordWriter, other option is, that it could be refactored to a class implementing extracted RecordWriter interface and being a proxy/wrapper around another RecordWriter instance:

Class StreamRecordWriter implements RecordWriter {
  private final RecordWrtier recordWriter; //either broadcast or non broadcast 
  public void foo() {
    recordWriter.foo();
  }
}

To be honest I’m not sure at the moment which one would be better [2] or [c]. In ideal world, we might want to replace current RecordWriter with [c] and after that (if that’s not enough) to implement [2] on top of [c]. 

Piotrek

> On 18 Jul 2018, at 05:36, Zhijiang(wangzhijiang999) <wa...@aliyun.com.INVALID> wrote:
> 
> Hi Piotr,
> 
> Thanks for your replies and professional suggestions!
> 
> My initial thought is just as you said in first suggestion. The current RecordWriter will emit StreamRecord to some subpartition via ChannelSelector or broadcast events/watermark to all subpartitions directly.
> If the ChannelSelector implementation is BroadcastPartitioner, then we can create a specialized BroadcastRecordWriter to handle the 'emit', 'broadcastEmit', 'broadcastEvent', etc.
> To make it seems not tricky, I want to abstract the RecordWriter as a plugin, then implement a BroadcastRecordWriter and NonBroadcastRecordWriter separately to extend abstract RecordWriter. That means we divide the RecordWriter by ChannelSelector, and also we may remove current StreamRecordWriter to uniform the RecordWriter criteria in both stream and batch mode.
> 
> Considering specific implementations, I think one RecordSerializer can work for both BroadcastRecordWriter and NonBroadcastRecordWriter, but the precondition is making the RecordSerializer has no internal state, so we have to remove the BufferBuilder variable from SpanningRecordSerializer and pass it via addRecord/continueWritingWithNextBufferBuilder
> methods from RecordWriter. BroadcastRecordWriter only needs maintain one BufferBuilder for all subpartitions, and NonBroadcastRecordWriter may need maintain one BufferBuilder per subpartition.
> 
> Another issue is whether this improvement is suitable for broadcastEmit(watermark) in NonBroadcastRecordWriter as you said in suggestion 2,3. I wonder it may decrease the buffer utilization if switch between broadcast and non-broadcast modes, even it may seem more tricky in implementation. I am still thinking of it.
> 
> Maybe we can implement the improvement for BroadcastPartitioner in first step and make sure one RecordSerializer for all subpartitions. That can reduce the memory overhead in RecordSerializer and the time cost in broadcast serialization scenarios.
> 
> Best,
> 
> Zhijiang
> 
> 
> ------------------------------------------------------------------
> 发件人:Piotr Nowojski <pi...@data-artisans.com>
> 发送时间:2018年7月17日(星期二) 23:31
> 收件人:dev <de...@flink.apache.org>; Zhijiang(wangzhijiang999) <wa...@aliyun.com>
> 主 题:Re: [DISCUSS] Improve broadcast serialization
> 
> Hi
> 
> Generally speaking this would be a nice optimisation, however it might be tricky to implement. The thing to keep in mind is that currently interface allow to interleave broadcasting and normal sending, because of that at any given time some serialisers can have more data then others. For example when we have two output channels and we are looping following writes:
> 
> Write sth to 1. Channel
> Broadcast to all channels
> Write sth to 1. Channel
> Broadcast to all channels
> Write sth to 1. Channel
> Broadcast to all channels
> (…)
> 
> Thus buffers of different channels can fill out with different rates.
> 
>> In theory every record can be serialized only once and referenced for all the subpartitions in broadcast mode.
> 
> The problem here is that after records serialising, the only unit that can be referenced afterwards is “Buffer”. So that would leave us now with couple of options:
> 
> 1. Create a specialised BroadcastRecordWriter that supports ONLY broadcasting, guaranteeing that all channels always receive the same data. Here you could serialise records only once, to one BufferBuilder that could be shared and referenced by multiple BufferConsumers from different channels. Any non broadcast write would have to fail.
> 
> 2. Similar as above, but specialised in MOSTLY broadcasting. Operate as in 1. for broadcasts, but for any non broadcast write: finish current broadcasting BufferBuilder, flush all data on all channels, serialise single record to single channel using newly create BufferBuilder and also immediately finish/flush it, so that any subsequent broadcasts will work again as in 1.:
> 
> 3. Similar as 2, but lazily switch between broadcasting and non-broadcasting modes. It would have two modes of operating that could be switched back and forth: the same as currently implemented for non-broadcasted and optimised broadcast mode
> 
> Broadcast to all channels
> Broadcast to all channels
> Broadcast to all channels
> Broadcast to all channels
> Write sth to X Channel // this flushes all channels and clears/finishes previous BufferBuilder 
> Write sth to Y Channel
> Write sth to Y Channel
> Write sth to Y Channel
> Write sth to X Channel 
> Broadcast to all channels // this flushes all channels and clears/finishes previous BufferBuilders, 
> Broadcast to all channels
> Broadcast to all channels
> (…)
> 
> However both in 2. and 3. there would be very big penalty for mixing broadcast with normal writes.  
> 
> Piotrek
> 
>> On 13 Jul 2018, at 09:44, Zhijiang(wangzhijiang999) <wa...@aliyun.com.INVALID> wrote:
>> 
>> Hi all,
>> 
>> In current implementation, the RecordSerializer is created separately for each subpartition in RecordWriter, that means the number of serializers equals to the number of subpartitions.
>> For broadcast partitioner, every record will be serialized many times in all the subpartitions, and this may bring bad performance to some extent.
>> In theory every record can be serialized only once and referenced for all the subpartitions in broadcast mode.
>> 
>> To do so, I propose the following changes:
>> 1. Create and maintain only one serializer in RecordWriter, and it will serialize the record for all the subpartitions. It makes sense for any partitioners, and the memory overhead can be also decreased, because every serializer will maintain some separate byte buffers internally.
>> 2. Maybe we can abstract the RecordWriter as a base class used for other partitioner mode and implement a BroadcastRecordWriter for BroadcastPartitioner. And this new implementation will add buffer references based on the number of subpartitions before adding into subpartition queue.
>> 3. Maybe we can remove StreamRecordWriter by migrating flusher from it to RecordWriter, then the uniform RecordWriter can be used for both stream and batch. The above BroadcastRecordWriter can aslo uniform for both stream and batch.
>> 
>> I am not sure whether this improvement is proposed before and what do you think of it?
>> If necessary I can create JIRAs to contirbute it, and may need one commiter cooperate with me.
>> 
>> Best,
>> 
>> Zhijiang
> 











Re: [DISCUSS] Improve broadcast serialization

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hey,

I also think that 3rd option is the most promising, however logic of “dirty” channels might be causing some overheads. I was also thinking about other option:

5. In case of ‘emit’ called on BroadcastRecordWriter, we could write it to common/shared BufferBuilder, but somehow marked it as targeted to only one channel - we would send it over the network to all of the receivers, but all except of one would ignore it. This might be easier to implement in BroadcastRecordWriter, but would require extra logic on the receiver side. With respect to the performance it also might be better compared to 3.

Couple of more thoughts:

a) if we select BroadcastRecordWriter, literally the only way how it can be polluted by non broadcast writes are latency markers via `randomEmit`. When choosing 3 vs 5, mixing broadcast and non broadcast happens very rarely, so we shouldn’t optimise for it, but pick something that’s easiest to implement.
b) there are no use cases where `ChannelSelector` returns anything else besides single channel or broadcast.

b) point brings me to one more thing. I was once playing with simplifying `ChannelSelector` interface by adding new one `SingleChannelSelector` with method:

`int selectChannel(T record, int numChannels);`

And it was resulting with ~10% performance speed up for network stack alone (overhead of creating singleton arrays and iterating over them). I didn’t follow up on this, because performance gain wasn’t super huge, while it complicated `RecordWriter`, since it had to handle both either `SingleChannelSelector` or `ChannelSelector`. Now that I realised that there are no use cases for selecting more then one, but not all of the channels and that anyway we go with broadcasting, we will have to special handle `BroadcastPartitioner`, that’s the perfect occasion to actually simplify the implementation and drop this multi channel ChannelSelector.

I think we should to this as a first step in a preparation before either 3. or 5. (changing ChannelSelector signature to:

int selectChannel(T record, int numChannels);

)

What do you think?

Piotrek

> On 18 Oct 2018, at 06:12, Zhijiang(wangzhijiang999) <wa...@aliyun.com> wrote:
> 
> Hi Piotr,
> 
> Thanks for your replies and suggestions!
> 
> For my rough idea of skip index list, I agree with your concerns of performance for non-broadcast case and complicated implementation. Although I think this idea seems more unified in semantics for "emit", "broadcastEmit" and "randomEmit" APIs, maybe it is not worth going deep into it currently for global changes.
> 
> Currently RecordWriter provides three main methods to write elements in different semantics:
> 
> "broadcastEmit" would write the element to all the channels, used for watermark currently.
> "randomEmit" would write the element to one random channel, used for latency marker currently.
> "emit" would write the element to some channels via ChannelSelector, used for normal records currectly. And the selected channels may be one, some or all.
> 
> If we want to retain these APIs for different requirements, then the RecordWriter should not be aware of which kind of elements would be written via APIs, so we should not make any assumings in the implementation. In details, I know the "randomEmit" in only used for latency marker currently, but we can not confirm whether this API would be used for other elements in future, so we can not estimate how frequency is used for this API for different possiable elements which is my above concerns. I do not want to limit any future possibilities for these APIs caused by this improvement.
> 
> Considering the below suggestions:
> 
> 1.  Inserting the elements via "randomEmit" in front of unfinished broadcast buffer will change the current sequence semantic. It may be not matter for latency marker currently, but may not be extented for future other elements.
> 
> 2. If we easily implement "randomEmit" as the way of broadcast, I am wondering the broadcast storm in special cases and we also change the semantics to send the unnecessary elements for some channels.
> 
> 3.  I prefer this way currently and it is similar with our previous discussion. And the implementation is more likely the way of current "broadcastEvent", which creates a new broadcast buffer for event, and finish the current buffer for all the channels before enqueuing this event buffer.
> 
> 4. Yes, your sayings is write for current mode. And I want to pass a boolean parameter "isBroadcast" in the constructor of RecordWriter for indicating broadcast writes in specific processes, because the RecordWriter can not check ChannelSelector instance based on module dependency.
> 
> In conclusion, I want to implement this improvement based on the third point from current thoughting, which keeps the same behavior like normal "emit" mixing with "broadcastEvent".
> 
> Best,
> Zhijiang
> ------------------------------------------------------------------
> 发件人:Piotr Nowojski <pi...@data-artisans.com>
> 发送时间:2018年10月17日(星期三) 19:25
> 收件人:Zhijiang(wangzhijiang999) <wa...@aliyun.com>
> 抄 送:Nico Kruber <ni...@data-artisans.com>; dev <de...@flink.apache.org>
> 主 题:Re: [DISCUSS] Improve broadcast serialization
> 
> Hi,
> 
> Regarding the second idea with skip index list, I would guess it might have bad performance impact in non broadcasting cases or would seriously complicate our Buffer implementation. Also it would make reading/data copying/slicing and other big chunk byte operations much more costly. Instead of memcpy whole buffer we would have to manually select the correct bits.
> 
>  > But I am just wondering if the switch is frequent between broadcasting and non-broadcasting operations
> 
> I haven't researched this topic any further then before. However my first guess would be that this switch doesn’t happen at all EXCEPT of `randomEmit` which is used for the latency markers (this statement requires further research/validation). Assuming that’s true.
> 
> 1. Probably we can not/should not flush the broadcasted buffer, serialise randomEmit and flush it again, because this would prematurely emit latency marker - defeating it purpose and skewing the measured time. LatencyMarkers are expected to travel through pipeline at the exact same speed as regular records would.
> 
> 2. Maybe we could just always broadcast the latency markers as well? This would be nice solution except of that at the level of RecordWriter we do not know whether this is latency marker or not - we no only that we were asked to “emit”, “randomEmit” or “broadcastEmit” and we have to handle them somehow (throwing exception?)
> 
> 3. Assuming `randomEmit` or `emit` is rare, maybe we copy the broadcasted `BufferBuilder` into a new one, append there the record/latency marker so all except of one channel would share “broadcasted” BufferBuilder. Once we need to flush any of the buffers (either broadcasted or the “dirty” one) we flush them all and restart with all channels sharing a fresh new “broadcasted” BufferBuilder? 
> 
> 4. For streaming isn't Broadcast currently realised via passing `BroadcastPartitioner` to the RecordWriter and using standard `emit` method? That would need to be changed/handled in order to optimise broadcast writes.
> 
> Piotrek
> 
> On 15 Oct 2018, at 12:02, Zhijiang(wangzhijiang999) <wangzhijiang999@aliyun.com <ma...@aliyun.com>> wrote:
> 
> Let's come back to this discussion again.
> 
> Thanks for @Nico Kruber and @Piotr Nowojski  reviewing the PR of proposed 2.1 for serialization only once below and it is already merged into branch.
> 
> For proposed 2.2 for copy only once, we verified it is also very important for batch job related with join operation in our benchmark, so we want to focus on improving it based on 2.1.
> Considering the initial conclusion before, we want to finish current broadcasting shared BufferBuilder when triggering non-broadcast operation which will request new separate BufferBuilder. But I am just wondering if the switch is frequent between broadcasting and non-broadcasting operations, the BufferBuilder may be filled with few data resulting in low resource utilization which may cause regression in special cases. And as long as one channel has not consumed the data by network transfer, this shared BufferBuilder can not be recycled.
> 
> Another raw idea is if we support one BufferBuilder shared by all the channels has different data regions for differnet partition indexes, that means one subpartition has non-continuous data distribution in the BufferBuilder. We can use skip index list to identify which regions belong to sepecific subpartiiton which can avoid finishing BufferBuilder during mode switch. But it seems more complicated with current process and i am not sure whether it has other performance concerns.
> 
> Wish any feedbacks for this issue, then i can further focus on it. 
> 
> Best,
> Zhijiang
> 
> ------------------------------------------------------------------
> 发件人:Zhijiang(wangzhijiang999) <wangzhijiang999@aliyun.com.INVALID <ma...@aliyun.com.INVALID>>
> 发送时间:2018年7月20日(星期五) 13:21
> 收件人:Piotr Nowojski <piotr@data-artisans.com <ma...@data-artisans.com>>
> 抄 送:Nico Kruber <nico@data-artisans.com <ma...@data-artisans.com>>; dev <dev@flink.apache.org <ma...@flink.apache.org>>
> 主 题:回复:[DISCUSS] Improve broadcast serialization
> 
> Ok, that is fine. :)
> 
> I will create JIRA today and submit the PR next week.
> 
> Zhijiang
> ------------------------------------------------------------------
> 发件人:Piotr Nowojski <piotr@data-artisans.com <ma...@data-artisans.com>>
> 发送时间:2018年7月19日(星期四) 17:52
> 收件人:Zhijiang(wangzhijiang999) <wangzhijiang999@aliyun.com <ma...@aliyun.com>>
> 抄 送:Nico Kruber <nico@data-artisans.com <ma...@data-artisans.com>>; dev <dev@flink.apache.org <ma...@flink.apache.org>>
> 主 题:Re: [DISCUSS] Improve broadcast serialization
> 
> Hi,
> 
> I have only noticed your second response after sending my email :) 
> 
> Ok, now I think we are on the same page :) I think you can work on 2.1 and later on 2.2 if you will think that 2.1 is not enough. Once you create a Jira issues/PRs please CC me.
> 
> Piotrek  
> 
> On 19 Jul 2018, at 04:51, Zhijiang(wangzhijiang999) <wangzhijiang999@aliyun.com <ma...@aliyun.com>> wrote:
> Hi Piotr
> 
> 1. I agree with we should discuss higher level first and focus on implementation on jira/pr. As long as RecordSerializer does not maintain the BufferBuilder, it can become stateless, then it can get BufferBuilderfrom the RecordWriter at any time.  And I think it is the precondition to improve serializing only once for multi channels, otherwise we have to select serializer based on target channel index.
> 
> 2. I already corrected this thought in last reply, maybe you have not seen it before you reply. :)  
> We can break the broadcast improvement into two steps:
> 2.1 Serialize the record into temporary byte buffer only once for multi selected channels. (currently serialize many times)
> 2.2 Copy the temporary byte buffer into BufferBuilder only once and create different BufferConsumers based on the same BufferBuilder for each channel. (currently copy many times)
> Regarding 2.1, just the same as your proposal[c], it is worth to do currently and can get good benefits I think.
> Regarding 2.2, considering mixed broadcast/non-broadcast writes, it has to flush/finish last broadcast BufferBuilder for current non-broadcast writes and vice versa. I agree with your proposal[2] for this issue, and we can further consider it in future, maybe there are other better ways for avoiding it.
> 
> 4. My previous thought is to realize both above 2.1 and 2.2. The 2.1 is your proposal[c] which has no problem for mixed write mode, so no need additional flush. The 2.2 is just as your proposal[2] which concerns additional flush. Maybe my last reply make you misunderstand.
> 
> I can submit jira for above 2.1 first if no other concerns, thanks for the helpful advice. :)
> 
> Best,
> 
> Zhijiang
> ------------------------------------------------------------------
> 发件人:Piotr Nowojski <piotr@data-artisans.com <ma...@data-artisans.com>>
> 发送时间:2018年7月18日(星期三) 20:04
> 收件人:Zhijiang(wangzhijiang999) <wangzhijiang999@aliyun.com <ma...@aliyun.com>>; Nico Kruber <nico@data-artisans.com <ma...@data-artisans.com>>
> 抄 送:dev <dev@flink.apache.org <ma...@flink.apache.org>>
> 主 题:Re: [DISCUSS] Improve broadcast serialization
> 
> 
> Hi 
> 
> 1. I want to define a new AbstractRecordWriter as base class which defines some abstract methods and utility codes. The current RecordWriter used for other partitioner and new BroadcastRecordWriter used only for BroadcastPartitioner will both extend AbstractRecordWriter. The fields in BroadcastPartitioner are extactly as you showed below, but for current RecordWriter it also only needs one RecordSerializer if we make the RecordSerializer has no internal state.
> 
> Lets first discuss what we would like to have/implement on higher level and later focus on implementation details. Regarding making RecordSerializer stateless, there were some discussions about it previously and it was on our TODO list but I don’t remember what was holding us back. Maybe Nico will remember?
> 
> 
> 2. You pointed the key problem that how to handle `randomEmit` in BroadcastRecordWriter, and I think this process may resue the `emit` logic in current RecordWriter. Then the `emit` and `broadcastEmit` logics in BroadcastRecordWriter will serialize data only once and copy to BufferBuilder only once. So this improvement is deterministic for BroadcastPartitioner.
> 
> 
> What logic to reuse do you have in mind? 
> 4. As for 'broadcastEmit` improvement in RecordWriter for non-broadcast partitioner, we can also do as you suggested in option [2], but it has to finish/flush the previous BufferBuilder generated by common `emit` operation. So it may bring bad impacts on buffer utility which was improved well in event-driven flush feature. So I am not sure whether it is worth doing `broadcastEmit` improvement in RecordWriter.
> 
> 
> The whole point of my proposal [c] was to avoid the need to flush. Code would need a little bit more refactoring but it should look something like this:
> 
> void broadcastEmit(record):
>  serializedRecord = serializer.serialize(record)
>  for bufferBuilder in bufferBuilders:
>  bufferBuilder.append(serializedRecord)
>  // if we overfilled bufferBuilder, finish it, request new one and continue writing
> 
> void emit(record, channel)
>  serializedRecord = serializer.serialize(record)
>  bufferBuilders[channel].append(serializedRecord)
>  // if we overfilled bufferBuilder, finish it, request new one and continue writing
> 
> I do not see here a need for additional flushes and it should be strict improvement over current code base.
> 
> 
> I already realized the demo covering above 1,2,5 before. I can create jiras after we reach a final agreement, then maybe you can help review PR if have time. :)
> 
> 
> Sure :)
> 
> Piotrek
> 
> Best,
> 
> Zhijiang
> ------------------------------------------------------------------
> 发件人:Piotr Nowojski <piotr@data-artisans.com <ma...@data-artisans.com>>
> 发送时间:2018年7月18日(星期三) 16:37
> 收件人:dev <dev@flink.apache.org <ma...@flink.apache.org>>; Zhijiang(wangzhijiang999) <wangzhijiang999@aliyun.com <ma...@aliyun.com>>
> 主 题:Re: [DISCUSS] Improve broadcast serialization
> 
> Hi,
> 
> Couple of more thoughts
> 
> a) I’m not sure if you would have to modify current RecordWriter at all. You could extract interface from current RecordWriter and just provide two implementations: current one and BroadcastRecordWriter. I’m not sure, but it doesn’t seem like they would duplicate/share lots of code. BroadcastRecordWriter would have fields:
> 
> private final RecordSerializer<T> serializers;
> 
> private final Optional<BufferBuilder> bufferBuilder;
> 
> Compared to RecordWriter’s arrays.
> 
> b) One thing that I noticed now are latency markers and randomEmit method. It prevents us from implementing option [1]. BroadcastRecordWriter would have to flush all channels on randomEmit (as I proposed in option [2]).
> 
> c) Another option to optimise broadcast writes (or for that matter all multi channel writes), would be to serialise record only once to SpanningRecordSerializer#serializationBuffer, but copy it multiple times to separate BufferBuilders. That would save us much more then half of the overhead (serialisation is more costly compared to data copying), while we would avoid problems with uneven state of channels. There would be no problems with mixed broadcast/non broadcast writes, this option could support both of them at the same time - in other words, it would be as generic as the current one.
> 
> d) Regarding StreamRecordWriter, other option is, that it could be refactored to a class implementing extracted RecordWriter interface and being a proxy/wrapper around another RecordWriter instance:
> 
> Class StreamRecordWriter implements RecordWriter {
>   private final RecordWrtier recordWriter; //either broadcast or non broadcast 
>   public void foo() {
>     recordWriter.foo();
>   }
> }
> 
> To be honest I’m not sure at the moment which one would be better [2] or [c]. In ideal world, we might want to replace current RecordWriter with [c] and after that (if that’s not enough) to implement [2] on top of [c]. 
> 
> Piotrek
> 
> > On 18 Jul 2018, at 05:36, Zhijiang(wangzhijiang999) <wangzhijiang999@aliyun.com.INVALID <ma...@aliyun.com.INVALID>> wrote:
> > 
> > Hi Piotr,
> > 
> > Thanks for your replies and professional suggestions!
> > 
> > My initial thought is just as you said in first suggestion. The current RecordWriter will emit StreamRecord to some subpartition via ChannelSelector or broadcast events/watermark to all subpartitions directly.
> > If the ChannelSelector implementation is BroadcastPartitioner, then we can create a specialized BroadcastRecordWriter to handle the 'emit', 'broadcastEmit', 'broadcastEvent', etc.
> > To make it seems not tricky, I want to abstract the RecordWriter as a plugin, then implement a BroadcastRecordWriter and NonBroadcastRecordWriter separately to extend abstract RecordWriter. That means we divide the RecordWriter by ChannelSelector, and also we may remove current StreamRecordWriter to uniform the RecordWriter criteria in both stream and batch mode.
> > 
> > Considering specific implementations, I think one RecordSerializer can work for both BroadcastRecordWriter and NonBroadcastRecordWriter, but the precondition is making the RecordSerializer has no internal state, so we have to remove the BufferBuilder variable from SpanningRecordSerializer and pass it via addRecord/continueWritingWithNextBufferBuilder
> > methods from RecordWriter. BroadcastRecordWriter only needs maintain one BufferBuilder for all subpartitions, and NonBroadcastRecordWriter may need maintain one BufferBuilder per subpartition.
> > 
> > Another issue is whether this improvement is suitable for broadcastEmit(watermark) in NonBroadcastRecordWriter as you said in suggestion 2,3. I wonder it may decrease the buffer utilization if switch between broadcast and non-broadcast modes, even it may seem more tricky in implementation. I am still thinking of it.
> > 
> > Maybe we can implement the improvement for BroadcastPartitioner in first step and make sure one RecordSerializer for all subpartitions. That can reduce the memory overhead in RecordSerializer and the time cost in broadcast serialization scenarios.
> > 
> > Best,
> > 
> > Zhijiang
> > 
> > 
> > ------------------------------------------------------------------
> > 发件人:Piotr Nowojski <piotr@data-artisans.com <ma...@data-artisans.com>>
> > 发送时间:2018年7月17日(星期二) 23:31
> > 收件人:dev <dev@flink.apache.org <ma...@flink.apache.org>>; Zhijiang(wangzhijiang999) <wangzhijiang999@aliyun.com <ma...@aliyun.com>>
> > 主 题:Re: [DISCUSS] Improve broadcast serialization
> > 
> > Hi
> > 
> > Generally speaking this would be a nice optimisation, however it might be tricky to implement. The thing to keep in mind is that currently interface allow to interleave broadcasting and normal sending, because of that at any given time some serialisers can have more data then others. For example when we have two output channels and we are looping following writes:
> > 
> > Write sth to 1. Channel
> > Broadcast to all channels
> > Write sth to 1. Channel
> > Broadcast to all channels
> > Write sth to 1. Channel
> > Broadcast to all channels
> > (…)
> > 
> > Thus buffers of different channels can fill out with different rates.
> > 
> >> In theory every record can be serialized only once and referenced for all the subpartitions in broadcast mode.
> > 
> > The problem here is that after records serialising, the only unit that can be referenced afterwards is “Buffer”. So that would leave us now with couple of options:
> > 
> > 1. Create a specialised BroadcastRecordWriter that supports ONLY broadcasting, guaranteeing that all channels always receive the same data. Here you could serialise records only once, to one BufferBuilder that could be shared and referenced by multiple BufferConsumers from different channels. Any non broadcast write would have to fail.
> > 
> > 2. Similar as above, but specialised in MOSTLY broadcasting. Operate as in 1. for broadcasts, but for any non broadcast write: finish current broadcasting BufferBuilder, flush all data on all channels, serialise single record to single channel using newly create BufferBuilder and also immediately finish/flush it, so that any subsequent broadcasts will work again as in 1.:
> > 
> > 3. Similar as 2, but lazily switch between broadcasting and non-broadcasting modes. It would have two modes of operating that could be switched back and forth: the same as currently implemented for non-broadcasted and optimised broadcast mode
> > 
> > Broadcast to all channels
> > Broadcast to all channels
> > Broadcast to all channels
> > Broadcast to all channels
> > Write sth to X Channel // this flushes all channels and clears/finishes previous BufferBuilder 
> > Write sth to Y Channel
> > Write sth to Y Channel
> > Write sth to Y Channel
> > Write sth to X Channel 
> > Broadcast to all channels // this flushes all channels and clears/finishes previous BufferBuilders, 
> > Broadcast to all channels
> > Broadcast to all channels
> > (…)
> > 
> > However both in 2. and 3. there would be very big penalty for mixing broadcast with normal writes.  
> > 
> > Piotrek
> > 
> >> On 13 Jul 2018, at 09:44, Zhijiang(wangzhijiang999) <wangzhijiang999@aliyun.com.INVALID <ma...@aliyun.com.INVALID>> wrote:
> >> 
> >> Hi all,
> >> 
> >> In current implementation, the RecordSerializer is created separately for each subpartition in RecordWriter, that means the number of serializers equals to the number of subpartitions.
> >> For broadcast partitioner, every record will be serialized many times in all the subpartitions, and this may bring bad performance to some extent.
> >> In theory every record can be serialized only once and referenced for all the subpartitions in broadcast mode.
> >> 
> >> To do so, I propose the following changes:
> >> 1. Create and maintain only one serializer in RecordWriter, and it will serialize the record for all the subpartitions. It makes sense for any partitioners, and the memory overhead can be also decreased, because every serializer will maintain some separate byte buffers internally.
> >> 2. Maybe we can abstract the RecordWriter as a base class used for other partitioner mode and implement a BroadcastRecordWriter for BroadcastPartitioner. And this new implementation will add buffer references based on the number of subpartitions before adding into subpartition queue.
> >> 3. Maybe we can remove StreamRecordWriter by migrating flusher from it to RecordWriter, then the uniform RecordWriter can be used for both stream and batch. The above BroadcastRecordWriter can aslo uniform for both stream and batch.
> >> 
> >> I am not sure whether this improvement is proposed before and what do you think of it?
> >> If necessary I can create JIRAs to contirbute it, and may need one commiter cooperate with me.
> >> 
> >> Best,
> >> 
> >> Zhijiang
> > 
> 
> 
> 
> 
> 
> 
> 
> 


回复:[DISCUSS] Improve broadcast serialization

Posted by "Zhijiang(wangzhijiang999)" <wa...@aliyun.com.INVALID>.
Hi Piotr,

Thanks for your replies and suggestions!

For my rough idea of skip index list, I agree with your concerns of performance for non-broadcast case and complicated implementation. Although I think this idea seems more unified in semantics for "emit", "broadcastEmit" and "randomEmit" APIs, maybe it is not worth going deep into it currently for global changes.

Currently RecordWriter provides three main methods to write elements in different semantics:

"broadcastEmit" would write the element to all the channels, used for watermark currently.
"randomEmit" would write the element to one random channel, used for latency marker currently.
"emit" would write the element to some channels via ChannelSelector, used for normal records currectly. And the selected channels may be one, some or all.

If we want to retain these APIs for different requirements, then the RecordWriter should not be aware of which kind of elements would be written via APIs, so we should not make any assumings in the implementation. In details, I know the "randomEmit" in only used for latency marker currently, but we can not confirm whether this API would be used for other elements in future, so we can not estimate how frequency is used for this API for different possiable elements which is my above concerns. I do not want to limit any future possibilities for these APIs caused by this improvement.

Considering the below suggestions:

1.  Inserting the elements via "randomEmit" in front of unfinished broadcast buffer will change the current sequence semantic. It may be not matter for latency marker currently, but may not be extented for future other elements.

2. If we easily implement "randomEmit" as the way of broadcast, I am wondering the broadcast storm in special cases and we also change the semantics to send the unnecessary elements for some channels.

3.  I prefer this way currently and it is similar with our previous discussion. And the implementation is more likely the way of current "broadcastEvent", which creates a new broadcast buffer for event, and finish the current buffer for all the channels before enqueuing this event buffer.

4. Yes, your sayings is write for current mode. And I want to pass a boolean parameter "isBroadcast" in the constructor of RecordWriter for indicating broadcast writes in specific processes, because the RecordWriter can not check ChannelSelector instance based on module dependency.

In conclusion, I want to implement this improvement based on the third point from current thoughting, which keeps the same behavior like normal "emit" mixing with "broadcastEvent".

Best,
Zhijiang
------------------------------------------------------------------
发件人:Piotr Nowojski <pi...@data-artisans.com>
发送时间:2018年10月17日(星期三) 19:25
收件人:Zhijiang(wangzhijiang999) <wa...@aliyun.com>
抄 送:Nico Kruber <ni...@data-artisans.com>; dev <de...@flink.apache.org>
主 题:Re: [DISCUSS] Improve broadcast serialization

Hi,

Regarding the second idea with skip index list, I would guess it might have bad performance impact in non broadcasting cases or would seriously complicate our Buffer implementation. Also it would make reading/data copying/slicing and other big chunk byte operations much more costly. Instead of memcpy whole buffer we would have to manually select the correct bits.

 > But I am just wondering if the switch is frequent between broadcasting and non-broadcasting operations

I haven't researched this topic any further then before. However my first guess would be that this switch doesn’t happen at all EXCEPT of `randomEmit` which is used for the latency markers (this statement requires further research/validation). Assuming that’s true.

1. Probably we can not/should not flush the broadcasted buffer, serialise randomEmit and flush it again, because this would prematurely emit latency marker - defeating it purpose and skewing the measured time. LatencyMarkers are expected to travel through pipeline at the exact same speed as regular records would.

2. Maybe we could just always broadcast the latency markers as well? This would be nice solution except of that at the level of RecordWriter we do not know whether this is latency marker or not - we no only that we were asked to “emit”, “randomEmit” or “broadcastEmit” and we have to handle them somehow (throwing exception?)

3. Assuming `randomEmit` or `emit` is rare, maybe we copy the broadcasted `BufferBuilder` into a new one, append there the record/latency marker so all except of one channel would share “broadcasted” BufferBuilder. Once we need to flush any of the buffers (either broadcasted or the “dirty” one) we flush them all and restart with all channels sharing a fresh new “broadcasted” BufferBuilder? 

4. For streaming isn't Broadcast currently realised via passing `BroadcastPartitioner` to the RecordWriter and using standard `emit` method? That would need to be changed/handled in order to optimise broadcast writes.

Piotrek

On 15 Oct 2018, at 12:02, Zhijiang(wangzhijiang999) <wa...@aliyun.com> wrote:
Let's come back to this discussion again.

Thanks for @Nico Kruber and @Piotr Nowojski  reviewing the PR of proposed 2.1 for serialization only once below and it is already merged into branch.

For proposed 2.2 for copy only once, we verified it is also very important for batch job related with join operation in our benchmark, so we want to focus on improving it based on 2.1.
Considering the initial conclusion before, we want to finish current broadcasting shared BufferBuilder when triggering non-broadcast operation which will request new separate BufferBuilder. But I am just wondering if the switch is frequent between broadcasting and non-broadcasting operations, the BufferBuilder may be filled with few data resulting in low resource utilization which may cause regression in special cases. And as long as one channel has not consumed the data by network transfer, this shared BufferBuilder can not be recycled.

Another raw idea is if we support one BufferBuilder shared by all the channels has different data regions for differnet partition indexes, that means one subpartition has non-continuous data distribution in the BufferBuilder. We can use skip index list to identify which regions belong to sepecific subpartiiton which can avoid finishing BufferBuilder during mode switch. But it seems more complicated with current process and i am not sure whether it has other performance concerns.

Wish any feedbacks for this issue, then i can further focus on it. 

Best,
Zhijiang

------------------------------------------------------------------
发件人:Zhijiang(wangzhijiang999) <wa...@aliyun.com.INVALID>
发送时间:2018年7月20日(星期五) 13:21
收件人:Piotr Nowojski <pi...@data-artisans.com>
抄 送:Nico Kruber <ni...@data-artisans.com>; dev <de...@flink.apache.org>
主 题:回复:[DISCUSS] Improve broadcast serialization

Ok, that is fine. :)

I will create JIRA today and submit the PR next week.

Zhijiang
------------------------------------------------------------------
发件人:Piotr Nowojski <pi...@data-artisans.com>
发送时间:2018年7月19日(星期四) 17:52
收件人:Zhijiang(wangzhijiang999) <wa...@aliyun.com>
抄 送:Nico Kruber <ni...@data-artisans.com>; dev <de...@flink.apache.org>
主 题:Re: [DISCUSS] Improve broadcast serialization

Hi,

I have only noticed your second response after sending my email :) 

Ok, now I think we are on the same page :) I think you can work on 2.1 and later on 2.2 if you will think that 2.1 is not enough. Once you create a Jira issues/PRs please CC me.

Piotrek  

On 19 Jul 2018, at 04:51, Zhijiang(wangzhijiang999) <wa...@aliyun.com> wrote:
Hi Piotr

1. I agree with we should discuss higher level first and focus on implementation on jira/pr. As long as RecordSerializer does not maintain the BufferBuilder, it can become stateless, then it can get BufferBuilderfrom the RecordWriter at any time.  And I think it is the precondition to improve serializing only once for multi channels, otherwise we have to select serializer based on target channel index.

2. I already corrected this thought in last reply, maybe you have not seen it before you reply. :)  
We can break the broadcast improvement into two steps:
2.1 Serialize the record into temporary byte buffer only once for multi selected channels. (currently serialize many times)
2.2 Copy the temporary byte buffer into BufferBuilder only once and create different BufferConsumers based on the same BufferBuilder for each channel. (currently copy many times)
Regarding 2.1, just the same as your proposal[c], it is worth to do currently and can get good benefits I think.
Regarding 2.2, considering mixed broadcast/non-broadcast writes, it has to flush/finish last broadcast BufferBuilder for current non-broadcast writes and vice versa. I agree with your proposal[2] for this issue, and we can further consider it in future, maybe there are other better ways for avoiding it.

4. My previous thought is to realize both above 2.1 and 2.2. The 2.1 is your proposal[c] which has no problem for mixed write mode, so no need additional flush. The 2.2 is just as your proposal[2] which concerns additional flush. Maybe my last reply make you misunderstand.

I can submit jira for above 2.1 first if no other concerns, thanks for the helpful advice. :)

Best,

Zhijiang
------------------------------------------------------------------
发件人:Piotr Nowojski <pi...@data-artisans.com>
发送时间:2018年7月18日(星期三) 20:04
收件人:Zhijiang(wangzhijiang999) <wa...@aliyun.com>; Nico Kruber <ni...@data-artisans.com>
抄 送:dev <de...@flink.apache.org>
主 题:Re: [DISCUSS] Improve broadcast serialization


Hi 

1. I want to define a new AbstractRecordWriter as base class which defines some abstract methods and utility codes. The current RecordWriter used for other partitioner and new BroadcastRecordWriter used only for BroadcastPartitioner will both extend AbstractRecordWriter. The fields in BroadcastPartitioner are extactly as you showed below, but for current RecordWriter it also only needs one RecordSerializer if we make the RecordSerializer has no internal state.

Lets first discuss what we would like to have/implement on higher level and later focus on implementation details. Regarding making RecordSerializer stateless, there were some discussions about it previously and it was on our TODO list but I don’t remember what was holding us back. Maybe Nico will remember?


2. You pointed the key problem that how to handle `randomEmit` in BroadcastRecordWriter, and I think this process may resue the `emit` logic in current RecordWriter. Then the `emit` and `broadcastEmit` logics in BroadcastRecordWriter will serialize data only once and copy to BufferBuilder only once. So this improvement is deterministic for BroadcastPartitioner.


What logic to reuse do you have in mind? 
4. As for 'broadcastEmit` improvement in RecordWriter for non-broadcast partitioner, we can also do as you suggested in option [2], but it has to finish/flush the previous BufferBuilder generated by common `emit` operation. So it may bring bad impacts on buffer utility which was improved well in event-driven flush feature. So I am not sure whether it is worth doing `broadcastEmit` improvement in RecordWriter.


The whole point of my proposal [c] was to avoid the need to flush. Code would need a little bit more refactoring but it should look something like this:

void broadcastEmit(record):
 serializedRecord = serializer.serialize(record)
 for bufferBuilder in bufferBuilders:
 bufferBuilder.append(serializedRecord)
 // if we overfilled bufferBuilder, finish it, request new one and continue writing

void emit(record, channel)
 serializedRecord = serializer.serialize(record)
 bufferBuilders[channel].append(serializedRecord)
 // if we overfilled bufferBuilder, finish it, request new one and continue writing

I do not see here a need for additional flushes and it should be strict improvement over current code base.


I already realized the demo covering above 1,2,5 before. I can create jiras after we reach a final agreement, then maybe you can help review PR if have time. :)


Sure :)

Piotrek

Best,

Zhijiang
------------------------------------------------------------------
发件人:Piotr Nowojski <pi...@data-artisans.com>
发送时间:2018年7月18日(星期三) 16:37
收件人:dev <de...@flink.apache.org>; Zhijiang(wangzhijiang999) <wa...@aliyun.com>
主 题:Re: [DISCUSS] Improve broadcast serialization

Hi,

Couple of more thoughts

a) I’m not sure if you would have to modify current RecordWriter at all. You could extract interface from current RecordWriter and just provide two implementations: current one and BroadcastRecordWriter. I’m not sure, but it doesn’t seem like they would duplicate/share lots of code. BroadcastRecordWriter would have fields:

private final RecordSerializer<T> serializers;

private final Optional<BufferBuilder> bufferBuilder;

Compared to RecordWriter’s arrays.

b) One thing that I noticed now are latency markers and randomEmit method. It prevents us from implementing option [1]. BroadcastRecordWriter would have to flush all channels on randomEmit (as I proposed in option [2]).

c) Another option to optimise broadcast writes (or for that matter all multi channel writes), would be to serialise record only once to SpanningRecordSerializer#serializationBuffer, but copy it multiple times to separate BufferBuilders. That would save us much more then half of the overhead (serialisation is more costly compared to data copying), while we would avoid problems with uneven state of channels. There would be no problems with mixed broadcast/non broadcast writes, this option could support both of them at the same time - in other words, it would be as generic as the current one.

d) Regarding StreamRecordWriter, other option is, that it could be refactored to a class implementing extracted RecordWriter interface and being a proxy/wrapper around another RecordWriter instance:

Class StreamRecordWriter implements RecordWriter {
  private final RecordWrtier recordWriter; //either broadcast or non broadcast 
  public void foo() {
    recordWriter.foo();
  }
}

To be honest I’m not sure at the moment which one would be better [2] or [c]. In ideal world, we might want to replace current RecordWriter with [c] and after that (if that’s not enough) to implement [2] on top of [c]. 

Piotrek

> On 18 Jul 2018, at 05:36, Zhijiang(wangzhijiang999) <wa...@aliyun.com.INVALID> wrote:
> 
> Hi Piotr,
> 
> Thanks for your replies and professional suggestions!
> 
> My initial thought is just as you said in first suggestion. The current RecordWriter will emit StreamRecord to some subpartition via ChannelSelector or broadcast events/watermark to all subpartitions directly.
> If the ChannelSelector implementation is BroadcastPartitioner, then we can create a specialized BroadcastRecordWriter to handle the 'emit', 'broadcastEmit', 'broadcastEvent', etc.
> To make it seems not tricky, I want to abstract the RecordWriter as a plugin, then implement a BroadcastRecordWriter and NonBroadcastRecordWriter separately to extend abstract RecordWriter. That means we divide the RecordWriter by ChannelSelector, and also we may remove current StreamRecordWriter to uniform the RecordWriter criteria in both stream and batch mode.
> 
> Considering specific implementations, I think one RecordSerializer can work for both BroadcastRecordWriter and NonBroadcastRecordWriter, but the precondition is making the RecordSerializer has no internal state, so we have to remove the BufferBuilder variable from SpanningRecordSerializer and pass it via addRecord/continueWritingWithNextBufferBuilder
> methods from RecordWriter. BroadcastRecordWriter only needs maintain one BufferBuilder for all subpartitions, and NonBroadcastRecordWriter may need maintain one BufferBuilder per subpartition.
> 
> Another issue is whether this improvement is suitable for broadcastEmit(watermark) in NonBroadcastRecordWriter as you said in suggestion 2,3. I wonder it may decrease the buffer utilization if switch between broadcast and non-broadcast modes, even it may seem more tricky in implementation. I am still thinking of it.
> 
> Maybe we can implement the improvement for BroadcastPartitioner in first step and make sure one RecordSerializer for all subpartitions. That can reduce the memory overhead in RecordSerializer and the time cost in broadcast serialization scenarios.
> 
> Best,
> 
> Zhijiang
> 
> 
> ------------------------------------------------------------------
> 发件人:Piotr Nowojski <pi...@data-artisans.com>
> 发送时间:2018年7月17日(星期二) 23:31
> 收件人:dev <de...@flink.apache.org>; Zhijiang(wangzhijiang999) <wa...@aliyun.com>
> 主 题:Re: [DISCUSS] Improve broadcast serialization
> 
> Hi
> 
> Generally speaking this would be a nice optimisation, however it might be tricky to implement. The thing to keep in mind is that currently interface allow to interleave broadcasting and normal sending, because of that at any given time some serialisers can have more data then others. For example when we have two output channels and we are looping following writes:
> 
> Write sth to 1. Channel
> Broadcast to all channels
> Write sth to 1. Channel
> Broadcast to all channels
> Write sth to 1. Channel
> Broadcast to all channels
> (…)
> 
> Thus buffers of different channels can fill out with different rates.
> 
>> In theory every record can be serialized only once and referenced for all the subpartitions in broadcast mode.
> 
> The problem here is that after records serialising, the only unit that can be referenced afterwards is “Buffer”. So that would leave us now with couple of options:
> 
> 1. Create a specialised BroadcastRecordWriter that supports ONLY broadcasting, guaranteeing that all channels always receive the same data. Here you could serialise records only once, to one BufferBuilder that could be shared and referenced by multiple BufferConsumers from different channels. Any non broadcast write would have to fail.
> 
> 2. Similar as above, but specialised in MOSTLY broadcasting. Operate as in 1. for broadcasts, but for any non broadcast write: finish current broadcasting BufferBuilder, flush all data on all channels, serialise single record to single channel using newly create BufferBuilder and also immediately finish/flush it, so that any subsequent broadcasts will work again as in 1.:
> 
> 3. Similar as 2, but lazily switch between broadcasting and non-broadcasting modes. It would have two modes of operating that could be switched back and forth: the same as currently implemented for non-broadcasted and optimised broadcast mode
> 
> Broadcast to all channels
> Broadcast to all channels
> Broadcast to all channels
> Broadcast to all channels
> Write sth to X Channel // this flushes all channels and clears/finishes previous BufferBuilder 
> Write sth to Y Channel
> Write sth to Y Channel
> Write sth to Y Channel
> Write sth to X Channel 
> Broadcast to all channels // this flushes all channels and clears/finishes previous BufferBuilders, 
> Broadcast to all channels
> Broadcast to all channels
> (…)
> 
> However both in 2. and 3. there would be very big penalty for mixing broadcast with normal writes.  
> 
> Piotrek
> 
>> On 13 Jul 2018, at 09:44, Zhijiang(wangzhijiang999) <wa...@aliyun.com.INVALID> wrote:
>> 
>> Hi all,
>> 
>> In current implementation, the RecordSerializer is created separately for each subpartition in RecordWriter, that means the number of serializers equals to the number of subpartitions.
>> For broadcast partitioner, every record will be serialized many times in all the subpartitions, and this may bring bad performance to some extent.
>> In theory every record can be serialized only once and referenced for all the subpartitions in broadcast mode.
>> 
>> To do so, I propose the following changes:
>> 1. Create and maintain only one serializer in RecordWriter, and it will serialize the record for all the subpartitions. It makes sense for any partitioners, and the memory overhead can be also decreased, because every serializer will maintain some separate byte buffers internally.
>> 2. Maybe we can abstract the RecordWriter as a base class used for other partitioner mode and implement a BroadcastRecordWriter for BroadcastPartitioner. And this new implementation will add buffer references based on the number of subpartitions before adding into subpartition queue.
>> 3. Maybe we can remove StreamRecordWriter by migrating flusher from it to RecordWriter, then the uniform RecordWriter can be used for both stream and batch. The above BroadcastRecordWriter can aslo uniform for both stream and batch.
>> 
>> I am not sure whether this improvement is proposed before and what do you think of it?
>> If necessary I can create JIRAs to contirbute it, and may need one commiter cooperate with me.
>> 
>> Best,
>> 
>> Zhijiang
> 









Re: [DISCUSS] Improve broadcast serialization

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

Regarding the second idea with skip index list, I would guess it might have bad performance impact in non broadcasting cases or would seriously complicate our Buffer implementation. Also it would make reading/data copying/slicing and other big chunk byte operations much more costly. Instead of memcpy whole buffer we would have to manually select the correct bits.

 > But I am just wondering if the switch is frequent between broadcasting and non-broadcasting operations

I haven't researched this topic any further then before. However my first guess would be that this switch doesn’t happen at all EXCEPT of `randomEmit` which is used for the latency markers (this statement requires further research/validation). Assuming that’s true.

1. Probably we can not/should not flush the broadcasted buffer, serialise randomEmit and flush it again, because this would prematurely emit latency marker - defeating it purpose and skewing the measured time. LatencyMarkers are expected to travel through pipeline at the exact same speed as regular records would.

2. Maybe we could just always broadcast the latency markers as well? This would be nice solution except of that at the level of RecordWriter we do not know whether this is latency marker or not - we no only that we were asked to “emit”, “randomEmit” or “broadcastEmit” and we have to handle them somehow (throwing exception?)

3. Assuming `randomEmit` or `emit` is rare, maybe we copy the broadcasted `BufferBuilder` into a new one, append there the record/latency marker so all except of one channel would share “broadcasted” BufferBuilder. Once we need to flush any of the buffers (either broadcasted or the “dirty” one) we flush them all and restart with all channels sharing a fresh new “broadcasted” BufferBuilder? 

4. For streaming isn't Broadcast currently realised via passing `BroadcastPartitioner` to the RecordWriter and using standard `emit` method? That would need to be changed/handled in order to optimise broadcast writes.

Piotrek

> On 15 Oct 2018, at 12:02, Zhijiang(wangzhijiang999) <wa...@aliyun.com> wrote:
> 
> Let's come back to this discussion again.
> 
> Thanks for @Nico Kruber and @Piotr Nowojski  reviewing the PR of proposed 2.1 for serialization only once below and it is already merged into branch.
> 
> For proposed 2.2 for copy only once, we verified it is also very important for batch job related with join operation in our benchmark, so we want to focus on improving it based on 2.1.
> Considering the initial conclusion before, we want to finish current broadcasting shared BufferBuilder when triggering non-broadcast operation which will request new separate BufferBuilder. But I am just wondering if the switch is frequent between broadcasting and non-broadcasting operations, the BufferBuilder may be filled with few data resulting in low resource utilization which may cause regression in special cases. And as long as one channel has not consumed the data by network transfer, this shared BufferBuilder can not be recycled.
> 
> Another raw idea is if we support one BufferBuilder shared by all the channels has different data regions for differnet partition indexes, that means one subpartition has non-continuous data distribution in the BufferBuilder. We can use skip index list to identify which regions belong to sepecific subpartiiton which can avoid finishing BufferBuilder during mode switch. But it seems more complicated with current process and i am not sure whether it has other performance concerns.
> 
> Wish any feedbacks for this issue, then i can further focus on it. 
> 
> Best,
> Zhijiang
> 
> ------------------------------------------------------------------
> 发件人:Zhijiang(wangzhijiang999) <wa...@aliyun.com.INVALID>
> 发送时间:2018年7月20日(星期五) 13:21
> 收件人:Piotr Nowojski <pi...@data-artisans.com>
> 抄 送:Nico Kruber <ni...@data-artisans.com>; dev <de...@flink.apache.org>
> 主 题:回复:[DISCUSS] Improve broadcast serialization
> 
> Ok, that is fine. :)
> 
> I will create JIRA today and submit the PR next week.
> 
> Zhijiang
> ------------------------------------------------------------------
> 发件人:Piotr Nowojski <pi...@data-artisans.com>
> 发送时间:2018年7月19日(星期四) 17:52
> 收件人:Zhijiang(wangzhijiang999) <wa...@aliyun.com>
> 抄 送:Nico Kruber <ni...@data-artisans.com>; dev <de...@flink.apache.org>
> 主 题:Re: [DISCUSS] Improve broadcast serialization
> 
> Hi,
> 
> I have only noticed your second response after sending my email :) 
> 
> Ok, now I think we are on the same page :) I think you can work on 2.1 and later on 2.2 if you will think that 2.1 is not enough. Once you create a Jira issues/PRs please CC me.
> 
> Piotrek  
> 
> On 19 Jul 2018, at 04:51, Zhijiang(wangzhijiang999) <wa...@aliyun.com> wrote:
> Hi Piotr
> 
> 1. I agree with we should discuss higher level first and focus on implementation on jira/pr. As long as RecordSerializer does not maintain the BufferBuilder, it can become stateless, then it can get BufferBuilderfrom the RecordWriter at any time.  And I think it is the precondition to improve serializing only once for multi channels, otherwise we have to select serializer based on target channel index.
> 
> 2. I already corrected this thought in last reply, maybe you have not seen it before you reply. :)  
> We can break the broadcast improvement into two steps:
> 2.1 Serialize the record into temporary byte buffer only once for multi selected channels. (currently serialize many times)
> 2.2 Copy the temporary byte buffer into BufferBuilder only once and create different BufferConsumers based on the same BufferBuilder for each channel. (currently copy many times)
> Regarding 2.1, just the same as your proposal[c], it is worth to do currently and can get good benefits I think.
> Regarding 2.2, considering mixed broadcast/non-broadcast writes, it has to flush/finish last broadcast BufferBuilder for current non-broadcast writes and vice versa. I agree with your proposal[2] for this issue, and we can further consider it in future, maybe there are other better ways for avoiding it.
> 
> 4. My previous thought is to realize both above 2.1 and 2.2. The 2.1 is your proposal[c] which has no problem for mixed write mode, so no need additional flush. The 2.2 is just as your proposal[2] which concerns additional flush. Maybe my last reply make you misunderstand.
> 
> I can submit jira for above 2.1 first if no other concerns, thanks for the helpful advice. :)
> 
> Best,
> 
> Zhijiang
> ------------------------------------------------------------------
> 发件人:Piotr Nowojski <pi...@data-artisans.com>
> 发送时间:2018年7月18日(星期三) 20:04
> 收件人:Zhijiang(wangzhijiang999) <wa...@aliyun.com>; Nico Kruber <ni...@data-artisans.com>
> 抄 送:dev <de...@flink.apache.org>
> 主 题:Re: [DISCUSS] Improve broadcast serialization
> 
> 
> Hi 
> 
> 1. I want to define a new AbstractRecordWriter as base class which defines some abstract methods and utility codes. The current RecordWriter used for other partitioner and new BroadcastRecordWriter used only for BroadcastPartitioner will both extend AbstractRecordWriter. The fields in BroadcastPartitioner are extactly as you showed below, but for current RecordWriter it also only needs one RecordSerializer if we make the RecordSerializer has no internal state.
> 
> Lets first discuss what we would like to have/implement on higher level and later focus on implementation details. Regarding making RecordSerializer stateless, there were some discussions about it previously and it was on our TODO list but I don’t remember what was holding us back. Maybe Nico will remember?
> 
> 
> 2. You pointed the key problem that how to handle `randomEmit` in BroadcastRecordWriter, and I think this process may resue the `emit` logic in current RecordWriter. Then the `emit` and `broadcastEmit` logics in BroadcastRecordWriter will serialize data only once and copy to BufferBuilder only once. So this improvement is deterministic for BroadcastPartitioner.
> 
> 
> What logic to reuse do you have in mind? 
> 4. As for 'broadcastEmit` improvement in RecordWriter for non-broadcast partitioner, we can also do as you suggested in option [2], but it has to finish/flush the previous BufferBuilder generated by common `emit` operation. So it may bring bad impacts on buffer utility which was improved well in event-driven flush feature. So I am not sure whether it is worth doing `broadcastEmit` improvement in RecordWriter.
> 
> 
> The whole point of my proposal [c] was to avoid the need to flush. Code would need a little bit more refactoring but it should look something like this:
> 
> void broadcastEmit(record):
>  serializedRecord = serializer.serialize(record)
>  for bufferBuilder in bufferBuilders:
>  bufferBuilder.append(serializedRecord)
>  // if we overfilled bufferBuilder, finish it, request new one and continue writing
> 
> void emit(record, channel)
>  serializedRecord = serializer.serialize(record)
>  bufferBuilders[channel].append(serializedRecord)
>  // if we overfilled bufferBuilder, finish it, request new one and continue writing
> 
> I do not see here a need for additional flushes and it should be strict improvement over current code base.
> 
> 
> I already realized the demo covering above 1,2,5 before. I can create jiras after we reach a final agreement, then maybe you can help review PR if have time. :)
> 
> 
> Sure :)
> 
> Piotrek
> 
> Best,
> 
> Zhijiang
> ------------------------------------------------------------------
> 发件人:Piotr Nowojski <pi...@data-artisans.com>
> 发送时间:2018年7月18日(星期三) 16:37
> 收件人:dev <de...@flink.apache.org>; Zhijiang(wangzhijiang999) <wa...@aliyun.com>
> 主 题:Re: [DISCUSS] Improve broadcast serialization
> 
> Hi,
> 
> Couple of more thoughts
> 
> a) I’m not sure if you would have to modify current RecordWriter at all. You could extract interface from current RecordWriter and just provide two implementations: current one and BroadcastRecordWriter. I’m not sure, but it doesn’t seem like they would duplicate/share lots of code. BroadcastRecordWriter would have fields:
> 
> private final RecordSerializer<T> serializers;
> 
> private final Optional<BufferBuilder> bufferBuilder;
> 
> Compared to RecordWriter’s arrays.
> 
> b) One thing that I noticed now are latency markers and randomEmit method. It prevents us from implementing option [1]. BroadcastRecordWriter would have to flush all channels on randomEmit (as I proposed in option [2]).
> 
> c) Another option to optimise broadcast writes (or for that matter all multi channel writes), would be to serialise record only once to SpanningRecordSerializer#serializationBuffer, but copy it multiple times to separate BufferBuilders. That would save us much more then half of the overhead (serialisation is more costly compared to data copying), while we would avoid problems with uneven state of channels. There would be no problems with mixed broadcast/non broadcast writes, this option could support both of them at the same time - in other words, it would be as generic as the current one.
> 
> d) Regarding StreamRecordWriter, other option is, that it could be refactored to a class implementing extracted RecordWriter interface and being a proxy/wrapper around another RecordWriter instance:
> 
> Class StreamRecordWriter implements RecordWriter {
>   private final RecordWrtier recordWriter; //either broadcast or non broadcast 
>   public void foo() {
>     recordWriter.foo();
>   }
> }
> 
> To be honest I’m not sure at the moment which one would be better [2] or [c]. In ideal world, we might want to replace current RecordWriter with [c] and after that (if that’s not enough) to implement [2] on top of [c]. 
> 
> Piotrek
> 
> > On 18 Jul 2018, at 05:36, Zhijiang(wangzhijiang999) <wa...@aliyun.com.INVALID> wrote:
> > 
> > Hi Piotr,
> > 
> > Thanks for your replies and professional suggestions!
> > 
> > My initial thought is just as you said in first suggestion. The current RecordWriter will emit StreamRecord to some subpartition via ChannelSelector or broadcast events/watermark to all subpartitions directly.
> > If the ChannelSelector implementation is BroadcastPartitioner, then we can create a specialized BroadcastRecordWriter to handle the 'emit', 'broadcastEmit', 'broadcastEvent', etc.
> > To make it seems not tricky, I want to abstract the RecordWriter as a plugin, then implement a BroadcastRecordWriter and NonBroadcastRecordWriter separately to extend abstract RecordWriter. That means we divide the RecordWriter by ChannelSelector, and also we may remove current StreamRecordWriter to uniform the RecordWriter criteria in both stream and batch mode.
> > 
> > Considering specific implementations, I think one RecordSerializer can work for both BroadcastRecordWriter and NonBroadcastRecordWriter, but the precondition is making the RecordSerializer has no internal state, so we have to remove the BufferBuilder variable from SpanningRecordSerializer and pass it via addRecord/continueWritingWithNextBufferBuilder
> > methods from RecordWriter. BroadcastRecordWriter only needs maintain one BufferBuilder for all subpartitions, and NonBroadcastRecordWriter may need maintain one BufferBuilder per subpartition.
> > 
> > Another issue is whether this improvement is suitable for broadcastEmit(watermark) in NonBroadcastRecordWriter as you said in suggestion 2,3. I wonder it may decrease the buffer utilization if switch between broadcast and non-broadcast modes, even it may seem more tricky in implementation. I am still thinking of it.
> > 
> > Maybe we can implement the improvement for BroadcastPartitioner in first step and make sure one RecordSerializer for all subpartitions. That can reduce the memory overhead in RecordSerializer and the time cost in broadcast serialization scenarios.
> > 
> > Best,
> > 
> > Zhijiang
> > 
> > 
> > ------------------------------------------------------------------
> > 发件人:Piotr Nowojski <pi...@data-artisans.com>
> > 发送时间:2018年7月17日(星期二) 23:31
> > 收件人:dev <de...@flink.apache.org>; Zhijiang(wangzhijiang999) <wa...@aliyun.com>
> > 主 题:Re: [DISCUSS] Improve broadcast serialization
> > 
> > Hi
> > 
> > Generally speaking this would be a nice optimisation, however it might be tricky to implement. The thing to keep in mind is that currently interface allow to interleave broadcasting and normal sending, because of that at any given time some serialisers can have more data then others. For example when we have two output channels and we are looping following writes:
> > 
> > Write sth to 1. Channel
> > Broadcast to all channels
> > Write sth to 1. Channel
> > Broadcast to all channels
> > Write sth to 1. Channel
> > Broadcast to all channels
> > (…)
> > 
> > Thus buffers of different channels can fill out with different rates.
> > 
> >> In theory every record can be serialized only once and referenced for all the subpartitions in broadcast mode.
> > 
> > The problem here is that after records serialising, the only unit that can be referenced afterwards is “Buffer”. So that would leave us now with couple of options:
> > 
> > 1. Create a specialised BroadcastRecordWriter that supports ONLY broadcasting, guaranteeing that all channels always receive the same data. Here you could serialise records only once, to one BufferBuilder that could be shared and referenced by multiple BufferConsumers from different channels. Any non broadcast write would have to fail.
> > 
> > 2. Similar as above, but specialised in MOSTLY broadcasting. Operate as in 1. for broadcasts, but for any non broadcast write: finish current broadcasting BufferBuilder, flush all data on all channels, serialise single record to single channel using newly create BufferBuilder and also immediately finish/flush it, so that any subsequent broadcasts will work again as in 1.:
> > 
> > 3. Similar as 2, but lazily switch between broadcasting and non-broadcasting modes. It would have two modes of operating that could be switched back and forth: the same as currently implemented for non-broadcasted and optimised broadcast mode
> > 
> > Broadcast to all channels
> > Broadcast to all channels
> > Broadcast to all channels
> > Broadcast to all channels
> > Write sth to X Channel // this flushes all channels and clears/finishes previous BufferBuilder 
> > Write sth to Y Channel
> > Write sth to Y Channel
> > Write sth to Y Channel
> > Write sth to X Channel 
> > Broadcast to all channels // this flushes all channels and clears/finishes previous BufferBuilders, 
> > Broadcast to all channels
> > Broadcast to all channels
> > (…)
> > 
> > However both in 2. and 3. there would be very big penalty for mixing broadcast with normal writes.  
> > 
> > Piotrek
> > 
> >> On 13 Jul 2018, at 09:44, Zhijiang(wangzhijiang999) <wa...@aliyun.com.INVALID> wrote:
> >> 
> >> Hi all,
> >> 
> >> In current implementation, the RecordSerializer is created separately for each subpartition in RecordWriter, that means the number of serializers equals to the number of subpartitions.
> >> For broadcast partitioner, every record will be serialized many times in all the subpartitions, and this may bring bad performance to some extent.
> >> In theory every record can be serialized only once and referenced for all the subpartitions in broadcast mode.
> >> 
> >> To do so, I propose the following changes:
> >> 1. Create and maintain only one serializer in RecordWriter, and it will serialize the record for all the subpartitions. It makes sense for any partitioners, and the memory overhead can be also decreased, because every serializer will maintain some separate byte buffers internally.
> >> 2. Maybe we can abstract the RecordWriter as a base class used for other partitioner mode and implement a BroadcastRecordWriter for BroadcastPartitioner. And this new implementation will add buffer references based on the number of subpartitions before adding into subpartition queue.
> >> 3. Maybe we can remove StreamRecordWriter by migrating flusher from it to RecordWriter, then the uniform RecordWriter can be used for both stream and batch. The above BroadcastRecordWriter can aslo uniform for both stream and batch.
> >> 
> >> I am not sure whether this improvement is proposed before and what do you think of it?
> >> If necessary I can create JIRAs to contirbute it, and may need one commiter cooperate with me.
> >> 
> >> Best,
> >> 
> >> Zhijiang
> > 
> 
> 
> 
> 
> 
>