Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/01/03 11:58:24 UTC

[GitHub] [pulsar] Shawyeok opened a new issue #9109: pulsar-timer thread blocked at redeliverUnacknowledgedMessages

Shawyeok opened a new issue #9109:
URL: https://github.com/apache/pulsar/issues/9109


   **Describe the bug**
   After a broker crash and restart, consumers get blocked: the consumer rateOut drops to 0 and does not recover without restarting the consumer process.
   
   **To Reproduce**
   Steps to reproduce the behavior:
   1. Run the reproduction code below
   ```java
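   import java.util.concurrent.TimeUnit;
   
   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.DeadLetterPolicy;
   import org.apache.pulsar.client.api.Message;
   import org.apache.pulsar.client.api.MessageId;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.PulsarClientException;
   import org.apache.pulsar.client.api.SubscriptionType;
   import org.junit.After;
   import org.junit.Before;
   import org.junit.Test;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   public class PulsarConsumerTest {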
   
       private static final Logger LOG = LoggerFactory.getLogger(PulsarConsumerTest.class);
   
       private PulsarClient pulsarClient;
       private Consumer<byte[]> consumer;
   
       @Before
       public void setUp() throws Exception {
           pulsarClient = PulsarClient.builder()
               .serviceUrl("pulsar://<broker>:6650")
               .build();
           String topic = "persistent://sample/ns1/topic1";
           String subscriptionName = "test";
           DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.builder()
               .deadLetterTopic(String.format("%s-%s-DLQ", topic, subscriptionName))
               .maxRedeliverCount(3)
               .build();
           consumer = pulsarClient.newConsumer()
               .topic(topic)
               .deadLetterPolicy(deadLetterPolicy)
               .ackTimeout(5, TimeUnit.SECONDS)
               .acknowledgmentGroupTime(0, TimeUnit.MILLISECONDS)
               .subscriptionName(subscriptionName)
               .subscriptionType(SubscriptionType.Shared)
               .subscribe();
       }
   
       @After
       public void tearDown() throws Exception {
           pulsarClient.close();
       }
   
       @Test
       public void test() throws PulsarClientException {
           while (true) {
               Message<byte[]> message = consumer.receive();
               MessageId messageId = message.getMessageId();
               LOG.info("received message with messageId: {}", messageId);
               try {
                   consume(message);
               } catch (Exception e) {
                   LOG.error("consume message exception with messageId: {}", messageId, e);
               }
           }
       }
   
       // Always fails, so no message is ever acknowledged and the 5 s ack timeout triggers redelivery.
       private void consume(Message<byte[]> message) {
           throw new IllegalStateException("mock consume fails");
       }
   }
   ```
   2. Send 1000 messages to topic `persistent://sample/ns1/topic1`
   3. Wait about 10-15 s (time for redelivery to start), then kill and restart the broker process
   4. Inspect the stack of the thread `pulsar-timer-4-1` and check whether it is blocked
   5. If the problem does not appear, repeat steps 2-4 a few more times
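   
   For step 4, one way to run the check from inside the consumer JVM is sketched below. This is only an illustration: the class `TimerThreadCheck` and the method `findBlocked` are hypothetical names, the code relies solely on the JDK's `ThreadMXBean`, and the frame fragment `ProducerBase.send` is taken from the stack trace in this report. Running `jstack <pid>` against the consumer process gives the same information from outside.
   
   ```java
   import java.lang.management.ManagementFactory;
   import java.lang.management.ThreadInfo;
   import java.lang.management.ThreadMXBean;
   import java.util.ArrayList;
   import java.util.List;
   
   // Hypothetical helper, not part of the Pulsar client.
   final class TimerThreadCheck {
   
       /**
        * Returns "threadName STATE" for every live thread whose name starts with
        * namePrefix, that is not runnable, and whose stack contains a frame
        * matching frameFragment (compared against "className.methodName").
        */
       static List<String> findBlocked(String namePrefix, String frameFragment) {
           ThreadMXBean mx = ManagementFactory.getThreadMXBean();
           List<String> hits = new ArrayList<>();
           // false, false: skip monitor and synchronizer details; stack traces are still included.
           for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
               if (!info.getThreadName().startsWith(namePrefix)) {
                   continue;
               }
               Thread.State state = info.getThreadState();
               boolean stalled = state == Thread.State.WAITING
                       || state == Thread.State.TIMED_WAITING
                       || state == Thread.State.BLOCKED;
               boolean inFrame = false;
               for (StackTraceElement frame : info.getStackTrace()) {
                   String name = frame.getClassName() + "." + frame.getMethodName();
                   if (name.contains(frameFragment)) {
                       inFrame = true;
                       break;
                   }
               }
               if (stalled && inFrame) {
                   hits.add(info.getThreadName() + " " + state);
               }
           }
           return hits;
       }
   
       public static void main(String[] args) {
           // Prefix and frame are assumptions based on the thread name and stack in this report.
           for (String line : findBlocked("pulsar-timer", "ProducerBase.send")) {
               System.out.println(line);
           }
       }
   }
   ```
   
   A single sample only shows that the thread is waiting at that moment. Calling `findBlocked` several times a few seconds apart and getting the same hit each time is a stronger sign that the thread is stuck.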
   
   **Expected behavior**
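   The consumer should recover after the broker restarts. Instead, the thread `pulsar-timer-4-1` blocks at `producer.send` forever, and `consumer.receive` may then block in `UnAckedMessageTracker#add`, because that method acquires a writeLock inside `UnAckedMessageTracker`. The stack below shows the timer thread holding a `ReentrantReadWriteLock` while it waits.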
   ```
   "pulsar-timer-4-1" #37 prio=5 os_prio=0 tid=0x00007efe584c0000 nid=0x62 waiting on condition [0x00007efe30aec000]
      java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000000af9bbee8> (a java.util.concurrent.CompletableFuture$Signaller)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.apache.pulsar.client.impl.ProducerBase.send(ProducerBase.java:115)
    at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:89)
    at org.apache.pulsar.client.impl.ConsumerImpl.processPossibleToDLQ(ConsumerImpl.java:1452)
    at org.apache.pulsar.client.impl.ConsumerImpl.lambda$null$12(ConsumerImpl.java:1390)
    at org.apache.pulsar.client.impl.ConsumerImpl$$Lambda$811/717118161.test(Unknown Source)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.pulsar.client.impl.ConsumerImpl.lambda$redeliverUnacknowledgedMessages$14(ConsumerImpl.java:1396)
    at org.apache.pulsar.client.impl.ConsumerImpl$$Lambda$810/1511392410.accept(Unknown Source)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.pulsar.client.impl.ConsumerImpl.redeliverUnacknowledgedMessages(ConsumerImpl.java:1388)
    at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.lambda$redeliverUnacknowledgedMessages$20(MultiTopicsConsumerImpl.java:621)
    at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl$$Lambda$807/636669140.accept(Unknown Source)
    at java.util.HashMap.forEach(HashMap.java:1289)
    at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.redeliverUnacknowledgedMessages(MultiTopicsConsumerImpl.java:619)
    at org.apache.pulsar.client.impl.UnAckedMessageTracker$2.run(UnAckedMessageTracker.java:144)
    at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672)
    at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747)
    at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472)
    at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
      Locked ownable synchronizers:
    - <0x00000000a9dab610> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
   ```
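   
   The interaction described above can be reproduced in isolation with plain `java.util.concurrent` classes. The sketch below is not Pulsar code: `TrackerLockDemo`, `startTimerTask`, `tryAdd` and `completeSend` are hypothetical names, and the never-completed `CompletableFuture` merely stands in for the DLQ `producer.send` that hangs after the broker restart. It only mirrors the shape visible in the stack: one thread holds the write lock while waiting on a future, and another thread needs the same write lock.
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.locks.ReentrantReadWriteLock;
   
   // Hypothetical stand-in for the tracker; not the real UnAckedMessageTracker.
   final class TrackerLockDemo {
       private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
       // Stand-in for the result of the blocking send; stays pending until completeSend().
       private final CompletableFuture<Void> pendingSend = new CompletableFuture<>();
       private final CountDownLatch timerHoldsLock = new CountDownLatch(1);
   
       /** Plays the timer task: takes the write lock, then blocks on the send. */
       void startTimerTask() {
           Thread timer = new Thread(() -> {
               lock.writeLock().lock();
               try {
                   timerHoldsLock.countDown();
                   pendingSend.join(); // blocks for as long as the send stays pending
               } finally {
                   lock.writeLock().unlock();
               }
           }, "demo-timer");
           timer.setDaemon(true);
           timer.start();
       }
   
       /** Plays the receive path: returns whether the write lock was obtained within the timeout. */
       boolean tryAdd(long timeoutMillis) {
           try {
               timerHoldsLock.await(); // make sure the timer task already owns the lock
               if (!lock.writeLock().tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
                   return false;
               }
               lock.writeLock().unlock();
               return true;
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               return false;
           }
       }
   
       /** Simulates the send finally completing. */
       void completeSend() {
           pendingSend.complete(null);
       }
   
       public static void main(String[] args) {
           TrackerLockDemo demo = new TrackerLockDemo();
           demo.startTimerTask();
           System.out.println("add while send is pending: " + demo.tryAdd(200));
           demo.completeSend();
           System.out.println("add after send completes: " + demo.tryAdd(2000));
       }
   }
   ```
   
   In the demo `tryAdd` uses a timed `tryLock` so that it can report the failure. A plain `lock()` on the receive path, as the report describes for `UnAckedMessageTracker#add`, would instead wait for as long as the send stays pending.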
   
   **Screenshots**
   The consumer thread got blocked in our production environment.
   ![image](https://user-images.githubusercontent.com/5058708/103477812-28b5cd80-4dfd-11eb-8535-cf867b5d701b.png)
   
   **Additional context**
   Broker version: `2.4.0`
   pulsar-client version: `2.5.2`
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui closed issue #9109: pulsar-timer thread blocked at redeliverUnacknowledgedMessages

Posted by GitBox <gi...@apache.org>.
codelipenghui closed issue #9109:
URL: https://github.com/apache/pulsar/issues/9109


   





[GitHub] [pulsar] codelipenghui commented on issue #9109: pulsar-timer thread blocked at redeliverUnacknowledgedMessages

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on issue #9109:
URL: https://github.com/apache/pulsar/issues/9109#issuecomment-788550126


   https://github.com/apache/pulsar/pull/9552 fixed this problem

