Posted to users@kafka.apache.org by DuanSky <sk...@qq.com> on 2016/06/24 10:56:37 UTC

Kafka loses data when using the Scala API to send data.

Hello,
  I have run into a problem when using the Scala API to send/receive data to/from Kafka brokers. I wrote a very simple producer and consumer (just like the official examples) and found that the code using the Java API works correctly, but the code using the Scala API may lose data. Here are the details.


Config: I downloaded the kafka_2.11-0.10.0.0.tgz binary release and started it in single-node mode: just one broker and one ZooKeeper, with the default configuration.


Problem:
(1) Java API test
  I first wrote a simple producer and consumer with the Java API. The producer code looks like this:
code A
void produce() {
    int messageNo = 1;
    // send Config.count messages to every topic in the comma-separated list
    while (messageNo <= Config.count) {
        for (String topic : Config.topics.split(",")) {
            String key = String.valueOf(messageNo);
            String data = topic + "-" + new Date();
            // old (0.8-style) producer API; send() is synchronous by default
            producer.send(new KeyedMessage<String, String>(topic, key, data));
            System.out.println(topic + "#" + key + "#" + data);
        }
        messageNo++;
    }
}
--------------------------------------------------------------------------------------------------------------------------------------------------------------
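Code A does not show how producer is constructed; for completeness, it is created roughly like this. This is only a sketch (in Scala, since the old producer is itself a Scala class); the broker address and acks value are assumptions for my local single-broker setup:
import java.util.Properties
import kafka.producer.{Producer, ProducerConfig}

// minimal construction of the old (0.8-style) producer used in code A (sketch)
val props = new Properties()
props.put("metadata.broker.list", "localhost:9092") // broker list, not ZooKeeper
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("request.required.acks", "1") // wait for the leader's acknowledgement
val producer = new Producer[String, String](new ProducerConfig(props))
--------------------------------------------------------------------------------------------------------------------------------------------------------------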
The consumer code looks like this:
code B
void consume() {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

    // request one stream (and hence one consuming thread) per topic
    for (String topic : Config.topics.split(",")) {
        topicCountMap.put(topic, Integer.valueOf(1));
    }

    final Map<String, List<KafkaStream<String, String>>> consumerMap =
            consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

    for (final String topic : Config.topics.split(",")) {
        new Thread(new Runnable() {
            public void run() {
                int count = 0;
                KafkaStream<String, String> stream = consumerMap.get(topic).get(0);
                ConsumerIterator<String, String> it = stream.iterator();
                // hasNext() blocks until the next message arrives
                while (it.hasNext()) {
                    count++;
                    MessageAndMetadata<String, String> message = it.next();
                    System.out.println(count + "#" + message.topic() + ":" + message.key() + ":" + message.message());
                }
            }
        }).start();
    }
}
--------------------------------------------------------------------------------------------------------------------------------------------------------------
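Likewise, the high-level consumer connector used in code B is created roughly like this (again just a sketch; the ZooKeeper address and group id are assumptions for my setup):
import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}

// minimal construction of the high-level consumer used in code B (sketch)
val props = new Properties()
props.put("zookeeper.connect", "localhost:2181") // the old consumer connects via ZooKeeper
props.put("group.id", "test-group")
props.put("auto.offset.reset", "smallest") // read from the earliest available offset
val consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props))
--------------------------------------------------------------------------------------------------------------------------------------------------------------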
As I changed Config.count (the total number of messages sent to each topic; here I use two topics -- a and b), I found that the consumer always receives exactly as many messages as were sent, no matter what the count is. So the Java API works correctly. But when I did the same thing using the Scala API, I found that some data may be lost on the way to the Kafka brokers.


(2) Scala API test
  I wrote a simple producer with the Scala API (the new KafkaProducer); part of it looks like this:
--------------------------------------------------------------------------------------------------------------------------------------------------------------
code C
def main(args: Array[String]): Unit = {
  val producer = {
    // producer connection properties (bootstrap brokers, not ZooKeeper)
    val props = new util.HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.ACKS_CONFIG, "-1") // -1 is equivalent to acks=all

    new KafkaProducer[String, String](props)
  }

  (1 to Config.count).foreach(key => {
    Config.topics.split(",").foreach(topic => {
      val data = topic + new Date().toString
      val message = new ProducerRecord[String, String](topic, key.toString, data)
      // send() is asynchronous: it buffers the record and returns immediately
      producer.send(message)
      System.out.println(topic + "#" + key + "#" + data)
    })
  })

}
--------------------------------------------------------------------------------------------------------------------------------------------------------------
Steps:
(1) First I start a consumer waiting to receive data (using code B above);
(2) Then I start a producer to produce data (using code C above).

Problem:
  When I start code C, the producer produces data very fast: sending 100 messages takes no more than 1 second, while the producer with the Java API (code A above) cannot send data nearly as fast. The consumer (code B) receives only 34 messages for topic a and 34 for topic b and then just stands by, although I actually produced 100 per topic. I have changed the number of messages I send to Kafka, but no matter how much data I send, the consumer receives only about half of it, sometimes less; the more messages I send, the more data is lost. (By "lost" I mean: do the messages fail to reach the brokers, or do they arrive but never get consumed? What I actually observe is that I receive part of the data and then nothing more is consumed.)
Try:
  Since the producer with the Scala API sends data so fast, I added Thread.sleep(time) after sending each record, and that does help! With time = 100 it rarely loses data, but sometimes I still receive fewer messages than I sent.
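This makes me suspect that the new KafkaProducer is asynchronous: send() only puts the record into a client-side buffer and returns a Future, the actual network I/O happens on a background thread, and records still buffered when main() returns may be dropped as the JVM exits. As a sketch (reusing the producer and message values from code C), I could either block on every send or drain the buffer before exiting:
--------------------------------------------------------------------------------------------------------------------------------------------------------------
// Variant 1: make each send effectively synchronous.
// send() returns a java.util.concurrent.Future[RecordMetadata];
// get() blocks until the broker acknowledges the record (or the send fails).
val meta = producer.send(message).get()
println(s"acked: ${meta.topic}-${meta.partition}@${meta.offset}")

// Variant 2: keep sends asynchronous, but drain the buffer before main() exits.
producer.flush() // blocks until all previously buffered records have been sent
producer.close() // also flushes, then releases the producer's resources
--------------------------------------------------------------------------------------------------------------------------------------------------------------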


Question:
Am I using the Scala API wrong? Or is it something else?
I have attached my code. Looking forward to your reply and advice.