You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Yuze Wei <yu...@outlook.com> on 2023/04/17 05:26:07 UTC
Flink Streaming API ElasticSearch Connector 长连接
各位大佬好!
我在使用Flink ES连接器的时候,有时候报以下错误:
Caused by: java.io.IOException
breakpoint : 远程主机强迫关闭了一个现有的连接
初步判断,应该是没有维持住长连接保活,所以如果一段时间不写入数据,连接就断了。
请问各位大佬,ElasticSearch Connector 有什么参数可以维持长连接吗?
ElasticSearch Connector 代码如下:
jsonStringStream
.sinkTo(
new Elasticsearch7SinkBuilder<String>()
// Instructs the sink to emit after every Nth buffered element
.setBulkFlushMaxActions(1)
.setHosts(
new HttpHost(
Conn.getInstance().getProp("elasticsearch.hosts"),
Integer.parseInt(
Conn.getInstance().getProp("elasticsearch.port")
),
Conn.getInstance().getProp("elasticsearch.scheme")
)
)
.setEmitter(
(page, context, indexer) ->
indexer.add(
new IndexRequest(Conn.getInstance().getProp("elasticsearch.index.page"))
.source(page, XContentType.JSON)
)
)
.setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
.build());
报错如下:
[cid:71c0dada-02b0-4a0d-b16f-97bb0d65167f]
多谢指教!