You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Stu Smith <st...@gmail.com> on 2015/10/30 22:07:26 UTC

Using the 0.8.2.2 KafkaConsumer Client

Hello!

  I'm running into trouble using the latest Kafka client.

0.8.2.2 appears to be listed as a stable release on Maven Central:

http://mvnrepository.com/artifact/org.apache.kafka/kafka-clients

And it only includes the:

org.apache.kafka.clients.consumer.KafkaConsumer client

All the other Consumers listed in the documents do not appear to be
available in this release.

However, in the example code for the 0.8.2.2 branch, it covers the
ConsumerConnector client:

https://github.com/apache/kafka/blob/0.8.2.2/examples/src/main/java/kafka/examples/Consumer.java

Which no longer exists in the 0.8.2.2 release.

The KafkaConsumer client I get always returns null on poll(), similar to
behavior reported for the 0.8.2 branch (but not the 0.8.2.2 branch):

http://mail-archives.apache.org/mod_mbox/kafka-users/201502.mbox/%3CCAOeJiJj-c747Ak99qioytrD4=E24W8SiVqgx=OOQFkVdb7+97w@mail.gmail.com%3E

So it appear to 0.8.2.2 shipped with an old, broken KafkaConsumer client,
but removed the older, working ConsumerConnector / MessageStream interface.

Is KafkaConsumer expected to work in 0.8.2.2 ?
Or are we expected to use the old client, and I'm somehow not seeing the
package?

I confirmed I have messages waiting by using the java producer api, and
listening with the consoleConsumer application, and it happily prints
whatever the producer sends. However, the ConsoleConsumer appears to be
using the scala API, so it can't provide any leads on how to use the java
one.

Or am I doing something wrong ?

scannerKafkaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
scannerKafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
"true");
scannerKafkaProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
"10");
scannerKafkaProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS, "30000");
scannerKafkaProperties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY,
"roundrobin");
scannerKafkaProperties.put("zookeeper.session.timeout.ms", "400");
scannerKafkaProperties.put("zookeeper.sync.time.ms", "200");
scannerKafkaProperties.put("zookeeper.connect","localhost:2181");

...
private static final String DESERIALIZER =
"org.apache.kafka.common.serialization.StringDeserializer";
private static final String SERIALIZER =
"org.apache.kafka.common.serialization.StringSerializer";...

kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
DESERIALIZER);
kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
DESERIALIZER);
kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, SERIALIZER);
kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
SERIALIZER);
kafkaProperties.put("offsets.storage","kafka");
kafkaProperties.put("dual.commit.enabled", "false");
...
this.kafkaConsumer = new KafkaConsumer<>( kafkaProperties );
this.kafkaProducer = new KafkaProducer<>( kafkaProperties );
...
TopicPartition topicPartition = new TopicPartition(this.topic,0);
this.kafkaConsumer.subscribe(topicPartition);
...
while( this.running ) {
Map<String, ConsumerRecords<String, String>> messages =
this.kafkaConsumer.poll(messageWaitTimeout);
if( messages == null ) {
    //this.log.debug("Finished polling, no messages received.");
    for( int i = 0; i < 200; ++i ) {
        this.kafkaProducer.send(new ProducerRecord<>(this.topic, 0, "test",
"test"));
    }
    continue;
}
....
(And to re-iterate, the console consumer does pick up the messages, if I
run it, but the Java API does not).

Or is the 0.8.2.2 High-Level Java API simple not usable?

Take care,
  -stu

Re: Using the 0.8.2.2 KafkaConsumer Client

Posted by Pratapi Hemant Patel <he...@gmail.com>.
Actually in 0.8.2.2 only kafkaproducer is fully implemented not
Kafkaconsumer.

Here is the implementation of kafkaconsumer poll method in 0.8.2.2.

@Override

    public Map<String, ConsumerRecords<K,V>> poll(long timeout) {

        // TODO Auto-generated method stub

        return null;

    }

KafkaConsumer will be released in 0.9, (perhaps in this Nov, as stated by
some people on this mailer).

Best Regards,
Hemant
9810752184 / 9013982184
On 31-Oct-2015 3:29 am, "Stu Smith" <st...@gmail.com> wrote:

> Note that I did work around this issue, by including the entire Kafka:
>
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka_2.10</artifactId>
>     <version>0.8.2.2</version>
> </dependency>
>
> dependency, and using the legacy Consumer API,
> instead of the kafka-clients dependency:
>
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka-clients</artifactId>
>     <version>0.8.2.2</version>
> </dependency>
>
> Listed in the documentation:
>
> http://kafka.apache.org/documentation.html#producerapi
>
> Since this dependency is called out right before it outlines the Consumer
> API, and the Consumer API docs don't mention that the Consumer API in the
> kafka-clients dependency is broken, it might be helpful if documentation
> points at that the kafka-clients dependency contains a broken Consumer, and
> the kafka_2.10 dependency should be used to access the legacy api.
>
> Take care,
>   -stu
>
> On Fri, Oct 30, 2015 at 2:07 PM, Stu Smith <st...@gmail.com> wrote:
>
> > Hello!
> >
> >   I'm running into trouble using the latest Kafka client.
> >
> > 0.8.2.2 appears to be listed as a stable release on Maven Central:
> >
> > http://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
> >
> > And it only includes the:
> >
> > org.apache.kafka.clients.consumer.KafkaConsumer client
> >
> > All the other Consumers listed in the documents do not appear to be
> > available in this release.
> >
> > However, in the example code for the 0.8.2.2 branch, it covers the
> > ConsumerConnector client:
> >
> >
> >
> https://github.com/apache/kafka/blob/0.8.2.2/examples/src/main/java/kafka/examples/Consumer.java
> >
> > Which no longer exists in the 0.8.2.2 release.
> >
> > The KafkaConsumer client I get always returns null on poll(), similar to
> > behavior reported for the 0.8.2 branch (but not the 0.8.2.2 branch):
> >
> >
> >
> http://mail-archives.apache.org/mod_mbox/kafka-users/201502.mbox/%3CCAOeJiJj-c747Ak99qioytrD4=E24W8SiVqgx=OOQFkVdb7+97w@mail.gmail.com%3E
> >
> > So it appear to 0.8.2.2 shipped with an old, broken KafkaConsumer client,
> > but removed the older, working ConsumerConnector / MessageStream
> interface.
> >
> > Is KafkaConsumer expected to work in 0.8.2.2 ?
> > Or are we expected to use the old client, and I'm somehow not seeing the
> > package?
> >
> > I confirmed I have messages waiting by using the java producer api, and
> > listening with the consoleConsumer application, and it happily prints
> > whatever the producer sends. However, the ConsoleConsumer appears to be
> > using the scala API, so it can't provide any leads on how to use the java
> > one.
> >
> > Or am I doing something wrong ?
> >
> > scannerKafkaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
> > "localhost:9092");
> > scannerKafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
> > "true");
> > scannerKafkaProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
> > "10");
> > scannerKafkaProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS, "30000");
> > scannerKafkaProperties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY,
> > "roundrobin");
> > scannerKafkaProperties.put("zookeeper.session.timeout.ms", "400");
> > scannerKafkaProperties.put("zookeeper.sync.time.ms", "200");
> > scannerKafkaProperties.put("zookeeper.connect","localhost:2181");
> >
> > ...
> > private static final String DESERIALIZER =
> > "org.apache.kafka.common.serialization.StringDeserializer";
> > private static final String SERIALIZER =
> > "org.apache.kafka.common.serialization.StringSerializer";...
> >
> > kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> > DESERIALIZER);
> > kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> > DESERIALIZER);
> > kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
> > SERIALIZER);
> > kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> > SERIALIZER);
> > kafkaProperties.put("offsets.storage","kafka");
> > kafkaProperties.put("dual.commit.enabled", "false");
> > ...
> > this.kafkaConsumer = new KafkaConsumer<>( kafkaProperties );
> > this.kafkaProducer = new KafkaProducer<>( kafkaProperties );
> > ...
> > TopicPartition topicPartition = new TopicPartition(this.topic,0);
> > this.kafkaConsumer.subscribe(topicPartition);
> > ...
> > while( this.running ) {
> > Map<String, ConsumerRecords<String, String>> messages =
> > this.kafkaConsumer.poll(messageWaitTimeout);
> > if( messages == null ) {
> >     //this.log.debug("Finished polling, no messages received.");
> >     for( int i = 0; i < 200; ++i ) {
> >         this.kafkaProducer.send(new ProducerRecord<>(this.topic, 0,
> > "test", "test"));
> >     }
> >     continue;
> > }
> > ....
> > (And to re-iterate, the console consumer does pick up the messages, if I
> > run it, but the Java API does not).
> >
> > Or is the 0.8.2.2 High-Level Java API simple not usable?
> >
> > Take care,
> >   -stu
> >
>

Re: Using the 0.8.2.2 KafkaConsumer Client

Posted by Stu Smith <st...@gmail.com>.
Note that I did work around this issue, by including the entire Kafka:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.2</version>
</dependency>

dependency, and using the legacy Consumer API,
instead of the kafka-clients dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.2</version>
</dependency>

Listed in the documentation:

http://kafka.apache.org/documentation.html#producerapi

Since this dependency is called out right before it outlines the Consumer
API, and the Consumer API docs don't mention that the Consumer API in the
kafka-clients dependency is broken, it might be helpful if documentation
points at that the kafka-clients dependency contains a broken Consumer, and
the kafka_2.10 dependency should be used to access the legacy api.

Take care,
  -stu

On Fri, Oct 30, 2015 at 2:07 PM, Stu Smith <st...@gmail.com> wrote:

> Hello!
>
>   I'm running into trouble using the latest Kafka client.
>
> 0.8.2.2 appears to be listed as a stable release on Maven Central:
>
> http://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
>
> And it only includes the:
>
> org.apache.kafka.clients.consumer.KafkaConsumer client
>
> All the other Consumers listed in the documents do not appear to be
> available in this release.
>
> However, in the example code for the 0.8.2.2 branch, it covers the
> ConsumerConnector client:
>
>
> https://github.com/apache/kafka/blob/0.8.2.2/examples/src/main/java/kafka/examples/Consumer.java
>
> Which no longer exists in the 0.8.2.2 release.
>
> The KafkaConsumer client I get always returns null on poll(), similar to
> behavior reported for the 0.8.2 branch (but not the 0.8.2.2 branch):
>
>
> http://mail-archives.apache.org/mod_mbox/kafka-users/201502.mbox/%3CCAOeJiJj-c747Ak99qioytrD4=E24W8SiVqgx=OOQFkVdb7+97w@mail.gmail.com%3E
>
> So it appear to 0.8.2.2 shipped with an old, broken KafkaConsumer client,
> but removed the older, working ConsumerConnector / MessageStream interface.
>
> Is KafkaConsumer expected to work in 0.8.2.2 ?
> Or are we expected to use the old client, and I'm somehow not seeing the
> package?
>
> I confirmed I have messages waiting by using the java producer api, and
> listening with the consoleConsumer application, and it happily prints
> whatever the producer sends. However, the ConsoleConsumer appears to be
> using the scala API, so it can't provide any leads on how to use the java
> one.
>
> Or am I doing something wrong ?
>
> scannerKafkaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092");
> scannerKafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
> "true");
> scannerKafkaProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
> "10");
> scannerKafkaProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS, "30000");
> scannerKafkaProperties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY,
> "roundrobin");
> scannerKafkaProperties.put("zookeeper.session.timeout.ms", "400");
> scannerKafkaProperties.put("zookeeper.sync.time.ms", "200");
> scannerKafkaProperties.put("zookeeper.connect","localhost:2181");
>
> ...
> private static final String DESERIALIZER =
> "org.apache.kafka.common.serialization.StringDeserializer";
> private static final String SERIALIZER =
> "org.apache.kafka.common.serialization.StringSerializer";...
>
> kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> DESERIALIZER);
> kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> DESERIALIZER);
> kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
> SERIALIZER);
> kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> SERIALIZER);
> kafkaProperties.put("offsets.storage","kafka");
> kafkaProperties.put("dual.commit.enabled", "false");
> ...
> this.kafkaConsumer = new KafkaConsumer<>( kafkaProperties );
> this.kafkaProducer = new KafkaProducer<>( kafkaProperties );
> ...
> TopicPartition topicPartition = new TopicPartition(this.topic,0);
> this.kafkaConsumer.subscribe(topicPartition);
> ...
> while( this.running ) {
> Map<String, ConsumerRecords<String, String>> messages =
> this.kafkaConsumer.poll(messageWaitTimeout);
> if( messages == null ) {
>     //this.log.debug("Finished polling, no messages received.");
>     for( int i = 0; i < 200; ++i ) {
>         this.kafkaProducer.send(new ProducerRecord<>(this.topic, 0,
> "test", "test"));
>     }
>     continue;
> }
> ....
> (And to re-iterate, the console consumer does pick up the messages, if I
> run it, but the Java API does not).
>
> Or is the 0.8.2.2 High-Level Java API simple not usable?
>
> Take care,
>   -stu
>