You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Fabian Hueske (JIRA)" <ji...@apache.org> on 2018/01/23 09:49:00 UTC

[jira] [Created] (FLINK-8489) Data is not emitted by second ElasticSearch connector

Fabian Hueske created FLINK-8489:
------------------------------------

             Summary: Data is not emitted by second ElasticSearch connector
                 Key: FLINK-8489
                 URL: https://issues.apache.org/jira/browse/FLINK-8489
             Project: Flink
          Issue Type: Bug
          Components: ElasticSearch Connector
    Affects Versions: 1.4.0
            Reporter: Fabian Hueske


A user reported [this issue|https://lists.apache.org/thread.html/e91c71beb45d6df879effa16c52f2c71aa6ef1a54ef0a8ac4ccc72ee@%3Cuser.flink.apache.org%3E] on the user@f.a.o mailing list.

*Setup:*
 * A program with two pipelines that write to ElasticSearch. The pipelines can be connected or completely separate.
 * ElasticSearch 5.6.4, connector {{flink-connector-elasticsearch5_2.11}}

*Problem:*
 Only one of the ES connectors emits data correctly. The other connector either writes a single record and then stops emitting, or does not write any data at all. The problem does not occur if the second ES connector is replaced by a different connector (for example, Cassandra).

Below is a program to reproduce the issue:
{code:java}
/**
 * Reproducer for FLINK-8489: one job containing two separate Kafka-to-Elasticsearch pipelines.
 *
 * <p>Both {@code ElasticsearchSink} instances below are constructed with the very same
 * {@code config} map object and the very same {@code transports} list object. Keep that wiring
 * unchanged when re-running the reproducer.
 */
public class ElasticSearchTest1 {

	/**
	 * Builds both pipelines on a single execution environment and submits them as one job.
	 *
	 * @param args command-line arguments; not read by this method
	 * @throws Exception if setting up the pipelines or executing the job fails
	 */
	public static void main(String[] args) throws Exception {
		
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		
		// Elasticsearch settings and transport addresses. These two objects are handed,
		// as-is, to BOTH sinks further down.
		Map<String, String> config = new HashMap<>();
		// NOTE(review): presumably makes the sink flush after every single action -
		// confirm against the Elasticsearch connector documentation.
		config.put("bulk.flush.max.actions", "1");
		config.put("cluster.name", "<cluster name>");
		List<InetSocketAddress> transports = new ArrayList<>();		
		transports.add(new InetSocketAddress(InetAddress.getByName("<host ip>"), 9300));
		
		// Kafka consumer properties, shared by both consumers below.
	        Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", "<host ip>"+":9092");
		properties.setProperty("group.id", "testGroup");
		properties.setProperty("auto.offset.reset", "latest");	
				
		// First pipeline: topic "elastic_test1" -> CreateRecordOne -> ElasticsearchSink
		// (index and type are both "elastic_test_index1").
		// NOTE(review): the consumer variable is declared with a raw type.
		
		FlinkKafkaConsumer011 inputConsumer1 = new FlinkKafkaConsumer011<>("elastic_test1", new JSONDeserializationSchema(), properties);
				
		DataStream<RecordOne> firstStream = env
				.addSource(inputConsumer1)
				.flatMap(new CreateRecordOne());
		 	
		firstStream		
		.addSink(new ElasticsearchSink<RecordOne>(config, 
				transports, 
				new ElasticSearchOutputRecord("elastic_test_index1","elastic_test_index1")));
		
		// Second pipeline: topic "elastic_test2" -> CreateRecordTwo -> ElasticsearchSink
		// (index and type are both "elastic_test_index2"). Receives the same config/transports
		// objects as the first sink.
		FlinkKafkaConsumer011 inputConsumer2 = new FlinkKafkaConsumer011<>("elastic_test2", new JSONDeserializationSchema(), properties);
		
		DataStream<RecordTwo> secondStream = env
					.addSource(inputConsumer2)		
					.flatMap(new CreateRecordTwo());
		 	
		secondStream		
		.addSink(new ElasticsearchSink<RecordTwo>(config, 
				transports, 
				new ElasticSearchOutputRecord2("elastic_test_index2","elastic_test_index2")));
				
		// Both pipelines run inside this single job.
		env.execute("Elastic Search Test");
	}
}

/**
 * Sink function that turns each {@link RecordOne} into a single Elasticsearch index request.
 */
public class ElasticSearchOutputRecord implements ElasticsearchSinkFunction<RecordOne> {

	/** Name of the target index. */
	String index;

	/** Mapping (type) name used for every request. */
	String type;

	/**
	 * Stores the target index and mapping name.
	 *
	 * @param index index name set on each request
	 * @param type mapping name set on each request
	 */
	public ElasticSearchOutputRecord(String index, String type) {
		this.index = index;
		this.type = type;
	}

	/**
	 * Builds one index request from the two fields of {@code record} and adds it to
	 * {@code indexer}.
	 *
	 * @param record element whose {@code item1} and {@code item2} become the document fields
	 * @param ctx runtime context; not used here
	 * @param indexer receiver of the constructed request
	 */
	@Override
	public void process(RecordOne record, RuntimeContext ctx, RequestIndexer indexer) {
		// Document body: two string fields copied from the record.
		Map<String, String> document = new HashMap<>();
		document.put("item_one", record.item1);
		document.put("item_two", record.item2);

		indexer.add(
				Requests.indexRequest()
						.index(index)
						.type(type)
						.source(document));
	}
}

/**
 * Sink function that turns each {@link RecordTwo} into a single Elasticsearch index request.
 */
public class ElasticSearchOutputRecord2 implements ElasticsearchSinkFunction<RecordTwo> {

	/** Name of the target index. */
	String index;

	/** Mapping (type) name used for every request. */
	String type;

	/**
	 * Stores the target index and mapping name.
	 *
	 * @param index index name set on each request
	 * @param type mapping name set on each request
	 */
	public ElasticSearchOutputRecord2(String index, String type) {
		this.index = index;
		this.type = type;
	}

	/**
	 * Builds one index request from the two fields of {@code record} and adds it to
	 * {@code indexer}.
	 *
	 * @param record element whose {@code item3} and {@code item4} become the document fields
	 * @param ctx runtime context; not used here
	 * @param indexer receiver of the constructed request
	 */
	@Override
	public void process(RecordTwo record, RuntimeContext ctx, RequestIndexer indexer) {
		// Document body: two string fields copied from the record.
		Map<String, String> document = new HashMap<>();
		document.put("item_three", record.item3);
		document.put("item_four", record.item4);

		indexer.add(
				Requests.indexRequest()
						.index(index)
						.type(type)
						.source(document));
	}
}

/**
 * Maps a JSON object to a {@link RecordOne} built from its {@code "item1"} and {@code "item2"}
 * fields.
 */
public class CreateRecordOne implements FlatMapFunction<ObjectNode,RecordOne> {

	static final Logger log = LoggerFactory.getLogger(CreateRecordOne.class);

	/**
	 * Emits one {@link RecordOne} for {@code value}. Any exception raised while reading the
	 * fields or emitting the record is logged and not rethrown.
	 *
	 * @param value JSON object to read {@code "item1"} and {@code "item2"} from
	 * @param out collector receiving the created record
	 */
	@Override
	public void flatMap(ObjectNode value, Collector<RecordOne> out) throws Exception {
		try {
			String first = value.get("item1").asText();
			String second = value.get("item2").asText();
			RecordOne created = new RecordOne(first, second);
			out.collect(created);
		} catch (Exception failure) {
			log.error("error while creating RecordOne", failure);
		}
	}
}

/**
 * Maps a JSON object to a {@link RecordTwo} built from its {@code "item1"} and {@code "item2"}
 * fields.
 */
public class CreateRecordTwo implements FlatMapFunction<ObjectNode,RecordTwo> {

	static final Logger log = LoggerFactory.getLogger(CreateRecordTwo.class);

	/**
	 * Emits one {@link RecordTwo} for {@code value}. Any exception raised while reading the
	 * fields or emitting the record is logged and not rethrown.
	 *
	 * @param value JSON object to read {@code "item1"} and {@code "item2"} from
	 * @param out collector receiving the created record
	 */
	@Override
	public void flatMap(ObjectNode value, Collector<RecordTwo> out) throws Exception {
		try {
			// NOTE(review): the JSON keys are "item1"/"item2" although RecordTwo's fields are
			// named item3/item4 - confirm this matches the messages on the second topic.
			String first = value.get("item1").asText();
			String second = value.get("item2").asText();
			RecordTwo created = new RecordTwo(first, second);
			out.collect(created);
		} catch (Exception failure) {
			log.error("error while creating RecordTwo", failure);
		}
	}
}

/**
 * Plain data holder with two public, mutable string fields and a public no-argument
 * constructor.
 */
public class RecordOne {

	/** First value of the record. */
	public String item1;

	/** Second value of the record. */
	public String item2;

	/** Creates a record with both fields left {@code null}. */
	public RecordOne() {
	}

	/**
	 * Creates a record with both fields set.
	 *
	 * @param item1 value stored in {@link #item1}
	 * @param item2 value stored in {@link #item2}
	 */
	public RecordOne(String item1, String item2) {
		this.item1 = item1;
		this.item2 = item2;
	}
}

/**
 * Plain data holder with two public, mutable string fields and a public no-argument
 * constructor.
 */
public class RecordTwo {

	/** First value of the record. */
	public String item3;

	/** Second value of the record. */
	public String item4;

	/** Creates a record with both fields left {@code null}. */
	public RecordTwo() {
	}

	/**
	 * Creates a record with both fields set.
	 *
	 * @param item3 value stored in {@link #item3}
	 * @param item4 value stored in {@link #item4}
	 */
	public RecordTwo(String item3, String item4) {
		this.item3 = item3;
		this.item4 = item4;
	}
}
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)