You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 桑普拉斯 <de...@foxmail.com> on 2020/02/11 03:24:41 UTC
Flink sink到ElasticSearch失败
最简单的sink to ElasticSearch场景,程序运行没有报错,但是ES里就是没写进去数据。
源代码如下:
package etl.estest;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import java.util.*;
public class EsTest1 {
public static void main(String[] args) throws Exception {
test2();
}
private static void test2() throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Properties properties = new Properties();
properties.put("bootstrap.servers","10.67.18.100:9092");
properties.put("zookeeper.connect","10.67.18.100:2180");
properties.put("group.id","test-consumer-group");
FlinkKafkaConsumer<String> pas = new FlinkKafkaConsumer<String>("nms.pas", new SimpleStringSchema(), properties);
DataStreamSource<String> pas_stream = env.addSource(pas);
pas_stream.print();
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("10.67.18.100", 9310, "http"));
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
new ElasticsearchSinkFunction<String>() {
public IndexRequest createIndexRequest(String element) {
Map<String, String> json = new HashMap<>();
json.put("data", element);
return Requests.indexRequest()
.index("pas-meeting-data-2020.1.17")
.type("_doc")
.source(json);
}
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}
);
pas_stream.addSink(esSinkBuilder.build());
env.execute("sss");
}
}