Posted to users@pulsar.apache.org by Tecno Brain <ce...@gmail.com> on 2022/03/17 22:14:22 UTC

Java Pulsar Client

Hello,
  I have an application that, for every message it receives, processes it,
writes a message to a different topic, and finally acknowledges the
received message.

  On shutdown, I would like the subscriber threads to finish processing
the messages they have already read.
  I close all the Consumers first and then close the Pulsar client:

  for (Consumer<?> consumer : consumers) {
      consumer.close();
  }
  pulsarClient.close();

But some consumer threads have not finished by then, so they fail either
when trying to write the outgoing message (because the PulsarClient is
already closing) or when trying to ack the original message (because the
Consumer has been closed).

Is there a way to wait on the consumer threads to complete (of course, with
a timeout)?

The threads are named like:
pulsar-external-listener-23-1
pulsar-external-listener-25-1

That thread pool is configured on the PulsarClient (listenerThreads).
Is there a way to get a handle to this pool?

For now, I plan on just adding a Thread.sleep() in the middle, after
pausing the consumers:

  for (Consumer<?> consumer : consumers) {
      consumer.pause();
  }
  Thread.sleep(timeout);
  for (Consumer<?> consumer : consumers) {
      consumer.close();
  }
  pulsarClient.close();
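
A possible alternative to the fixed sleep, as a minimal sketch only: count the
messages currently inside the listener callback and wait, with a timeout,
until that count drops to zero. InFlightTracker, begin(), end() and
awaitIdle() are hypothetical names, not part of the Pulsar client API, and the
sketch assumes that no new callbacks start once the consumers are paused,
which would need to be checked.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not a Pulsar class): tracks messages being processed. */
final class InFlightTracker {
    private int inFlight = 0;

    /** Call at the start of the listener callback. */
    synchronized void begin() {
        inFlight++;
    }

    /** Call in a finally block at the end of the listener callback. */
    synchronized void end() {
        inFlight--;
        if (inFlight == 0) {
            notifyAll(); // wake up a shutdown thread blocked in awaitIdle()
        }
    }

    /**
     * Blocks until no message is in flight or the timeout elapses.
     * Returns true if everything drained, false on timeout.
     */
    synchronized boolean awaitIdle(long timeout, TimeUnit unit) throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (inFlight > 0) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false;
            }
            // Timed Object.wait on this monitor; loops again on spurious wakeups.
            TimeUnit.NANOSECONDS.timedWait(this, remaining);
        }
        return true;
    }
}
```

The listener would call begin() first and end() in a finally block around the
process/publish/acknowledge steps. On shutdown, the Thread.sleep(timeout)
would be replaced by awaitIdle(timeout, unit), still between pausing and
closing the consumers; a false return means some messages were still being
processed when the timeout expired.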

 at org.apache.pulsar.client.impl.ConsumerBase.callMessageListener(ConsumerBase.java:945)
 at org.apache.pulsar.client.impl.ConsumerBase.lambda$triggerListener$7(ConsumerBase.java:924)
 at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
 at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
 at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.pulsar.client.api.PulsarClientException$AlreadyClosedException: Client already closed : state = Closing
 at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:975)
 at org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:91)



org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: Consumer already closed
...
 at org.apache.pulsar.client.impl.ConsumerBase.callMessageListener(ConsumerBase.java:945)
 at org.apache.pulsar.client.impl.ConsumerBase.lambda$triggerListener$7(ConsumerBase.java:924)
 at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
 at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
 at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: Consumer already closed
 at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1027)
 at org.apache.pulsar.client.impl.ConsumerBase.acknowledge(ConsumerBase.java:318)
 at org.apache.pulsar.client.impl.ConsumerBase.acknowledge(ConsumerBase.java:306)
 ... 9 more
Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: Consumer already closed
 at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
 at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
 at org.apache.pulsar.client.impl.ConsumerBase.acknowledge(ConsumerBase.java:313)
 ... 11 more
Caused by: org.apache.pulsar.client.api.PulsarClientException: Consumer already closed
 at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.doAcknowledge(MultiTopicsConsumerImpl.java:437)
 at org.apache.pulsar.client.impl.ConsumerBase.doAcknowledgeWithTxn(ConsumerBase.java:554)
 at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeAsync(ConsumerBase.java:494)
 at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeAsync(ConsumerBase.java:483)
 ... 12 more