You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "donglei (Jira)" <ji...@apache.org> on 2020/12/14 01:29:00 UTC

[jira] [Commented] (FLINK-20579) eash es sink will have

    [ https://issues.apache.org/jira/browse/FLINK-20579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17248708#comment-17248708 ] 

donglei commented on FLINK-20579:
---------------------------------

因此,我们采取的如下的方法,在ElasticsearchSinkBase中的beforeBulk根据同一个批次,写入同一个bulk。

private class BulkProcessorListener implements BulkProcessor.Listener \{
	@Override
	public void beforeBulk(long executionId, BulkRequest request) {

		if (routePreBulk) {//需要验证下,是否上游有设置route
			String routing = UUID.randomUUID() + "_" + executionId;
			List<ActionRequest> requests = request.requests();
			requests.forEach(x -> {
				((IndexRequest) x).routing(routing);
			});
			LOG.info("start bulk actions: {}, routing: {}", request.numberOfActions(), routing);
		}
	}
{{}}

这样做的好处是,后面es分片多的时候,由于每次的bulk都有同一个route都发到同一个es node,节省es数据拆分时间和数据落地时间,提升es性能。
初步估计,这部分能提升2倍以上的性能。

此处的讨论点是:
 # 上游keyby是否可以
由于我们是采用此功能提升性能的,上游keyby之后设置同一个route值,无法保证所有的数据都在一个批次发送,如1w条数据一个route值,但是无法保证1w条数据刚好在同一批次。
 # 怎么样判断是否要加route值
由于oceanus不能提供对外的API接口,建议此处采样,比如看一个批次有没有route,如果都没有,认为此sink不需要route值。
 # 数据是否均匀
鹰眼运行很久,这种设置,由于bulk基本均匀,es数据均匀

> eash es sink will have 
> -----------------------
>
>                 Key: FLINK-20579
>                 URL: https://issues.apache.org/jira/browse/FLINK-20579
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: donglei
>            Priority: Major
>
> BulkProcessorListener beforebulk must have the same route  to speed up write to es
>  
> As we know bulk with same route will send to es only one node and with one netio one disk io  so every 
>  
> !http://km.oa.com/files/photos/captures/202007/1593922902_79_w1275_h710.png!
>  
> Therefore, we take the following method. The beforeBulk in ElasticsearchSinkBase writes the same bulk according to the same batch.  like this,
> private class BulkProcessorListener implements BulkProcessor.Listener {
> @Override
> public void beforeBulk(long executionId, BulkRequest request) {
> if (routePreBulk) {//Need to verify, whether there is a route set upstream
> String routing = UUID.randomUUID() + "_" + executionId;
> List<ActionRequest> requests = request.requests();
> requests.forEach(x -> {
> ((IndexRequest) x).routing(routing);
> });
> LOG.info("start bulk actions: {}, routing: {}", request.numberOfActions(), routing);
> }
> }
> The advantage of this is that when there are many es fragments later, because every bulk has the same route sent to the same es node, it saves es data splitting time and data landing time, and improves es performance.
> Preliminary estimates, this part can improve the performance of more than 2 times.
> The discussion points here are:
> Q: can we use  keyby and with same route value
> A: Since we use this function to improve performance, setting the same route value after upstream keyby cannot guarantee that all data will be sent in one batch, such as 1w data and one route value, but there is no guarantee that 1w data will be in the same batch. .
> Q: How to judge whether to add route value
> A: Since oceanus cannot provide an external API interface, it is recommended to sample here, for example, to see if there is a route in a batch, if there are none, think that this sink does not need a route value.
> Q: Is the data uniform
> A: we has been running for a long time. In this setting, because bulk is route value is uniform, es data is uniform
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)