Posted to issues@flink.apache.org by "xueyu (JIRA)" <ji...@apache.org> on 2019/01/14 15:09:00 UTC

[jira] [Commented] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry

    [ https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16742179#comment-16742179 ] 

xueyu commented on FLINK-11046:
-------------------------------

Hi [~dawidwys], have you started working on this issue? If not, could you please assign it to me? I have spent the last two days investigating it, have some ideas for a fix, and would like to give it a try. Following [~tzulitai]'s comments, my idea is to write a new RequestIndexer that uses a BulkRequest to buffer action requests. Sorry for being a little late in asking to take it. Thanks!
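
To illustrate the buffering idea, here is a minimal sketch and not the actual patch. `SimpleIndexer`, `BufferingIndexer`, and `BufferingIndexerDemo` are hypothetical names, requests are plain strings, and a plain concurrent queue stands in for the BulkRequest. The point is only that `add()` no longer needs the bulk processor's lock, so a failure handler can call it from any thread, and the sink's own thread hands the buffered requests over later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical, simplified stand-in for Flink's RequestIndexer (requests are plain strings). */
interface SimpleIndexer {
    void add(String request);
}

/** Buffers re-added requests instead of passing them straight to a lock-guarded bulk processor. */
class BufferingIndexer implements SimpleIndexer {
    private final ConcurrentLinkedQueue<String> pending = new ConcurrentLinkedQueue<>();

    @Override
    public void add(String request) {
        // Lock-free enqueue: safe to call from a failure handler on any thread.
        pending.add(request);
    }

    /** Meant to be called by the sink's own thread once the blocking flush has returned. */
    List<String> drain() {
        List<String> drained = new ArrayList<>();
        String next;
        while ((next = pending.poll()) != null) {
            drained.add(next);
        }
        return drained;
    }
}

class BufferingIndexerDemo {
    /** Re-adds two failed requests from a callback thread, then drains them on the caller's thread. */
    static List<String> run() {
        BufferingIndexer indexer = new BufferingIndexer();
        Thread callback = new Thread(() -> {
            indexer.add("failed-doc-1");
            indexer.add("failed-doc-2");
        });
        callback.start();
        try {
            callback.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return indexer.drain();
    }
}
```

In a real implementation the drained requests would be added to the bulk processor from the sink thread, outside any listener callback.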

> ElasticSearch6Connector cause thread blocked when index failed with retry
> -------------------------------------------------------------------------
>
>                 Key: FLINK-11046
>                 URL: https://issues.apache.org/jira/browse/FLINK-11046
>             Project: Flink
>          Issue Type: Bug
>          Components: ElasticSearch Connector
>    Affects Versions: 1.6.2
>            Reporter: luoguohao
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>
> When I use the ES6 sink to index into Elasticsearch and the bulk process catches an exception, I try to re-index the document by calling `indexer.add(action)` in the `ActionRequestFailureHandler.onFailure()` method, but this goes wrong. The calling thread gets stuck there, and the thread dump shows that the `bulkprocessor` object is locked by another thread.
> {code:java}
> public interface ActionRequestFailureHandler extends Serializable {
>  void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable;
> }
> {code}
> After reading the code behind `indexer.add(action)`, I found that every add operation has to acquire the `synchronized` lock.
> {code:java}
> private synchronized void internalAdd(DocWriteRequest request, @Nullable Object payload) {
>   ensureOpen();
>   bulkRequest.add(request, payload);
>   executeIfNeeded();
> }
> {code}
> I also noticed that the `bulkprocessor` object is locked in the bulk process thread as well.
> The bulk process operation is in the following code:
> {code:java}
> public void execute(BulkRequest bulkRequest, long executionId) {
>     Runnable toRelease = () -> {};
>     boolean bulkRequestSetupSuccessful = false;
>     try {
>         listener.beforeBulk(executionId, bulkRequest);
>         semaphore.acquire();
>         toRelease = semaphore::release;
>         CountDownLatch latch = new CountDownLatch(1);
>         retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
>             @Override
>             public void onResponse(BulkResponse response) {
>                 try {
>                     listener.afterBulk(executionId, bulkRequest, response);
>                 } finally {
>                     semaphore.release();
>                     latch.countDown();
>                 }
>             }
>             @Override
>             public void onFailure(Exception e) {
>                 try {
>                     listener.afterBulk(executionId, bulkRequest, e);
>                 } finally {
>                     semaphore.release();
>                     latch.countDown();
>                 }
>             }
>         }, Settings.EMPTY);
>         bulkRequestSetupSuccessful = true;
>         if (concurrentRequests == 0) {
>             latch.await();
>         }
>     } catch (InterruptedException e) {
>         Thread.currentThread().interrupt();
>         logger.info(() -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
>         listener.afterBulk(executionId, bulkRequest, e);
>     } catch (Exception e) {
>         logger.warn(() -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
>         listener.afterBulk(executionId, bulkRequest, e);
>     } finally {
>         if (bulkRequestSetupSuccessful == false) {  // if we fail on client.bulk() release the semaphore
>             toRelease.run();
>         }
>     }
> }
> {code}
> As the line I marked in red above shows, I think this is why the retry thread was blocked: the bulk process thread never releases the lock on `bulkprocessor`. I also tried to figure out why the field `concurrentRequests` was set to zero, and found the initialization of the bulk processor in class `ElasticsearchSinkBase`:
> {code:java}
> protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
>  ...
>  BulkProcessor.Builder bulkProcessorBuilder = callBridge.createBulkProcessorBuilder(client, listener);
>  // This makes flush() blocking
>  bulkProcessorBuilder.setConcurrentRequests(0);
>  
>  ...
>  return bulkProcessorBuilder.build();
> }
> {code}
> This field is set to zero explicitly. So everything seems to make sense, but I still wondered why the retry operation does not run in the same thread as the bulk process execution. After reading the code, the `bulkAsync` method might be the last piece of the puzzle.
> {code:java}
> @Override
> public BulkProcessor.Builder createBulkProcessorBuilder(RestHighLevelClient client, BulkProcessor.Listener listener) {
>  return BulkProcessor.builder(client::bulkAsync, listener);
> }
> {code}
> So I hope someone can help fix this problem or give some suggestions. I can also try to take it on myself.
> Thanks a lot!
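
The blocking described in the issue can be reproduced without Elasticsearch. The following is a minimal sketch, not the real BulkProcessor code: `ToyBulkProcessor`, `flushAndWait`, and `DeadlockSketch` are hypothetical names, documents are plain strings, and a timeout replaces the unbounded `latch.await()` so that the demo terminates. It assumes only what the issue reports: `add()` and the blocking flush share one monitor, and the listener runs on a different thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for BulkProcessor: add() and the blocking flush share one monitor. */
class ToyBulkProcessor {
    private final List<String> buffer = new ArrayList<>();
    // Models the thread on which the asynchronous bulk response arrives.
    private final ExecutorService responseThread = Executors.newSingleThreadExecutor();

    synchronized void add(String doc) {
        buffer.add(doc);
    }

    /** Flushes while holding the monitor and waits for the listener, as with concurrentRequests == 0. */
    synchronized boolean flushAndWait(Runnable afterBulk, long timeoutMillis) throws InterruptedException {
        buffer.clear();
        CountDownLatch latch = new CountDownLatch(1);
        responseThread.execute(() -> {
            try {
                afterBulk.run(); // the listener runs on the response thread
            } finally {
                latch.countDown();
            }
        });
        // Unlike Object.wait(), awaiting a latch does not release the monitor held by this method.
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        responseThread.shutdown();
    }
}

class DeadlockSketch {
    /** Returns true if the listener finished before the timeout, false if the flush stalled. */
    static boolean flushCompletes(boolean reAddInListener) {
        ToyBulkProcessor processor = new ToyBulkProcessor();
        processor.add("doc-1");
        // Re-adding from the listener needs the monitor that flushAndWait() is still holding.
        Runnable listener = reAddInListener ? () -> processor.add("doc-1") : () -> { };
        try {
            return processor.flushAndWait(listener, 300);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            processor.shutdown();
        }
    }
}
```

With the re-add in the listener, `flushCompletes(true)` should report a stall, because the response thread cannot enter `add()` until the flush gives up the monitor. Without the re-add, `flushCompletes(false)` should complete normally.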



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)