You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Yuze Wei <yu...@outlook.com> on 2023/04/17 05:26:07 UTC

Flink Streaming API ElasticSearch Connector 长连接

各位大佬好!

我在使用Flink ES连接器的时候,有时候报以下错误:

Caused by: java.io.IOException
breakpoint : 远程主机强迫关闭了一个现有的连接

初步判断,应该是没有维持住长连接保活,所以如果一段时间不写入数据,连接就断了。

请问各位大佬,ElasticSearch Connector 有什么参数可以维持长连接吗?

ElasticSearch Connector 代码如下:

jsonStringStream
        .sinkTo(
                new Elasticsearch7SinkBuilder<String>()
                        // Instructs the sink to emit after every Nth buffered element
                        .setBulkFlushMaxActions(1)
                        .setHosts(
                                new HttpHost(
                                        Conn.getInstance().getProp("elasticsearch.hosts"),
                                        Integer.parseInt(
                                                Conn.getInstance().getProp("elasticsearch.port")
                                        ),
                                        Conn.getInstance().getProp("elasticsearch.scheme")
                                )
                        )
                        .setEmitter(
                                (page, context, indexer) ->
                                        indexer.add(
                                                new IndexRequest(Conn.getInstance().getProp("elasticsearch.index.page"))
                                                        .source(page, XContentType.JSON)
                                        )
                        )
                        .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
                        .build());

报错如下:
[cid:71c0dada-02b0-4a0d-b16f-97bb0d65167f]

多谢指教!