Posted to users@camel.apache.org by Site Register <si...@ymail.com.INVALID> on 2020/11/19 21:54:14 UTC

How to set key for producing batch messages into Kafka?

Hi Camel Users,

I can set a key for an individual message using the KafkaConstants.KEY header. However, producing 500K keyed messages into a Kafka topic this way took an extremely long time (more than 20 minutes). When I produced the messages in batches without assigning a key, 500K messages took 14 seconds. So my question is: how can I assign a key to batched messages?
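Processor (GsonStreamProcessor):
if (numberOfRecords % batchSize == 0) {
    List<String> batchList = new ArrayList<String>();
    // (the code that fills batchList is not shown in this post)

    exchange.getIn().setBody(batchList);
    exchange.getIn().setHeader(KafkaConstants.KEY, "key-" + keyCount); // This line does not work with batch messages

    // Creating a new ProducerTemplate for every batch (and never stopping it) wastes resources;
    // the usual Camel practice is to create one template and reuse it.
    ProducerTemplate template = exchange.getContext().createProducerTemplate();
    template.send("vm:msg", exchange);
}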

Route:
from("direct:start")
    .process("GsonStreamProcessor")
    .log("Done");

from("vm:msg")
    .to("kafka:.....");
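Each Kafka record carries its own key, so a "key for a batch" in practice means stamping every message in the batch with that key before it is sent. The sketch below shows that mapping in plain Java, with no Camel or Kafka dependency. The class BatchKeyAssigner, the method assignBatchKeys and the key format "key-" + batch index are hypothetical, chosen only to mirror the "key-"+keyCount expression in the processor above; each resulting entry stands for the key and value of one ProducerRecord.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class BatchKeyAssigner {

    /**
     * Hypothetical helper: pairs every message with a key derived from the
     * batch it falls into. Messages 0..batchSize-1 get "key-0", the next
     * batchSize messages get "key-1", and so on.
     */
    static List<Map.Entry<String, String>> assignBatchKeys(List<String> messages, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<Map.Entry<String, String>> keyed = new ArrayList<>(messages.size());
        for (int i = 0; i < messages.size(); i++) {
            // Integer division maps each message index to its batch number
            String key = "key-" + (i / batchSize);
            keyed.add(new AbstractMap.SimpleImmutableEntry<>(key, messages.get(i)));
        }
        return keyed;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("{\"id\":1}", "{\"id\":2}", "{\"id\":3}", "{\"id\":4}", "{\"id\":5}");
        // With batchSize 2: two records share key-0, two share key-1, one gets key-2
        for (Map.Entry<String, String> record : assignBatchKeys(messages, 2)) {
            System.out.println(record.getKey() + " -> " + record.getValue());
        }
    }
}
```

The key is applied per element rather than once per List, which matches how the Kafka producer API itself works: the batching done by the Kafka client (batch.size, linger.ms) happens below the level of individual keyed records.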
 

Thank you,




Re: How to set key for producing batch messages into Kafka?

Posted by Site Register <si...@ymail.com.INVALID>.
I changed to the route below, and it took 20 seconds to produce 500K keyed messages into the Kafka topic. However, a plain Java application producing the same number of keyed messages took only 10 to 11 seconds. Is there a way to get the same performance in Camel as in plain Java?
I even tried running the same plain Java code inside a processor, but it took 22 seconds. I am not sure which part went wrong.

Revised Route:

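from("vm:msg")
    .split(body())                  // one exchange per element of the List body
        .parallelProcessing()       // process the split exchanges concurrently
        .process("SetKeyProcessor") // presumably sets KafkaConstants.KEY on each message
        .to("kafka:....")
    .end()                          // end of the split block
    .log("${header.Counter}");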
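To illustrate the fan-out idea behind split(body()).parallelProcessing() in the route above, here is a plain java.util.concurrent sketch. It is not how Camel implements the Splitter; it only shows one task per list element running on a fixed thread pool, with a hypothetical key rule ("key-" + index) standing in for SetKeyProcessor. The class and method names are also hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelSplitSketch {

    /**
     * Splits the list into one task per element, runs the tasks on a fixed
     * thread pool, and returns "key|message" strings in input order.
     */
    static List<String> keyInParallel(List<String> messages, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (int i = 0; i < messages.size(); i++) {
                final int index = i;
                // Each task stands in for one split exchange; "key-" + index is a hypothetical key rule
                tasks.add(() -> "key-" + index + "|" + messages.get(index));
            }
            List<String> results = new ArrayList<>();
            // invokeAll waits for every task and returns the futures in task order,
            // although the tasks themselves may complete in any order on the pool's threads
            for (Future<String> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(keyInParallel(List.of("a", "b", "c"), 2));
    }
}
```

Here the results are collected back in input order, but if each task sent its record to Kafka directly (as each split exchange does when it reaches the kafka endpoint), the send order would follow thread scheduling. Records that share a key may therefore land in their partition in a different order from the input.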


--------------------------------------------------------------------------------------------------------
Pure Java Code:

import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import com.google.gson.stream.JsonReader;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// The class and main() wrapper are illustrative; the post showed only the method body.
public class PureJavaProducer {

    public static void main(String[] args) {
        int recordCt = 0;
        long startTime = System.currentTimeMillis();

        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "1"); // only the partition leader acknowledges the write
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false"); // idempotence off: a retried send can write a duplicate
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "0"); // was 1; e.g. 10 ms allows more batching
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16340"); // note: the Kafka default is 16384 bytes

        Producer<String, String> producer = new KafkaProducer<>(properties);
        String jsonFilePath = "d:\\temp\\test.json"; // 500K messages, file size 400MB

        // Stream the JSON array element by element instead of loading the whole file
        try (JsonReader jsonReader = new JsonReader(new FileReader(jsonFilePath))) {
            Gson gson = new GsonBuilder().create();
            jsonReader.beginArray(); // start of JSON array
            while (jsonReader.hasNext()) { // next JSON array element
                String message = gson.fromJson(jsonReader, JsonObject.class).toString();
                recordCt += 1;
                // Every record is sent with the same key, "instanceKey"
                producer.send(new ProducerRecord<>("topic_name", "instanceKey", message));
                if (recordCt % 10000 == 0) {
                    System.out.println("Published -> " + String.format("%,d", recordCt));
                }
            }
            jsonReader.endArray();
        } catch (IOException e) {
            e.printStackTrace();
        }
        producer.close(); // waits for buffered records to be sent before closing

        System.out.println("");
        System.out.println("Total records published to topic:" + String.format("%,d", recordCt));
        long durationInSecs = (System.currentTimeMillis() - startTime) / 1000;
        System.out.println("");
        System.out.println("Total elapsed time to process: " + durationInSecs + " seconds");
        System.out.println("");
    }
}
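The elapsed times reported in this thread are easier to compare as throughput. The small sketch below converts them to average messages per second for 500K messages; the class name is hypothetical, 10.5 s is taken as the midpoint of "10 to 11 seconds", and 1200 s is the "more than 20 minutes" lower bound, so that row is an upper bound on the rate.

```java
public class ReportedThroughput {

    // Average messages per second for a run of `messages` records taking `seconds` seconds
    static double messagesPerSecond(long messages, double seconds) {
        if (seconds <= 0) {
            throw new IllegalArgumentException("seconds must be positive");
        }
        return messages / seconds;
    }

    public static void main(String[] args) {
        long messages = 500_000L;
        // Elapsed times as reported in this thread (seconds)
        String[] labels = {
            "plain Java producer",
            "Camel batch, no key",
            "Camel split + parallelProcessing",
            "plain Java inside a Camel processor",
            "Camel, key per message (first attempt)"
        };
        double[] seconds = {10.5, 14, 20, 22, 1200};
        for (int i = 0; i < labels.length; i++) {
            System.out.printf("%-40s %8.1f s  %,10.0f msg/s%n",
                    labels[i], seconds[i], messagesPerSecond(messages, seconds[i]));
        }
    }
}
```

That works out to roughly 47.6K, 35.7K, 25K and 22.7K messages per second for the first four runs, and at most about 417 per second for the first keyed attempt. In other words, the split route runs at about half the plain Java rate, while the original per-message approach was more than 100 times slower.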


Thank you,
