Posted to users@kafka.apache.org by Chen Wang <ch...@gmail.com> on 2014/02/20 23:40:54 UTC

Kafka SimpleConsumer not working

Hi,
I am using Kafka for the first time and am running the sample from
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

However, I cannot read any data from Kafka. The Kafka topic has 10 partitions,
and I tried to read from any of them. The fetch succeeds, but the message size
returned is always 0, as printed by:

System.out
        .println("the message size is" + messageSet
            .kafka$javaapi$message$ByteBufferMessageSet$$underlying()
            .size());

Is there something obvious that I am missing?


while (a_maxReads > 0) {
    if (consumer == null) {
        consumer = new SimpleConsumer(leadBroker, a_port, 100000,
            10240 * 1024, clientName);
    }

    System.out.println("start fetching");
    System.out.println("the readoffset is" + readOffset);

    FetchRequest req = new FetchRequestBuilder().clientId(clientName)
        .addFetch(a_topic, a_partition, readOffset, 1000000)
        .build();
    FetchResponse fetchResponse = consumer.fetch(req);
    System.out.println("finish fetching");
    if (fetchResponse.hasError()) {
        numErrors++;
        // Something went wrong!
        short code = fetchResponse.errorCode(a_topic, a_partition);
        System.out.println("Error fetching data from the Broker:" +
            leadBroker + " Reason: " + code);
        if (numErrors > 5)
            break;
        if (code == ErrorMapping.OffsetOutOfRangeCode()) {
            // We asked for an invalid offset. For simple case ask for
            // the last element to reset
            readOffset = getLastOffset(consumer, a_topic, a_partition,
                kafka.api.OffsetRequest.LatestTime(), clientName);
            continue;
        }
        consumer.close();
        consumer = null;
        leadBroker = findNewLeader(leadBroker, a_topic, a_partition,
            a_port);
        continue;
    }
    numErrors = 0;

    long numRead = 0;
    System.out.println("The topic is:" + a_topic + " partition is : " +
        a_partition);
    ByteBufferMessageSet messageSet = fetchResponse.messageSet(a_topic,
        a_partition);
    System.out
        .println("the message size is" + messageSet
            .kafka$javaapi$message$ByteBufferMessageSet$$underlying()
            .size());
    for (MessageAndOffset messageAndOffset: messageSet) {
        long currentOffset = messageAndOffset.offset();
        if (currentOffset < readOffset) {
            System.out.println("Found an old offset: " + currentOffset +
                " Expecting: " + readOffset);
            continue;
        }
        readOffset = messageAndOffset.nextOffset();
        ByteBuffer payload = messageAndOffset.message().payload();

        byte[] bytes = new byte[payload.limit()];
        payload.get(bytes);
        System.out.println(String.valueOf(messageAndOffset.offset()) + ": "
            + new String(bytes, "UTF-8"));
        numRead++;
        a_maxReads--;
    }

    if (numRead == 0) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ie) {}
    }
}

Thanks much!
Chen
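
For reference, the payload-copying step in the loop above can be tried on its own, without a broker. The sketch below is only an illustration: the class name PayloadDecoder and its decode method are hypothetical and not part of Kafka or of the wiki sample. It sizes the byte array with remaining() instead of limit(), on the assumption that the buffer's position might not be 0. When the position is 0 the two values are equal, so the sample's sizing gives the same result.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the Kafka API.
class PayloadDecoder {
    // Copies the readable bytes (position..limit) of the buffer and decodes
    // them as UTF-8. duplicate() keeps the caller's position unchanged.
    static String decode(ByteBuffer payload) {
        ByteBuffer view = payload.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Stand-in for messageAndOffset.message().payload() in the loop above.
        ByteBuffer payload =
            ByteBuffer.wrap("hello kafka".getBytes(StandardCharsets.UTF_8));
        System.out.println(decode(payload));
    }
}
```

If the buffer's position were greater than 0, an array sized with limit() would be larger than the number of readable bytes, and get(bytes) would throw a BufferUnderflowException.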

Re: Kafka SimpleConsumer not working

Posted by Chen Wang <ch...@gmail.com>.
I am using 0.8.0. The high-level API works as expected.

<dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka_2.10</artifactId>
 <version>0.8.0</version>
</dependency>
