Posted to issues@metron.apache.org by nickwallen <gi...@git.apache.org> on 2018/06/01 21:17:17 UTC

[GitHub] metron pull request #1045: METRON-1594: KafkaWriter is asynchronous and may ...

Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1045#discussion_r192517781
  
    --- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java ---
    @@ -156,33 +172,61 @@ public void configure(String sensorName, WriterConfiguration configuration) {
         }
       }
     
    +  @Override
    +  public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration config)
    +      throws Exception {
    +    if(this.zkQuorum != null && this.brokerUrl == null) {
    +      try {
    +        this.brokerUrl = Joiner.on(",").join(KafkaUtils.INSTANCE.getBrokersFromZookeeper(this.zkQuorum));
    +      } catch (Exception e) {
    +        throw new IllegalStateException("Cannot read kafka brokers from zookeeper and you didn't specify them, giving up!", e);
    +      }
    +    }
    +    this.kafkaProducer = new KafkaProducer<>(createProducerConfigs());
    +  }
    +
       public Map<String, Object> createProducerConfigs() {
         Map<String, Object> producerConfig = new HashMap<>();
         producerConfig.put("bootstrap.servers", brokerUrl);
         producerConfig.put("key.serializer", keySerializer);
         producerConfig.put("value.serializer", valueSerializer);
         producerConfig.put("request.required.acks", requiredAcks);
    +    producerConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, DEFAULT_BATCH_SIZE);
         producerConfig.putAll(producerConfigs == null?new HashMap<>():producerConfigs);
         producerConfig = KafkaUtils.INSTANCE.normalizeProtocol(producerConfig);
         return producerConfig;
       }
     
       @Override
    -  public void init() {
    -    if(this.zkQuorum != null && this.brokerUrl == null) {
    +  public BulkWriterResponse write(String sensorType, WriterConfiguration configurations,
    +      Iterable<Tuple> tuples, List<JSONObject> messages) {
    +    BulkWriterResponse writerResponse = new BulkWriterResponse();
    +
    +    List<Map.Entry<Tuple, Future>> results = new ArrayList<>();
    +    int i = 0;
    +    for (Tuple tuple : tuples) {
    +      JSONObject message = messages.get(i++);
    +      Future future = kafkaProducer
    +          .send(new ProducerRecord<String, String>(kafkaTopic, message.toJSONString()));
    --- End diff --
    
    Should we be more defensive when we transform the message to JSON? Given how broadly this class is used, a caller could pass a message whose content causes problems during serialization.
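    
    For example, a minimal sketch of the send loop (assuming BulkWriterResponse#addError(Throwable, Tuple) is available here, and with the results bookkeeping shown as a hypothetical reconstruction since that line is outside this hunk) that fails only the offending message instead of the whole batch:
    
        for (Tuple tuple : tuples) {
          JSONObject message = messages.get(i++);
          String json;
          try {
            // serialize defensively so one malformed message cannot break the whole batch
            json = message.toJSONString();
          } catch (Throwable t) {
            // record the failure against this tuple and keep sending the rest
            writerResponse.addError(t, tuple);
            continue;
          }
          results.add(new AbstractMap.SimpleEntry<>(tuple,
              kafkaProducer.send(new ProducerRecord<>(kafkaTopic, json))));
        }
    
    Whether we want a per-message catch like this or a single batch-level error is a judgment call, but reporting the failure through the response seems safer than letting an unchecked exception escape write().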


---