You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@metron.apache.org by nickwallen <gi...@git.apache.org> on 2018/06/01 21:17:17 UTC
[GitHub] metron pull request #1045: METRON-1594: KafkaWriter is asynchronous and may ...
Github user nickwallen commented on a diff in the pull request:
https://github.com/apache/metron/pull/1045#discussion_r192517781
--- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java ---
@@ -156,33 +172,61 @@ public void configure(String sensorName, WriterConfiguration configuration) {
}
}
+ @Override
+ public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration config)
+ throws Exception {
+ if(this.zkQuorum != null && this.brokerUrl == null) {
+ try {
+ this.brokerUrl = Joiner.on(",").join(KafkaUtils.INSTANCE.getBrokersFromZookeeper(this.zkQuorum));
+ } catch (Exception e) {
+ throw new IllegalStateException("Cannot read kafka brokers from zookeeper and you didn't specify them, giving up!", e);
+ }
+ }
+ this.kafkaProducer = new KafkaProducer<>(createProducerConfigs());
+ }
+
public Map<String, Object> createProducerConfigs() {
Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put("bootstrap.servers", brokerUrl);
producerConfig.put("key.serializer", keySerializer);
producerConfig.put("value.serializer", valueSerializer);
producerConfig.put("request.required.acks", requiredAcks);
+ producerConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, DEFAULT_BATCH_SIZE);
producerConfig.putAll(producerConfigs == null?new HashMap<>():producerConfigs);
producerConfig = KafkaUtils.INSTANCE.normalizeProtocol(producerConfig);
return producerConfig;
}
@Override
- public void init() {
- if(this.zkQuorum != null && this.brokerUrl == null) {
+ public BulkWriterResponse write(String sensorType, WriterConfiguration configurations,
+ Iterable<Tuple> tuples, List<JSONObject> messages) {
+ BulkWriterResponse writerResponse = new BulkWriterResponse();
+
+ List<Map.Entry<Tuple, Future>> results = new ArrayList<>();
+ int i = 0;
+ for (Tuple tuple : tuples) {
+ JSONObject message = messages.get(i++);
+ Future future = kafkaProducer
+ .send(new ProducerRecord<String, String>(kafkaTopic, message.toJSONString()));
--- End diff --
Should we be more defensive when we transform the message to JSON? Considering the broad use of this class, someone might inject something that causes problems during serialization.
---