You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Echo Lee (Jira)" <ji...@apache.org> on 2021/01/15 02:52:00 UTC

[jira] [Commented] (FLINK-20228) ESsink accur thread blocked

    [ https://issues.apache.org/jira/browse/FLINK-20228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17265650#comment-17265650 ] 

Echo Lee commented on FLINK-20228:
----------------------------------

[~liqi316] You can look at [ISSUE-47599|https://github.com/elastic/elasticsearch/issues/47599] .

In versions of Elasticsearch prior to 7.3.0 can result in a deadlock. Using flink-connector-elasticsearch7 should solve the problem.

> ESsink accur thread blocked
> ---------------------------
>
>                 Key: FLINK-20228
>                 URL: https://issues.apache.org/jira/browse/FLINK-20228
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / ElasticSearch
>    Affects Versions: 1.9.1
>            Reporter: liqi316
>            Priority: Blocker
>             Fix For: 1.9.1
>
>
> I will write the data to es(es version is 5.6.8. and flink use flink-connector-elasticsearch6), the task can run good at begin.But,after a while, the checkpoint will failed,and there are no any exception in taskmanager log .I get the jvm stack info as follow:
> java.lang.Thread.State: BLOCKED (on object monitor)
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:770)
>  - waiting to lock <0x0000000713f7fef8> (a java.lang.Object)
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:706)
>  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:184)
>  at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1169)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> "Legacy Source Thread - Source: Custom Source -> Map -> SourceConversion(table=[Unregistered_DataStream_2], fields=[C01, C02, C03, C04, C05, C06, C07, C08, C09, C10, C11, C12, C13, C14, C15, C16, C17, C18, C1
>  9, C20, C21, C22, C23, C24, C25, C26, C27, C28, C29, C30, C31, C32, C33, C34, C35, C36, C37, C38, C39, C40, C41, C42, C43, C44, C45, C46, C47, C48, C49, C50, C51, C52, C53, C54, C55, C56, C57, C58, C59, C60, 
>  C61, C62, C63, C64, C65, C66, C67, C68, C69, C70, C71, C72, topic, record_timestamp, key, partition, offset, rawmessage, isvalidate, proctime]) -> Calc(select=[C01, C02, C03, C04, C05, C06, C07, C08, C09, C10
>  , C11, C12, C13, (C14 CONCAT _UTF-16LE' ' CONCAT C15) AS C14, C15, C16, C17, C18, C19, C20, C21, C22, C23, C24, C25, C26, C27, C28, C29, C30, C31, C32, C33, C34, C35, C36, C37, C38, C39, C40, C41, C42, C43, C
>  44, C45, C46, C47, C48, C49, C50, C51, C52, C53, C54, C55, C56, C57, C58, C59, C60, C61, C62, C63, C64, C65, C66, C67, C68, C69, C70, C71, C72]) -> SinkConversionToTuple2 -> Filter -> Map -> SourceConversion(
>  table=[Unregistered_DataStream_7], fields=[C01, C02, C03, C04, C05, C06, C07, C08, C09, C10, C11, C12, C13, C14, C15, C16, C17, C18, C19, C20, C21, C22, C23, C24, C25, C26, C27, C28, C29, C30, C31, C32, C33, 
>  C34, C35, C36, C37, C38, C39, C40, C41, C42, C43, C44, C45, C46, C47, C48, C49, C50, C51, C52, C53, C54, C55, C56, C57, C58, C59, C60, C61, C62, C63, C64, C65, C66, C67, C68, C69, C70, C71, C72]) -> SinkConve
>  rsionToRow -> Sink: Elasticsearch (1/6)" #91 prio=5 os_prio=0 tid=0x00007f3e7d096800 nid=0xe050 waiting on condition [0x00007f3e41393000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x000000071f04f2d8> (a java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:86)
>  at org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:339)
>  at org.elasticsearch.action.bulk.BulkProcessor.executeIfNeeded(BulkProcessor.java:330)
>  at org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:288)
>  - locked <0x0000000713d8c188> (a org.elasticsearch.action.bulk.BulkProcessor)
>  at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:271)
>  at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:267)
>  at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:253)
>  at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6BulkProcessorIndexer.add(Elasticsearch6BulkProcessorIndexer.java:72)
>  at io.github.interestinglab.waterdrop.flink.sink.function.ElasticsearchRichSink.process(ElasticsearchRichSink.java:119)
>  at io.github.interestinglab.waterdrop.flink.sink.function.ElasticsearchRichSink.process(ElasticsearchRichSink.java:25)
>  at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:308)
>  at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>  at SinkConversion$236.processElement(Unknown Source)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>  at SourceConversion$234.processElement(Unknown Source)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>  at SourceConversion$234.processElement(Unknown Source)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>  at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>  at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>  at SinkConversion$232.processElement(Unknown Source)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>  at StreamExecCalc$229.processElement(Unknown Source)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>  at SourceConversion$80.processElement(Unknown Source)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>  at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>  at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>  - locked <0x0000000713f7fef8> (a java.lang.Object)
>  at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>  at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>  - locked <0x0000000713f7fef8> (a java.lang.Object)
>  at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)