You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by "Caokun (Jack, Platform)" <ca...@huawei.com> on 2017/06/20 15:06:52 UTC
kafka version 0.10.2.1 consumer can not get the message
Hello experts
I wrote a Kafka demo in Java.
The producer can send the message, but the consumer cannot receive it.
My kafka configuration is ok
./kafka-console-producer.sh --broker-list localhost:9080 --topic testkun
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic testkun --from-beginning
The following is the Java code of the consumer, the producer, the app and the properties interface.
Thanks a lot
package com.huawei.business;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
/**
 * Demo consumer thread: subscribes to a single topic and prints every record
 * returned by {@code poll}.
 */
public class hwKafkaConsumer extends Thread {
    private final String topic;

    /**
     * @param topic name of the topic this thread subscribes to
     */
    public hwKafkaConsumer(final String topic)
    {
        this.topic = topic;
    }

    /**
     * Builds the consumer configuration, subscribes to {@link #topic} and polls
     * in an endless loop, printing offset, key and value of each record.
     * The consumer is closed if the loop is left through an exception.
     */
    @Override
    public void run()
    {
        // Only settings of the new (bootstrap.servers based) consumer are set here.
        // The zookeeper.*, rebalance.* and *.serializer keys belong to the old
        // ZooKeeper consumer or to the producer and are not used by KafkaConsumer.
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.179.165.7:9080");
        props.put("group.id", KafkaProperties.groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Start from the earliest offset when the group has no committed offset yet.
        props.put("auto.offset.reset", "earliest");

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            consumer.subscribe(Arrays.asList(this.topic));
            // Pass the topic as an argument instead of splicing it into the format string.
            System.out.printf("consumer :the topic is %s ", this.topic);
            while (true) {
                System.out.printf("consumer:recieve the message in while loop ");
                // NOTE(review): the original author reports that this call seems to hang.
                // Presumably the client cannot reach the broker / group coordinator at the
                // address the broker advertises -- verify the broker's listener settings
                // and that 10.179.165.7:9080 is reachable from this host.
                ConsumerRecords<String, String> records = consumer.poll(100);
                System.out.printf(" consumer after create records ");
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("for loop to parse the message.....");
                    System.out.printf("offset = %d, key = %s, value11111 = %s%n", record.offset(), record.key(), record.value());
                }
                System.out.printf(" consumer after for loop ");
            }
        } finally {
            // Release sockets and leave the consumer group even when poll() throws.
            consumer.close();
        }
    }
}
Can anyone help me find the problem?
Thanks a lot
The producer test is as follows:
package com.huawei.business;
import java.util.Date;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
//import kafka.producer.KeyedMessage;
//import kafka.producer.ProducerConfig;
public class hwKafkaProducer extends Thread
{
//private final kafka.javaapi.producer.Producer<Integer, String> producer;
private final String topic;
// private final Properties props = new Properties();
public hwKafkaProducer(final String topic)
{
// props.put("serializer.class", "kafka.serializer.StringEncoder");
// props.put("metadata.broker.list", "10.179.165.7:9080");
// producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
this.topic = topic;
}
@Override
public void run()
{
int messageNo = 1;
Properties props = new Properties();
props.put("bootstrap.servers", "10.179.165.7:9080");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "10.179.165.7:9080");
props.put("bootstrap.servers", "10.179.165.7:9080");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("group.id", KafkaProperties.groupId);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// Producer<String, String> producer = new KafkaProducer<>(props);
Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i <1; i++){
System.out.printf("send the message:create record.");
System.out.printf("send the message: the topic is "+this.topic);
ProducerRecord<String, String> data = new ProducerRecord<String, String>(this.topic, Integer.toString(i),"helloworld_ni_hao");
System.out.printf("send the message:before send.");
// producer.send(data);
producer.send(data,new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
System.out.println("why why why?: " );
if(e != null) {
e.printStackTrace();
} else {
System.out.println("bbbb: " + metadata.offset());
System.out.println("cccc: " );
}
}
});
System.out.printf("send the message:the data.values is."+data.value());
System.out.printf("send the message:after send.");
}
producer.close();
/*
while (true)
{
System.out.printf("send the message");
long runtime = new Date().getTime();
String ip = "123456";
String msg = "mymessage";
ProducerRecord<String, String> data = new ProducerRecord<String, String>(this.topic, ip, msg);
producer.send(data,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null) {
e.printStackTrace();
} else {
System.out.println("The offset of the record we just sent is: " + metadata.offset());
}
}
});
}
*/
//producer.close();
}
}
The app is
package com.huawei.business;
/**
 * Entry point of the demo: starts one producer thread and then one consumer
 * thread, both bound to {@code KafkaProperties.topic}.
 */
public class app
{
    /**
     * Launches the producer first and the consumer second.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(final String[] args)
    {
        // Producer side: construct and start.
        final hwKafkaProducer sender = new hwKafkaProducer(KafkaProperties.topic);
        sender.start();

        // Consumer side: construct and start after the producer is running.
        final hwKafkaConsumer receiver = new hwKafkaConsumer(KafkaProperties.topic);
        receiver.start();
    }
}
The properties java file is
package com.huawei.business;
/**
 * Constants shared by the Kafka demo classes.
 * Fields of an interface are implicitly {@code public static final}, so no
 * modifiers are spelled out.
 *
 * Created by x00343661 on 2016/6/12.
 */
public interface KafkaProperties
{
    // Connection endpoints.
    String zkConnect = "10.179.165.7:2181";
    String kafkaServerURL = "10.179.165.7";
    int kafkaServerPort = 9080;

    // Client identity.
    String groupId = "group1";
    String clientId = "SimpleConsumerDemoClient";

    // Topic names.
    String topic = "yqf";
    String topic2 = "topic2";
    String topic3 = "topic3";

    // Buffer size and timing values.
    int kafkaProducerBufferSize = 64 * 1024;
    int connectionTimeOut = 20000;
    int reconnectInterval = 10000;
}