Posted to dev@flink.apache.org by "Fabian Hueske (JIRA)" <ji...@apache.org> on 2018/01/23 09:49:00 UTC
[jira] [Created] (FLINK-8489) Data is not emitted by second ElasticSearch connector
Fabian Hueske created FLINK-8489:
------------------------------------
Summary: Data is not emitted by second ElasticSearch connector
Key: FLINK-8489
URL: https://issues.apache.org/jira/browse/FLINK-8489
Project: Flink
Issue Type: Bug
Components: ElasticSearch Connector
Affects Versions: 1.4.0
Reporter: Fabian Hueske
A user reported [this issue|https://lists.apache.org/thread.html/e91c71beb45d6df879effa16c52f2c71aa6ef1a54ef0a8ac4ccc72ee@%3Cuser.flink.apache.org%3E] on the user@flink.apache.org mailing list.
*Setup:*
* A program with two pipelines that write to ElasticSearch. The pipelines can be connected or completely separate.
* ElasticSearch 5.6.4, connector {{flink-connector-elasticsearch5_2.11}}
*Problem:*
Only one of the ES connectors emits data correctly. The other connector either writes a single record and then stops emitting data, or does not write any data at all. The problem does not occur if the second ES connector is replaced by a different connector (for example, Cassandra).
Below is a program to reproduce the issue:
{code:java}
public class ElasticSearchTest1 {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set Elasticsearch connection details (the same map and list are passed to both sinks)
        Map<String, String> config = new HashMap<>();
        config.put("bulk.flush.max.actions", "1");
        config.put("cluster.name", "<cluster name>");

        List<InetSocketAddress> transports = new ArrayList<>();
        transports.add(new InetSocketAddress(InetAddress.getByName("<host ip>"), 9300));

        // Set properties for Kafka streaming
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "<host ip>" + ":9092");
        properties.setProperty("group.id", "testGroup");
        properties.setProperty("auto.offset.reset", "latest");

        // First pipeline: Kafka topic "elastic_test1" -> RecordOne -> Elasticsearch
        FlinkKafkaConsumer011<ObjectNode> inputConsumer1 =
                new FlinkKafkaConsumer011<>("elastic_test1", new JSONDeserializationSchema(), properties);

        DataStream<RecordOne> firstStream = env
                .addSource(inputConsumer1)
                .flatMap(new CreateRecordOne());

        firstStream
                .addSink(new ElasticsearchSink<RecordOne>(config,
                        transports,
                        new ElasticSearchOutputRecord("elastic_test_index1", "elastic_test_index1")));

        // Second pipeline: Kafka topic "elastic_test2" -> RecordTwo -> Elasticsearch
        FlinkKafkaConsumer011<ObjectNode> inputConsumer2 =
                new FlinkKafkaConsumer011<>("elastic_test2", new JSONDeserializationSchema(), properties);

        DataStream<RecordTwo> secondStream = env
                .addSource(inputConsumer2)
                .flatMap(new CreateRecordTwo());

        secondStream
                .addSink(new ElasticsearchSink<RecordTwo>(config,
                        transports,
                        new ElasticSearchOutputRecord2("elastic_test_index2", "elastic_test_index2")));

        env.execute("Elastic Search Test");
    }
}

public class ElasticSearchOutputRecord implements ElasticsearchSinkFunction<RecordOne> {

    String index;
    String type;

    // Store the target index and mapping type
    public ElasticSearchOutputRecord(String index, String type) {
        this.index = index;
        this.type = type;
    }

    // Construct an index request for each record
    @Override
    public void process(
            RecordOne record,
            RuntimeContext ctx,
            RequestIndexer indexer) {

        // Construct JSON document to index
        Map<String, String> json = new HashMap<>();
        json.put("item_one", record.item1);
        json.put("item_two", record.item2);

        IndexRequest rqst = Requests.indexRequest()
                .index(index) // index name
                .type(type)   // mapping name
                .source(json);

        indexer.add(rqst);
    }
}

public class ElasticSearchOutputRecord2 implements ElasticsearchSinkFunction<RecordTwo> {

    String index;
    String type;

    // Store the target index and mapping type
    public ElasticSearchOutputRecord2(String index, String type) {
        this.index = index;
        this.type = type;
    }

    // Construct an index request for each record
    @Override
    public void process(
            RecordTwo record,
            RuntimeContext ctx,
            RequestIndexer indexer) {

        // Construct JSON document to index
        Map<String, String> json = new HashMap<>();
        json.put("item_three", record.item3);
        json.put("item_four", record.item4);

        IndexRequest rqst = Requests.indexRequest()
                .index(index) // index name
                .type(type)   // mapping name
                .source(json);

        indexer.add(rqst);
    }
}

public class CreateRecordOne implements FlatMapFunction<ObjectNode, RecordOne> {

    static final Logger log = LoggerFactory.getLogger(CreateRecordOne.class);

    @Override
    public void flatMap(ObjectNode value, Collector<RecordOne> out) throws Exception {
        try {
            out.collect(new RecordOne(value.get("item1").asText(), value.get("item2").asText()));
        } catch (Exception e) {
            log.error("error while creating RecordOne", e);
        }
    }
}

public class CreateRecordTwo implements FlatMapFunction<ObjectNode, RecordTwo> {

    static final Logger log = LoggerFactory.getLogger(CreateRecordTwo.class);

    @Override
    public void flatMap(ObjectNode value, Collector<RecordTwo> out) throws Exception {
        try {
            out.collect(new RecordTwo(value.get("item1").asText(), value.get("item2").asText()));
        } catch (Exception e) {
            log.error("error while creating RecordTwo", e);
        }
    }
}

public class RecordOne {

    public String item1;
    public String item2;

    public RecordOne() {}

    public RecordOne(
            String item1,
            String item2) {
        this.item1 = item1;
        this.item2 = item2;
    }
}

public class RecordTwo {

    public String item3;
    public String item4;

    public RecordTwo() {}

    public RecordTwo(
            String item3,
            String item4) {
        this.item3 = item3;
        this.item4 = item4;
    }
}
{code}
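One thing that might be worth ruling out: in the program above, both sinks are constructed from the same {{config}} map instance. The sketch below is only an illustration of that hypothesis, not the connector's actual code. {{KeyConsumingSink}} is a hypothetical stand-in for any sink whose constructor removes the keys it parses from the map it is given; whether {{ElasticsearchSink}} behaves this way is an assumption that would need to be checked against the connector source. If it does, the second sink would no longer see {{bulk.flush.max.actions}} and would fall back to default buffering.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SharedConfigSketch {

    /** Hypothetical stand-in (not Flink code): a sink that removes the keys it parses. */
    static final class KeyConsumingSink {
        /** null means the setting was absent, so a default would apply. */
        final Integer maxActions;

        KeyConsumingSink(Map<String, String> userConfig) {
            // Assumption for illustration: the constructor mutates the caller's map.
            String value = userConfig.remove("bulk.flush.max.actions");
            this.maxActions = (value == null) ? null : Integer.valueOf(value);
        }
    }

    /** Both sinks receive the same map instance, as in the reproduction program. */
    static Integer[] buildShared() {
        Map<String, String> config = new HashMap<>();
        config.put("bulk.flush.max.actions", "1");
        KeyConsumingSink first = new KeyConsumingSink(config);
        KeyConsumingSink second = new KeyConsumingSink(config);
        return new Integer[] {first.maxActions, second.maxActions};
    }

    /** Each sink receives its own defensive copy of the map. */
    static Integer[] buildCopied() {
        Map<String, String> config = new HashMap<>();
        config.put("bulk.flush.max.actions", "1");
        KeyConsumingSink first = new KeyConsumingSink(new HashMap<>(config));
        KeyConsumingSink second = new KeyConsumingSink(new HashMap<>(config));
        return new Integer[] {first.maxActions, second.maxActions};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(buildShared())); // prints [1, null]
        System.out.println(Arrays.toString(buildCopied())); // prints [1, 1]
    }
}
```

If this hypothesis applies, passing {{new HashMap<>(config)}} to each {{ElasticsearchSink}} in the reproduction program would be a cheap experiment to confirm or refute it.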