You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "luoguohao (JIRA)" <ji...@apache.org> on 2018/12/02 07:19:00 UTC
[jira] [Created] (FLINK-11046) ElasticSearch6Connector cause thread
blocked when index failed with retry
luoguohao created FLINK-11046:
---------------------------------
Summary: ElasticSearch6Connector cause thread blocked when index failed with retry
Key: FLINK-11046
URL: https://issues.apache.org/jira/browse/FLINK-11046
Project: Flink
Issue Type: Bug
Components: ElasticSearch Connector
Affects Versions: 1.6.2
Reporter: luoguohao
When i'm using es6 sink to index into es, bulk process with some exception catched, and i trying to reindex the document with the call `indexer.add(action)` in the `ActionRequestFailureHandler.onFailure()` method, but things goes incorrect. The call thread stuck there, and with the thread dump, i saw the `bulkprocessor` object was locked by other thread.
{code:java}
public interface ActionRequestFailureHandler extends Serializable {
void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable;
}
{code}
After i read the code implemented in the `indexer.add(action)`, i find that `synchronized` is needed on each add operation.
{code:java}
private synchronized void internalAdd(DocWriteRequest request, @Nullable Object payload) {
ensureOpen();
bulkRequest.add(request, payload);
executeIfNeeded();
}
{code}
And, at i also noticed that `bulkprocessor` object would also locked in the bulk process thread.
the bulk process operation is in the following code:
{code:java}
public void execute(BulkRequest bulkRequest, long executionId) {
Runnable toRelease = () -> {};
boolean bulkRequestSetupSuccessful = false;
try {
listener.beforeBulk(executionId, bulkRequest);
semaphore.acquire();
toRelease = semaphore::release;
CountDownLatch latch = new CountDownLatch(1);
retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
try {
listener.afterBulk(executionId, bulkRequest, response);
} finally {
semaphore.release();
latch.countDown();
}
}
@Override
public void onFailure(Exception e) {
try {
listener.afterBulk(executionId, bulkRequest, e);
} finally {
semaphore.release();
latch.countDown();
}
}
}, Settings.EMPTY);
bulkRequestSetupSuccessful = true;
if (concurrentRequests == 0) {
latch.await();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.info(() -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
listener.afterBulk(executionId, bulkRequest, e);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
listener.afterBulk(executionId, bulkRequest, e);
} finally {
if (bulkRequestSetupSuccessful == false) { // if we fail on client.bulk() release the semaphore
toRelease.run();
}
}
}
{code}
As the read line i marked above, i think, that's the reason why the retry operation thread was block, because the the bulk process thread never release the lock on `bulkprocessor`. and, i also trying to figure out why the field `concurrentRequests` was set to zero. And i saw the the initialize for bulkprocessor in class `ElasticsearchSinkBase`:
{code:java}
protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
...
BulkProcessor.Builder bulkProcessorBuilder = callBridge.createBulkProcessorBuilder(client, listener);
// This makes flush() blocking
bulkProcessorBuilder.setConcurrentRequests(0);
...
return bulkProcessorBuilder.build();
}
{code}
this field value was set to zero explicitly. So, all things seems to make sense, but i still wonder why the retry operation is not in the same thread as the bulk process execution, after i read the code, `bulkAsync` method might be the last puzzle.
{code:java}
@Override
public BulkProcessor.Builder createBulkProcessorBuilder(RestHighLevelClient client, BulkProcessor.Listener listener) {
return BulkProcessor.builder(client::bulkAsync, listener);
}
{code}
So, I hope someone can help to fix this problem, or given some suggestions, and also i can make a try to take it.
Thanks a lot !
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)