Posted to users@kafka.apache.org by Chen Wang <ch...@gmail.com> on 2014/02/20 23:40:54 UTC
Kafka SimpleConsumer not working
Hi,
I am using Kafka for the first time and was running the sample from
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
However, I cannot read any data from Kafka. The Kafka topic has 10 partitions,
and I tried reading from each of them. The fetch succeeds, but the message
size returned is always 0, as printed by this statement:
    System.out.println("the message size is" + messageSet
            .kafka$javaapi$message$ByteBufferMessageSet$$underlying()
            .size());
Is there something obvious that I am missing? Here is the fetch loop:
    while (a_maxReads > 0) {
        if (consumer == null) {
            consumer = new SimpleConsumer(leadBroker, a_port, 100000,
                    10240 * 1024, clientName);
        }
        System.out.println("start fetching");
        System.out.println("the readoffset is" + readOffset);
        FetchRequest req = new FetchRequestBuilder().clientId(clientName)
                .addFetch(a_topic, a_partition, readOffset, 1000000)
                .build();
        FetchResponse fetchResponse = consumer.fetch(req);
        System.out.println("finish fetching");
        if (fetchResponse.hasError()) {
            numErrors++;
            // Something went wrong!
            short code = fetchResponse.errorCode(a_topic, a_partition);
            System.out.println("Error fetching data from the Broker:" +
                    leadBroker + " Reason: " + code);
            if (numErrors > 5)
                break;
            if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                // We asked for an invalid offset. For simple case ask for
                // the last element to reset
                readOffset = getLastOffset(consumer, a_topic, a_partition,
                        kafka.api.OffsetRequest.LatestTime(), clientName);
                continue;
            }
            consumer.close();
            consumer = null;
            leadBroker = findNewLeader(leadBroker, a_topic, a_partition,
                    a_port);
            continue;
        }
        numErrors = 0;
        long numRead = 0;
        System.out.println("The topic is:" + a_topic + " partition is : " +
                a_partition);
        ByteBufferMessageSet messageSet = fetchResponse.messageSet(a_topic,
                a_partition);
        System.out
                .println("the message size is" + messageSet
                        .kafka$javaapi$message$ByteBufferMessageSet$$underlying()
                        .size());
        for (MessageAndOffset messageAndOffset : messageSet) {
            long currentOffset = messageAndOffset.offset();
            if (currentOffset < readOffset) {
                System.out.println("Found an old offset: " + currentOffset
                        + " Expecting: " + readOffset);
                continue;
            }
            readOffset = messageAndOffset.nextOffset();
            ByteBuffer payload = messageAndOffset.message().payload();
            byte[] bytes = new byte[payload.limit()];
            payload.get(bytes);
            System.out.println(String.valueOf(messageAndOffset.offset()) + ": "
                    + new String(bytes, "UTF-8"));
            numRead++;
            a_maxReads--;
        }
        if (numRead == 0) {
            // Nothing was returned; wait a second before fetching again
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ie) {}
        }
    }
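
The payload-decoding step in the loop above (sizing the array with payload.limit() and then calling payload.get(bytes)) can be tried on its own, without a broker. The sketch below is only an illustration: the class PayloadDecode and its decode helper are hypothetical names, not part of Kafka, and it uses nothing but java.nio. It sizes the array with remaining() on a duplicate of the buffer, which is a slightly more defensive variant of the same step, since get(bytes) throws BufferUnderflowException when fewer bytes remain than the array holds.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PayloadDecode {
    // Hypothetical helper: copies the readable bytes of a buffer into a String.
    // It reads from a duplicate, so the caller's position is left untouched.
    static String decode(ByteBuffer payload) {
        ByteBuffer view = payload.duplicate();
        byte[] bytes = new byte[view.remaining()]; // bytes between position and limit
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap("hello kafka".getBytes(StandardCharsets.UTF_8));
        // Position 0: remaining() equals limit(), so both sizing choices agree.
        System.out.println(decode(buf));

        // Position advanced past "hello ": only the tail is still readable.
        buf.position(6);
        System.out.println(decode(buf));
    }
}
```

When the buffer's position is 0, as in the first call, sizing by limit() and sizing by remaining() give the same result, so this does not by itself explain an empty message set.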
Thanks much!
Chen
Re: Kafka SimpleConsumer not working
Posted by Chen Wang <ch...@gmail.com>.
I am using 0.8.0. The high-level API works as expected.
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.0</version>
</dependency>
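
Since the high-level API reads the data, the messages are presumably in the topic. One difference worth checking is the starting offset: the high-level consumer tracks its own position, while with SimpleConsumer the caller picks readOffset. The snippet in the original message does not show how readOffset is initialized, so the following is only one possible explanation. If it starts at the offset returned for kafka.api.OffsetRequest.LatestTime(), a fetch succeeds without error but returns nothing until new messages are produced. The toy model below sketches that behaviour; ToyLog and its methods are hypothetical stand-ins for a partition log, not the Kafka API, and it assumes the earliest offset is 0, which a real partition need not satisfy once old segments are deleted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class OffsetModel {
    // Toy stand-in for one partition's log. NOT the Kafka API.
    static final class ToyLog {
        private final List<String> messages;

        ToyLog(List<String> messages) {
            this.messages = new ArrayList<>(messages);
        }

        // Assumption: the log has never been truncated, so it starts at offset 0.
        long earliest() {
            return 0L;
        }

        // Offset that the next produced message would receive.
        long latest() {
            return messages.size();
        }

        // Returns every message at or after the given offset.
        List<String> fetch(long offset) {
            if (offset < earliest() || offset > latest()) {
                throw new IllegalArgumentException("offset out of range: " + offset);
            }
            return Collections.unmodifiableList(
                    messages.subList((int) offset, messages.size()));
        }
    }

    public static void main(String[] args) {
        ToyLog log = new ToyLog(Arrays.asList("m0", "m1", "m2"));

        // Starting at the earliest offset returns the existing messages.
        System.out.println("from earliest: " + log.fetch(log.earliest()).size());

        // Starting at the latest offset is valid but yields an empty result.
        System.out.println("from latest: " + log.fetch(log.latest()).size());
    }
}
```

In the real client, printing the offsets returned for kafka.api.OffsetRequest.EarliestTime() and kafka.api.OffsetRequest.LatestTime() next to readOffset would show whether this is what is happening.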