Posted to issues@flink.apache.org by "Flink Jira Bot (Jira)" <ji...@apache.org> on 2021/10/30 10:41:00 UTC

[jira] [Updated] (FLINK-20579) eash es sink will have

     [ https://issues.apache.org/jira/browse/FLINK-20579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Flink Jira Bot updated FLINK-20579:
-----------------------------------
    Labels: auto-deprioritized-major stale-minor  (was: auto-deprioritized-major)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Minor but is unassigned, and neither it nor its Sub-Tasks have been updated for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is still Minor, please either assign yourself or give an update. Afterwards, please remove the label, or in 7 days the issue will be deprioritized.


> eash es sink will have 
> -----------------------
>
>                 Key: FLINK-20579
>                 URL: https://issues.apache.org/jira/browse/FLINK-20579
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / ElasticSearch
>            Reporter: donglei
>            Priority: Minor
>              Labels: auto-deprioritized-major, stale-minor
>
> In BulkProcessorListener, beforeBulk should give every request in a bulk the same routing value, to speed up writes to ES.
>  
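> As we know, when all documents in a bulk share the same routing value, they are routed to a single shard; as a result, the bulk is sent to only one ES node, with one network I/O and one disk I/O, so every 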
>  
> !http://km.oa.com/files/photos/captures/202007/1593922902_79_w1275_h710.png!
>  
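The claim above rests on how Elasticsearch picks a shard: each document is routed by a routing value, which defaults to its `_id`. Below is a simplified model, not Elasticsearch's actual formula. Elasticsearch hashes with Murmur3 and also applies index-level routing settings, and here `String.hashCode()` stands in for that hash. `ShardRouting` and `shardFor` are hypothetical names. The model only illustrates the property the proposal relies on: identical routing values always map to the same shard.

```java
// Hypothetical, simplified model of Elasticsearch shard selection.
// Elasticsearch hashes the routing value (the document _id by default) with Murmur3
// and applies index-level routing settings; String.hashCode() is only a stand-in.
class ShardRouting {
    static int shardFor(String routing, int numPrimaryShards) {
        // floorMod keeps the result in [0, numPrimaryShards) even for negative hashes
        return Math.floorMod(routing.hashCode(), numPrimaryShards);
    }

    public static void main(String[] args) {
        String[] ids = {"doc-1", "doc-2", "doc-3"};
        // One routing value for the whole bulk: every document maps to the same shard
        for (String id : ids) {
            System.out.println(id + " with routing 'bulk-1' -> shard " + shardFor("bulk-1", 10));
        }
        // Default routing by _id: documents usually spread across shards
        for (String id : ids) {
            System.out.println(id + " routed by _id -> shard " + shardFor(id, 10));
        }
    }
}
```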
> We therefore take the following approach: beforeBulk in ElasticsearchSinkBase assigns one routing value to every request in the same batch, like this:
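> import java.util.UUID;
> import org.elasticsearch.action.DocWriteRequest;
> import org.elasticsearch.action.bulk.BulkProcessor;
> import org.elasticsearch.action.bulk.BulkRequest;
>
> // Inner class of ElasticsearchSinkBase (Elasticsearch 6.x/7.x client assumed)
> private class BulkProcessorListener implements BulkProcessor.Listener {
>     @Override
>     public void beforeBulk(long executionId, BulkRequest request) {
>         // routePreBulk: proposed switch; still needs to check whether routing is already set upstream
>         if (routePreBulk) {
>             // One random routing value per bulk, so all of its documents go to the same shard
>             String routing = UUID.randomUUID() + "_" + executionId;
>             // DocWriteRequest covers index, update and delete requests, so no IndexRequest cast is needed
>             for (DocWriteRequest<?> action : request.requests()) {
>                 action.routing(routing);
>             }
>             LOG.info("start bulk actions: {}, routing: {}", request.numberOfActions(), routing);
>         }
>     }
>     // afterBulk(...) callbacks are unchanged and omitted here
> }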
> The advantage is that when an index has many shards, every bulk carries a single routing value and is sent to the same ES node. This saves the time ES spends splitting the bulk across shards and writing the data to disk, which improves ES write performance.
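To make the "splitting" point concrete, here is a model of how a coordinating node groups the items of one bulk into per-shard sub-requests. It assumes default routing by document `_id` when no routing is set, and uses `String.hashCode()` in place of Elasticsearch's Murmur3 hash. `BulkSplitter` and `splitByShard` are hypothetical names. This is a minimal sketch of the grouping only and does not measure performance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of splitting one bulk into per-shard sub-requests.
// String.hashCode() stands in for the Murmur3 hash used by Elasticsearch.
class BulkSplitter {
    // Group document ids by target shard; routing == null means "route by _id" (the default)
    static Map<Integer, List<String>> splitByShard(List<String> ids, String routing, int numShards) {
        Map<Integer, List<String>> perShard = new TreeMap<>();
        for (String id : ids) {
            String key = (routing != null) ? routing : id;
            int shard = Math.floorMod(key.hashCode(), numShards);
            perShard.computeIfAbsent(shard, s -> new ArrayList<>()).add(id);
        }
        return perShard;
    }

    public static void main(String[] args) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            ids.add("doc-" + i);
        }
        int numShards = 20;
        // Default routing: the bulk fans out into many shard-level sub-requests
        System.out.println("by _id: " + splitByShard(ids, null, numShards).size() + " sub-requests");
        // One routing value for the whole bulk: a single shard-level sub-request
        System.out.println("per-bulk routing: " + splitByShard(ids, "bulk-42", numShards).size() + " sub-request");
    }
}
```

The trade-off is that each bulk then lands entirely on one shard, so balance across shards comes from spreading whole bulks rather than individual documents.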
> By preliminary estimates, this change can more than double write performance.
> The discussion points here are:
> Q: Can we use keyBy upstream and set the same routing value per key instead?
> A: The point of this feature is performance. Setting the same routing value after an upstream keyBy cannot guarantee that all records with that value are sent in one batch. For example, 10,000 records may share one routing value, but there is no guarantee that those 10,000 records end up in the same batch.
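Here is an illustration of the answer under a simplifying assumption: the sink flushes purely by action count (as with the `bulk.flush.max.actions` setting of Flink's Elasticsearch sink), while one sink subtask receives records for several keys interleaved after `keyBy`. `KeyedBatching` and `distinctRoutingsPerBatch` are hypothetical names. Because flushing ignores keys, a batch ends up with several routing values even when each key has a fixed one.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

// Hypothetical model: a sink subtask buffers records in arrival order and flushes
// every `batchSize` records (count-based flushing only), regardless of key.
class KeyedBatching {
    // Number of distinct routing values in each flushed batch
    static List<Integer> distinctRoutingsPerBatch(List<String> routings, int batchSize) {
        List<Integer> result = new ArrayList<>();
        for (int start = 0; start < routings.size(); start += batchSize) {
            int end = Math.min(start + batchSize, routings.size());
            result.add(new HashSet<>(routings.subList(start, end)).size());
        }
        return result;
    }

    public static void main(String[] args) {
        // Three keys interleaved on one subtask after keyBy
        List<String> arrival = List.of("a", "b", "c", "a", "b", "c", "a", "b");
        System.out.println(distinctRoutingsPerBatch(arrival, 4)); // prints [3, 3]
    }
}
```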
> Q: How do we decide whether to add a routing value?
> A: Since Oceanus cannot provide an external API for this, we recommend sampling. For example, check whether any request in a batch already carries a routing value; if none does, assume that this sink does not need one.
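The sampling rule might look like this, assuming each request's routing is available as a nullable string (null meaning no routing was set upstream). `RoutingSampler`, `shouldAssignBulkRouting`, and the `sampleSize` parameter are hypothetical; in the listener itself, the values would come from the requests in the `BulkRequest`.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sampling check: decide whether the sink should assign its own
// per-bulk routing. A null entry means the upstream job set no routing for that request.
class RoutingSampler {
    static boolean shouldAssignBulkRouting(List<String> routings, int sampleSize) {
        int limit = Math.min(sampleSize, routings.size());
        for (int i = 0; i < limit; i++) {
            String r = routings.get(i);
            if (r != null && !r.isEmpty()) {
                return false; // upstream already routes; keep its values
            }
        }
        return true; // no routing seen in the sample
    }

    public static void main(String[] args) {
        System.out.println(shouldAssignBulkRouting(Arrays.asList(null, null, null), 2)); // prints true
        System.out.println(shouldAssignBulkRouting(Arrays.asList(null, "user-7"), 2));   // prints false
    }
}
```

Sampling keeps the check cheap, but it can miss routing values that appear only after the sampled prefix. A job that routes only some of its records would need a stricter rule.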
> Q: Is the data evenly distributed?
> A: We have been running this for a long time. With this setting, the routing values assigned to bulks are uniformly distributed, so the data in ES is evenly distributed as well.
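The uniformity argument can be checked with a small simulation. It reuses the routing format from the proposal (`UUID.randomUUID() + "_" + executionId`) and assumes every bulk has the same number of documents. It again substitutes `String.hashCode()` for Elasticsearch's hash. `BulkRoutingBalance` and `docsPerShard` are hypothetical names.

```java
import java.util.UUID;

// Hypothetical simulation: each bulk gets a fresh routing value in the proposal's format
// (random UUID + "_" + executionId), and every document in the bulk lands on that
// value's shard. String.hashCode() stands in for Elasticsearch's Murmur3 hash.
class BulkRoutingBalance {
    static long[] docsPerShard(int numBulks, int docsPerBulk, int numShards) {
        long[] counts = new long[numShards];
        for (long executionId = 0; executionId < numBulks; executionId++) {
            String routing = UUID.randomUUID() + "_" + executionId;
            int shard = Math.floorMod(routing.hashCode(), numShards);
            counts[shard] += docsPerBulk; // the whole bulk goes to one shard
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] counts = docsPerShard(10_000, 500, 10);
        for (int s = 0; s < counts.length; s++) {
            System.out.println("shard " + s + ": " + counts[s] + " docs");
        }
    }
}
```

With 10,000 equal-sized bulks over 10 shards, each shard should receive close to a tenth of the documents. If bulk sizes vary widely, or only a few bulks are written, the per-shard totals can drift apart.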
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)