You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by "Caokun (Jack, Platform)" <ca...@huawei.com> on 2017/06/20 15:06:52 UTC

kafka version 0.10.2.1 consumer can not get the message

Hello experts,
I wrote a Kafka demo in Java.
The producer can send the message, but the consumer cannot receive it.
My Kafka configuration is OK:
./kafka-console-producer.sh --broker-list localhost:9080 --topic testkun
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic testkun --from-beginning

The following is the Java code of the consumer, the producer, the app, and the properties interface.
Thanks a lot.

package com.huawei.business;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;


/**
 * Demo consumer thread: subscribes to one topic and prints every record it receives.
 *
 * <p>Built on the new-consumer API ({@link KafkaConsumer}), which connects to the brokers
 * through {@code bootstrap.servers}. Only settings that this client understands are passed;
 * ZooKeeper, rebalance-retry and serializer settings belong to the old consumer or to the
 * producer and are therefore not set here.
 */
public class hwKafkaConsumer extends Thread {
    /** Maximum time, in milliseconds, a single {@code poll} call waits for records. */
    private static final long POLL_TIMEOUT_MS = 100L;

    private final String topic;

    /**
     * @param topic name of the topic this thread subscribes to
     */
    public hwKafkaConsumer(final String topic)
    {
        this.topic = topic;
    }

    /**
     * Subscribes to the topic and prints received records until the thread is interrupted.
     * The consumer is always closed on the way out, including when {@code poll} throws.
     */
    @Override
    public void run()
    {
        Consumer<String, String> consumer = new KafkaConsumer<>(buildConsumerProperties());
        try {
            consumer.subscribe(Arrays.asList(this.topic));
            // The topic is passed as an argument so a '%' in its name cannot break the format string.
            System.out.printf("consumer :the topic is %s   ", this.topic);

            while (!isInterrupted()) {
                System.out.print("consumer:recieve the message in while loop  ");
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
                System.out.print("  consumer after create records  ");
                for (ConsumerRecord<String, String> record : records) {
                    System.out.print("for loop to parse the message.....");
                    System.out.printf("offset = %d, key = %s, value11111 = %s%n",
                            record.offset(), record.key(), record.value());
                }
                System.out.print("  consumer after for loop  ");
            }
        } finally {
            // Release sockets and leave the consumer group cleanly.
            consumer.close();
        }
    }

    /** Builds the configuration for the new-consumer client. */
    private static Properties buildConsumerProperties()
    {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                KafkaProperties.kafkaServerURL + ":" + KafkaProperties.kafkaServerPort);
        props.put("group.id", KafkaProperties.groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // With no committed offset for the group, start from the earliest available message.
        props.put("auto.offset.reset", "earliest");
        return props;
    }
}

Can anyone help me find the problem?
Thanks a lot.

The producer test is as follows:

package com.huawei.business;
import java.util.Date;
import java.util.Properties;


import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

//import kafka.producer.KeyedMessage;
//import kafka.producer.ProducerConfig;

public class hwKafkaProducer extends Thread
{
    //private final kafka.javaapi.producer.Producer<Integer, String> producer;
    private final String topic;
    // private final Properties props = new Properties();

    public hwKafkaProducer(final String topic)
    {
       // props.put("serializer.class", "kafka.serializer.StringEncoder");
       // props.put("metadata.broker.list", "10.179.165.7:9080");
       // producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));

        this.topic = topic;
    }
    @Override
    public void run()
    {
        int messageNo = 1;

        Properties props = new Properties();
        props.put("bootstrap.servers", "10.179.165.7:9080");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "10.179.165.7:9080");
        props.put("bootstrap.servers", "10.179.165.7:9080");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("group.id", KafkaProperties.groupId);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
       // Producer<String, String> producer = new KafkaProducer<>(props);

        Producer<String, String> producer = new KafkaProducer<>(props);
        for(int i = 0; i <1; i++){
            System.out.printf("send  the message:create record.");
            System.out.printf("send  the message: the topic is "+this.topic);
            ProducerRecord<String, String> data = new ProducerRecord<String, String>(this.topic, Integer.toString(i),"helloworld_ni_hao");
            System.out.printf("send  the message:before send.");
           // producer.send(data);

            producer.send(data,new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    System.out.println("why why why?: " );
                    if(e != null) {
                       e.printStackTrace();
                    } else {
                       System.out.println("bbbb: " + metadata.offset());
                       System.out.println("cccc: " );
                    }
                }
            });

            System.out.printf("send  the message:the data.values is."+data.value());
            System.out.printf("send  the message:after send.");
        }

        producer.close();
        /*
        while (true)
        {
            System.out.printf("send  the message");
            long runtime = new Date().getTime();
            String ip = "123456";
            String msg = "mymessage";
            ProducerRecord<String, String> data = new ProducerRecord<String, String>(this.topic, ip, msg);
            producer.send(data,
                     new Callback() {
                 public void onCompletion(RecordMetadata metadata, Exception e) {
                     if(e != null) {
                        e.printStackTrace();
                     } else {
                        System.out.println("The offset of the record we just sent is: " + metadata.offset());
                     }
                 }
             });
        }
        */
        //producer.close();
    }
}

The app is
package com.huawei.business;

/**
 * Entry point of the demo: starts one producer thread and then one consumer thread,
 * both bound to {@code KafkaProperties.topic}.
 */
public class app
{
    /**
     * Launches the producer thread first, then the consumer thread, and returns
     * without waiting for either of them.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(final String[] args)
    {
        final hwKafkaProducer sender = new hwKafkaProducer(KafkaProperties.topic);
        sender.start();

        final hwKafkaConsumer receiver = new hwKafkaConsumer(KafkaProperties.topic);
        receiver.start();
    }
}

The properties java file is

package com.huawei.business;

/**
 * Shared constants for the Kafka demo (broker and ZooKeeper addresses, group, topics).
 *
 * <p>Fields declared in an interface are implicitly {@code public static final},
 * so the modifiers are not repeated on each declaration.
 *
 * <p>Created by x00343661 on 2016/6/12.
 */
public interface KafkaProperties
{
    // --- endpoints ---
    /** ZooKeeper connection string in {@code host:port} form. */
    String zkConnect = "10.179.165.7:2181";
    /** Host of the Kafka broker. */
    String kafkaServerURL = "10.179.165.7";
    /** Port of the Kafka broker. */
    int kafkaServerPort = 9080;

    // --- client identity ---
    /** Consumer group id. */
    String groupId = "group1";
    /** Client id; the value suggests it is meant for a SimpleConsumer demo. */
    String clientId = "SimpleConsumerDemoClient";

    // --- topics ---
    String topic = "yqf";
    String topic2 = "topic2";
    String topic3 = "topic3";

    // --- tuning values ---
    /** Producer buffer size: 64 * 1024 = 65536 (presumably bytes). */
    int kafkaProducerBufferSize = 64 * 1024;
    /** Connection timeout (presumably milliseconds — confirm against the code that reads it). */
    int connectionTimeOut = 20000;
    /** Reconnect interval (presumably milliseconds — confirm against the code that reads it). */
    int reconnectInterval = 10000;
}