Posted to issues@flink.apache.org by "Chesnay Schepler (JIRA)" <ji...@apache.org> on 2018/01/23 14:45:00 UTC
[jira] [Commented] (FLINK-8489) Data is not emitted by second ElasticSearch connector
[ https://issues.apache.org/jira/browse/FLINK-8489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16335857#comment-16335857 ]
Chesnay Schepler commented on FLINK-8489:
-----------------------------------------
What happens if you provide separate config maps to each sink? From a quick skim it appears that the sink may remove properties from them.
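For illustration, a minimal sketch of that suggestion (not a confirmed fix): give each sink its own copy of the config map, reusing the stream variables and sink functions from the reproduction program quoted below, in case the sink does mutate the map it is given.

{code:java}
// Workaround sketch (assumption, not a verified fix): one config map per ElasticsearchSink,
// so that properties removed from one map cannot affect the other sink.
Map<String, String> config1 = new HashMap<>();
config1.put("bulk.flush.max.actions", "1");
config1.put("cluster.name", "<cluster name>");

// independent copy with identical entries for the second sink
Map<String, String> config2 = new HashMap<>(config1);

firstStream.addSink(new ElasticsearchSink<RecordOne>(config1, transports,
    new ElasticSearchOutputRecord("elastic_test_index1", "elastic_test_index1")));

secondStream.addSink(new ElasticsearchSink<RecordTwo>(config2, transports,
    new ElasticSearchOutputRecord2("elastic_test_index2", "elastic_test_index2")));
{code}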
> Data is not emitted by second ElasticSearch connector
> -----------------------------------------------------
>
> Key: FLINK-8489
> URL: https://issues.apache.org/jira/browse/FLINK-8489
> Project: Flink
> Issue Type: Bug
> Components: ElasticSearch Connector
> Affects Versions: 1.4.0
> Reporter: Fabian Hueske
> Priority: Critical
>
> A user reported [this issue|https://lists.apache.org/thread.html/e91c71beb45d6df879effa16c52f2c71aa6ef1a54ef0a8ac4ccc72ee@%3Cuser.flink.apache.org%3E] on the user@f.a.o mailing list.
> *Setup:*
> * A program with two pipelines that write to ElasticSearch. The pipelines can be connected or completely separate.
> * ElasticSearch 5.6.4, connector {{flink-connector-elasticsearch5_2.11}}
> *Problem:*
> Only one of the ES connectors correctly emits data. The other connector writes a single record and then stops emitting data (or does not write any data at all). The problem does not exist if the second ES connector is replaced by a different connector (for example Cassandra).
> Below is a program to reproduce the issue:
> {code:java}
> public class ElasticSearchTest1 {
>
>     public static void main(String[] args) throws Exception {
>
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         // set elasticsearch connection details
>         Map<String, String> config = new HashMap<>();
>         config.put("bulk.flush.max.actions", "1");
>         config.put("cluster.name", "<cluster name>");
>         List<InetSocketAddress> transports = new ArrayList<>();
>         transports.add(new InetSocketAddress(InetAddress.getByName("<host ip>"), 9300));
>
>         // set properties for Kafka streaming
>         Properties properties = new Properties();
>         properties.setProperty("bootstrap.servers", "<host ip>" + ":9092");
>         properties.setProperty("group.id", "testGroup");
>         properties.setProperty("auto.offset.reset", "latest");
>
>         // create consumer for log records
>         FlinkKafkaConsumer011 inputConsumer1 =
>             new FlinkKafkaConsumer011<>("elastic_test1", new JSONDeserializationSchema(), properties);
>
>         DataStream<RecordOne> firstStream = env
>             .addSource(inputConsumer1)
>             .flatMap(new CreateRecordOne());
>
>         firstStream
>             .addSink(new ElasticsearchSink<RecordOne>(config,
>                 transports,
>                 new ElasticSearchOutputRecord("elastic_test_index1", "elastic_test_index1")));
>
>         FlinkKafkaConsumer011 inputConsumer2 =
>             new FlinkKafkaConsumer011<>("elastic_test2", new JSONDeserializationSchema(), properties);
>
>         DataStream<RecordTwo> secondStream = env
>             .addSource(inputConsumer2)
>             .flatMap(new CreateRecordTwo());
>
>         secondStream
>             .addSink(new ElasticsearchSink<RecordTwo>(config,
>                 transports,
>                 new ElasticSearchOutputRecord2("elastic_test_index2", "elastic_test_index2")));
>
>         env.execute("Elastic Search Test");
>     }
> }
> public class ElasticSearchOutputRecord implements ElasticsearchSinkFunction<RecordOne> {
>
>     String index;
>     String type;
>
>     // initialize the sink function with the target index and type
>     public ElasticSearchOutputRecord(String index, String type) {
>         this.index = index;
>         this.type = type;
>     }
>
>     // construct index request
>     @Override
>     public void process(
>             RecordOne record,
>             RuntimeContext ctx,
>             RequestIndexer indexer) {
>
>         // construct JSON document to index
>         Map<String, String> json = new HashMap<>();
>         json.put("item_one", record.item1);
>         json.put("item_two", record.item2);
>
>         IndexRequest rqst = Requests.indexRequest()
>             .index(index)  // index name
>             .type(type)    // mapping name
>             .source(json);
>         indexer.add(rqst);
>     }
> }
>
> public class ElasticSearchOutputRecord2 implements ElasticsearchSinkFunction<RecordTwo> {
>
>     String index;
>     String type;
>
>     // initialize the sink function with the target index and type
>     public ElasticSearchOutputRecord2(String index, String type) {
>         this.index = index;
>         this.type = type;
>     }
>
>     // construct index request
>     @Override
>     public void process(
>             RecordTwo record,
>             RuntimeContext ctx,
>             RequestIndexer indexer) {
>
>         // construct JSON document to index
>         Map<String, String> json = new HashMap<>();
>         json.put("item_three", record.item3);
>         json.put("item_four", record.item4);
>
>         IndexRequest rqst = Requests.indexRequest()
>             .index(index)  // index name
>             .type(type)    // mapping name
>             .source(json);
>         indexer.add(rqst);
>     }
> }
> public class CreateRecordOne implements FlatMapFunction<ObjectNode, RecordOne> {
>
>     static final Logger log = LoggerFactory.getLogger(CreateRecordOne.class);
>
>     @Override
>     public void flatMap(ObjectNode value, Collector<RecordOne> out) throws Exception {
>         try {
>             out.collect(new RecordOne(value.get("item1").asText(), value.get("item2").asText()));
>         } catch (Exception e) {
>             log.error("error while creating RecordOne", e);
>         }
>     }
> }
>
> public class CreateRecordTwo implements FlatMapFunction<ObjectNode, RecordTwo> {
>
>     static final Logger log = LoggerFactory.getLogger(CreateRecordTwo.class);
>
>     @Override
>     public void flatMap(ObjectNode value, Collector<RecordTwo> out) throws Exception {
>         try {
>             out.collect(new RecordTwo(value.get("item1").asText(), value.get("item2").asText()));
>         } catch (Exception e) {
>             log.error("error while creating RecordTwo", e);
>         }
>     }
> }
> public class RecordOne {
>
>     public String item1;
>     public String item2;
>
>     public RecordOne() {}
>
>     public RecordOne(String item1, String item2) {
>         this.item1 = item1;
>         this.item2 = item2;
>     }
> }
>
> public class RecordTwo {
>
>     public String item3;
>     public String item4;
>
>     public RecordTwo() {}
>
>     public RecordTwo(String item3, String item4) {
>         this.item3 = item3;
>         this.item4 = item4;
>     }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)