You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 王佩 <wa...@cmcm.com> on 2019/09/18 01:49:41 UTC

Flink DataStream 用ElasticsearchSink 写ES ConnectionClosedException异常

在Flink 写ES,当ES集群繁忙时,会有如下异常:

2019-09-17 16:01:02 ERROR
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase
afterBulk 430 Failed Elasticsearch bulk request: Connection closed
org.apache.http.ConnectionClosedException: Connection closed
at
org.apache.http.nio.protocol.HttpAsyncRequestExecutor.endOfInput(HttpAsyncRequestExecutor.java:345)
at
org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:261)
at
org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
at
org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
at
org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
at
org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at
org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at
org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at
org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at
org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at
org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
at java.lang.Thread.run(Thread.java:745)
2019-09-17 16:01:02 ERROR
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase
afterBulk 430 Failed Elasticsearch bulk request: Connection closed
org.apache.http.ConnectionClosedException: Connection closed
at
org.apache.http.nio.protocol.HttpAsyncRequestExecutor.endOfInput(HttpAsyncRequestExecutor.java:345)
at
org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:261)
at
org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
at
org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
at
org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
at
org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at
org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at
org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at
org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at
org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at
org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
at java.lang.Thread.run(Thread.java:745)

Flink 用ElasticsearchSink 写ES时遇到这种异常后,不会自动恢复连接。

目前已知:

ElasticsearchSink中的异常处理,需要通过ElasticsearchSink的setFailureHandler方法来定义各种失败处理方式(如丢弃数据或将数据重新加入队列),但这里不能实现当ConnectionClosedException时,重新打开ES连接。

希望Flink遇到这种异常后,能自动重新打开ES的连接,该如何实现?

感谢