You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 桑普拉斯 <de...@foxmail.com> on 2020/02/11 03:24:41 UTC

Flink sink到ElasticSearch失败

最简单的sink to ElasticSearch场景,程序运行没有报错,但是ES里就是没写进去数据。


源代码如下:


package etl.estest;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

import java.util.*;

public class EsTest1 {
&nbsp;&nbsp;&nbsp; public static void main(String[] args) throws Exception {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; test2();
&nbsp;&nbsp;&nbsp; }
&nbsp;&nbsp;&nbsp; private static void test2() throws Exception{
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; env.setParallelism(1);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Properties properties = new Properties();
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; properties.put("bootstrap.servers","10.67.18.100:9092");
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; properties.put("zookeeper.connect","10.67.18.100:2180");
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; properties.put("group.id","test-consumer-group");
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; FlinkKafkaConsumer<String&gt; pas = new FlinkKafkaConsumer<String&gt;("nms.pas", new SimpleStringSchema(), properties);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; DataStreamSource<String&gt; pas_stream = env.addSource(pas);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; pas_stream.print();

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; List<HttpHost&gt; httpHosts = new ArrayList<&gt;();
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; httpHosts.add(new HttpHost("10.67.18.100", 9310, "http"));
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ElasticsearchSink.Builder<String&gt; esSinkBuilder = new ElasticsearchSink.Builder<&gt;(
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; httpHosts,
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; new ElasticsearchSinkFunction<String&gt;() {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; public IndexRequest createIndexRequest(String element) {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Map<String, String&gt; json = new HashMap<&gt;();
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; json.put("data", element);

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return Requests.indexRequest()
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .index("pas-meeting-data-2020.1.17")&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .type("_doc")
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .source(json);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; @Override
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; indexer.add(createIndexRequest(element));
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; );
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; pas_stream.addSink(esSinkBuilder.build());
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; env.execute("sss");
&nbsp;&nbsp;&nbsp; }
}