Posted to users@kafka.apache.org by Koen Vantomme <ko...@gmail.com> on 2016/10/23 16:10:21 UTC

Consumer error : This consumer has already been closed

Hello,

I'm creating a simple consumer in Java. The first time I run the consumer,
it works fine.
I stop the application. When I rerun the consumer, I get the error
message "This consumer has already been closed".

Any suggestions?
Regards,
Koen

2016-10-23 17:17:34,261 [main] INFO   AppInfoParser - Kafka commitId : 23c69d62a0cabf06
Exception in thread "main" java.lang.IllegalStateException: This consumer has already been closed.
    at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1310)
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1321)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:844)


The code :

String topic = "testmetrics";
String group = "cg1";

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", group);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);


consumer.subscribe(Arrays.asList(topic));
System.out.println("Subscribed to topic " + topic);
int i = 0;

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s\n",
                record.offset(), record.key(), record.value());
    }
c
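
The stack trace above shows poll() calling acquire(), which calls ensureNotClosed(). That check fails when poll() is invoked on a consumer instance whose close() has already run. The posted code is cut off, so whether that happens here is not confirmed. The sketch below only illustrates the mechanism: SketchConsumer and ClosedConsumerDemo are hypothetical stand-ins written for this note, not the Kafka client API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a consumer; NOT the Kafka client API.
// It only models the closed-state guard visible in the stack trace.
class SketchConsumer implements AutoCloseable {
    private boolean closed = false;

    // Plays the role that ensureNotClosed() has in the trace.
    private void ensureNotClosed() {
        if (closed) {
            throw new IllegalStateException("This consumer has already been closed.");
        }
    }

    // Returns an empty batch; a real consumer would return fetched records.
    List<String> poll() {
        ensureNotClosed();
        return new ArrayList<>();
    }

    @Override
    public void close() {
        closed = true;
    }
}

class ClosedConsumerDemo {
    // close() inside the loop: the second poll() hits the guard.
    static String closeInsideLoop() {
        SketchConsumer consumer = new SketchConsumer();
        try {
            for (int i = 0; i < 2; i++) {
                consumer.poll();
                consumer.close(); // bug: consumer is closed after the first iteration
            }
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    // close() exactly once, after the loop, via try-with-resources.
    static String closeAfterLoop() {
        try (SketchConsumer consumer = new SketchConsumer()) {
            for (int i = 0; i < 2; i++) {
                consumer.poll();
            }
            return "no error";
        }
    }

    public static void main(String[] args) {
        System.out.println(closeInsideLoop());
        System.out.println(closeAfterLoop());
    }
}
```

If the real loop body ends with a call to close(), moving that call out of the loop (for example into a finally block or a try-with-resources statement) would be the first thing to try.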

Re: Consumer error : This consumer has already been closed

Posted by Gourab Chowdhury <go...@gmail.com>.
I can see that you call consumer.subscribe(Arrays.asList(topic)). I
don't know if this is the problem, but try
consumer.subscribe(Collections.singletonList(this.topic)) instead.

You can refer to the official Kafka example code on GitHub [1]. Hope that
helps.

[1]
https://github.com/apache/kafka/blob/trunk/examples/src/main/java/kafka/examples/Consumer.java
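
For comparison, the two ways of building the subscribe() argument can be checked with the standard library alone. Both produce a one-element list with the same content, so swapping one for the other seems unlikely to change the consumer's behavior. This is a minimal sketch; the class name SubscribeArgumentCheck is made up for the illustration and no Kafka call is involved.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper class, used only to compare the two list factories.
class SubscribeArgumentCheck {
    // True when both factories yield an equal, single-element topic list.
    static boolean sameTopics(String topic) {
        List<String> viaArrays = Arrays.asList(topic);
        List<String> viaSingleton = Collections.singletonList(topic);
        return viaArrays.equals(viaSingleton) && viaArrays.size() == 1;
    }

    public static void main(String[] args) {
        System.out.println(sameTopics("testmetrics"));
    }
}
```

The practical difference is that Collections.singletonList returns an immutable list, while Arrays.asList returns a fixed-size list backed by an array.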


Gourab

On Tue, Oct 25, 2016 at 1:36 AM, Koen Vantomme <ko...@gmail.com>
wrote:

> Hello,
> Could someone tell me whether I made a mistake here? Is it a rookie
> mistake? Is this not the correct forum for my question?
> It's a bit strange to get this error: Exception in thread "main"
> java.lang.IllegalStateException: This consumer has already been closed
> There is also not much feedback on this error on the net.
> Kind regards,
> Koen

Re: Consumer error : This consumer has already been closed

Posted by Koen Vantomme <ko...@gmail.com>.
Hello,
Could someone tell me whether I made a mistake here? Is it a rookie
mistake? Is this not the correct forum for my question?
It's a bit strange to get this error: Exception in thread "main"
java.lang.IllegalStateException: This consumer has already been closed
There is also not much feedback on this error on the net.
Kind regards,
Koen

