Posted to users@kafka.apache.org by DuanSky <sk...@qq.com> on 2016/06/24 10:56:37 UTC
Kafka loses data when using the Scala API to send data
Hello With Respect,
I ran into a problem when using the Scala API to send data to and receive data from Kafka brokers. I wrote a very simple producer and consumer (much like the official examples) and found that the code using the Java API works correctly, but the code using the Scala API may lose data. Here are the details.
Config: I downloaded the kafka_2.11-0.10.0.0.tgz binary distribution and started it in standalone mode: just one broker and one ZooKeeper, using the default configuration.
Problem:
(1)Java API Test
First I wrote a simple consumer and producer with the Java API. The producer code looks like this:
code A
void produce() {
    int messageNo = 1;
    while (messageNo <= Config.count) {
        for (String topic : Config.topics.split(",")) {
            String key = String.valueOf(messageNo);
            String data = topic + "-" + new Date();
            // Old (0.8-style) producer API: kafka.javaapi.producer.Producer with KeyedMessage
            producer.send(new KeyedMessage<String, String>(topic, key, data));
            System.out.println(topic + "#" + key + "#" + data);
        }
        messageNo++;
    }
}
--------------------------------------------------------------------------------------------------------------------------------------------------------------
The consumer code looks like this:
code B
void consume() {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
    // One stream per topic
    for (String topic : Config.topics.split(",")) {
        topicCountMap.put(topic, 1);
    }
    final Map<String, List<KafkaStream<String, String>>> consumerMap =
        consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
    for (final String topic : Config.topics.split(",")) {
        new Thread(new Runnable() {
            public void run() {
                int count = 0;
                KafkaStream<String, String> stream = consumerMap.get(topic).get(0);
                ConsumerIterator<String, String> it = stream.iterator();
                // hasNext() blocks while waiting for new messages
                while (it.hasNext()) {
                    count++;
                    MessageAndMetadata<String, String> message = it.next();
                    System.out.println(count + "#" + message.topic() + ":" + message.key() + ":" + message.message());
                }
            }
        }).start();
    }
}
--------------------------------------------------------------------------------------------------------------------------------------------------------------
As I vary Config.count (the number of messages sent to each topic; here I use two topics, a and b), the consumer always receives as many messages as were sent, whatever the count. So the Java API works correctly. But when I do the same thing with the Scala API, some data seems to be lost when it is sent to the Kafka brokers.
(2) Scala API Test
I wrote a simple producer program with the Scala API; part of it looks like this:
--------------------------------------------------------------------------------------------------------------------------------------------------------------
code C
def main(args: Array[String]): Unit = {
  val producer = {
    // Producer properties: the new producer connects to the brokers, not to ZooKeeper
    val props = new util.HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.ACKS_CONFIG, "-1") // -1 is equivalent to "all"
    new KafkaProducer[String, String](props)
  }
  (1 to Config.count).foreach { key =>
    Config.topics.split(",").foreach { topic =>
      val data = topic + new Date().toString
      val message = new ProducerRecord[String, String](topic, key.toString, data)
      producer.send(message)
      System.out.println(topic + "#" + key + "#" + data)
    }
  }
}
--------------------------------------------------------------------------------------------------------------------------------------------------------------
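A possible explanation, offered as a hypothesis rather than a confirmed diagnosis: KafkaProducer.send() in code C is asynchronous. It appends the record to an in-memory buffer and returns immediately, while a background I/O thread ships buffered records to the broker (subject to settings such as batch.size and linger.ms). Code C never calls producer.close() (or producer.flush(), available since 0.9) before main returns, so records still sitting in the buffer can be dropped when the JVM exits. The Java sketch below is a deliberately simplified, single-threaded toy model of that buffering; the class BatchingProducerModel and its methods are hypothetical and are not Kafka's implementation. It only models the "batch is full" trigger so that the effect is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model of a batching producer (not Kafka's code).
 * Records are buffered and only "shipped" when a batch fills up
 * or when flush()/close() is called.
 */
class BatchingProducerModel {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    private final List<String> delivered = new ArrayList<>();

    BatchingProducerModel(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Returns immediately; the record may still be sitting in the buffer. */
    void send(String record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            ship();
        }
    }

    /** Ships whatever is buffered, like calling flush() before exiting. */
    void flush() {
        if (!buffer.isEmpty()) {
            ship();
        }
    }

    /** Ships the remaining records; a real producer would also release its resources. */
    void close() {
        flush();
    }

    List<String> delivered() {
        return delivered;
    }

    /** Moves the current batch to the "broker". */
    private void ship() {
        delivered.addAll(buffer);
        buffer.clear();
    }

    public static void main(String[] args) {
        BatchingProducerModel producer = new BatchingProducerModel(16);
        for (int key = 1; key <= 100; key++) {
            producer.send("a#" + key);
        }
        // If main returned here, the last partial batch (4 records) would never be shipped.
        System.out.println("before close: " + producer.delivered().size());
        producer.close();
        System.out.println("after close: " + producer.delivered().size());
    }
}
```

With a batch size of 16, the model has shipped 96 of 100 records when the loop ends and all 100 after close(). With the real client, the analogous change to code C would be to call producer.close() after the foreach loops; according to the KafkaProducer documentation, close() blocks until all previously sent requests complete.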
Steps:
(1) First I start a consumer node that waits to receive data (code B above).
(2) Then I start a producer node to produce data (code C above).
Problem:
I found that when I run code C, the producer produces data very fast: sending 100 messages takes less than a second, whereas the Java producer (code A) cannot send that fast. The consumer (code B) receives only 34 messages from topic a and 34 from topic b and then just waits, although I produced 100 messages. I have changed the number of messages I send, but no matter how many I send, the consumer receives only about half of them, sometimes fewer. The more messages I send, the more data I lose. (I am not sure what "lost" means here: are the messages never delivered to the brokers, or are they delivered but not received? What I actually see is that I receive part of the data and then nothing more is consumed.)
Try:
I noticed that the Scala producer sends data very fast, so I added Thread.sleep(time) after sending each message, and it does help: with time = 100 (milliseconds), data is rarely lost. But sometimes I still do not receive as many messages as I sent.
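A plausible reading of the Thread.sleep result, again offered as a hypothesis: sleeping only gives the producer's background thread time to send, so it makes delivery likely but cannot guarantee it. With the real client, send() returns a java.util.concurrent.Future<RecordMetadata>, so the caller can block on get() to wait for the broker's acknowledgement, or pass a Callback as the second argument of send() to be notified of success or failure. The Java sketch below is a toy model of the "wait for acknowledgements" pattern under stated assumptions: the class AckingSenderModel is hypothetical, a single-thread executor stands in for the producer's I/O thread, and a 1 ms sleep stands in for the network round trip.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Hypothetical toy model of an asynchronous sender (not Kafka's API):
 * send() hands the record to a background thread and returns a Future
 * that completes once the record has been "acknowledged".
 */
class AckingSenderModel {
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor();
    private final List<String> acknowledged = new CopyOnWriteArrayList<>();

    Future<String> send(String record) {
        return ioThread.submit(() -> {
            Thread.sleep(1); // simulated network round trip
            acknowledged.add(record);
            return record;
        });
    }

    List<String> acknowledged() {
        return acknowledged;
    }

    /** Lets the background thread finish queued work and then stop. */
    void close() {
        ioThread.shutdown();
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        AckingSenderModel sender = new AckingSenderModel();
        List<Future<String>> pending = new ArrayList<>();
        for (int key = 1; key <= 100; key++) {
            pending.add(sender.send("a#" + key));
        }
        // Instead of guessing with Thread.sleep, wait for every acknowledgement.
        for (Future<String> f : pending) {
            f.get();
        }
        System.out.println("acknowledged: " + sender.acknowledged().size());
        sender.close();
    }
}
```

Calling get() right after each send would make sending synchronous and slower, much like code A; collecting the futures and waiting for all of them at the end, as here, keeps the sends asynchronous while still confirming that every record was acknowledged.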
Question:
Am I using the Scala API wrong, or is it something else?
I have attached my code. Looking forward to your reply and advice.