You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/01/09 14:08:18 UTC

[GitHub] codelipenghui edited a comment on issue #3131: Consumer stop receive messages from broker

codelipenghui edited a comment on issue #3131: Consumer stop receive messages from broker
URL: https://github.com/apache/pulsar/issues/3131#issuecomment-452700099
 
 
   This is a test case that shows how the consumers stop consuming messages:
   
   ```java
   package com.zhaopin.pulsar.issues;
   
   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.Message;
   import org.apache.pulsar.client.api.Producer;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.PulsarClientException;
   import org.apache.pulsar.client.api.SubscriptionType;
   
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.TimeUnit;
   
   /**
    * Reproduction case: several shared-subscription consumers that never acknowledge
    * messages are fed by a batching producer until the consumers stop receiving.
    *
    * Broker configs used for the reproduction:
    *
    *   maxUnackedMessagesPerConsumer=500
    *   maxUnackedMessagesPerSubscription=2000
    */
   public class ConsumerStopReceiveMessages {

       /**
        * Connects to the broker, creates one batching producer and six shared consumers,
        * then starts one publishing thread and one receiving thread per consumer.
        *
        * @param args unused
        * @throws PulsarClientException if the client, producer or a consumer cannot be created
        */
       public static void main(String[] args) throws PulsarClientException {
           final String topic = "your-topic";

           PulsarClient client = PulsarClient.builder()
                   .serviceUrl("pulsar://localhost:6650")
                   .build();

           Producer<byte[]> producer = createBatchingProducer(client, topic);
           List<Consumer<byte[]>> consumers = subscribeConsumers(client, topic, 6);

           // The publishing thread is started before any receiving thread.
           startPublishing(producer);
           startReceiving(consumers);
       }

       /** Creates a producer on {@code topic} with message batching enabled. */
       private static Producer<byte[]> createBatchingProducer(PulsarClient client, String topic)
               throws PulsarClientException {
           return client.newProducer()
                   .topic(topic)
                   .batchingMaxMessages(1000)
                   .batchingMaxPublishDelay(30, TimeUnit.SECONDS)
                   .blockIfQueueFull(true)
                   .maxPendingMessages(1000)
                   .enableBatching(true)
                   .create();
       }

       /** Subscribes {@code count} consumers to {@code topic} on the shared subscription "test". */
       private static List<Consumer<byte[]>> subscribeConsumers(PulsarClient client, String topic, int count)
               throws PulsarClientException {
           List<Consumer<byte[]>> subscribed = new ArrayList<>();
           int created = 0;
           while (created < count) {
               Consumer<byte[]> consumer = client.newConsumer()
                       .topic(topic)
                       .subscriptionType(SubscriptionType.Shared)
                       .subscriptionName("test")
                       .ackTimeout(1, TimeUnit.SECONDS)
                       .receiverQueueSize(100)
                       .subscribe();
               subscribed.add(consumer);
               created++;
           }
           return subscribed;
       }

       /** Starts a thread that publishes an ever-increasing counter, as text bytes, without end. */
       private static void startPublishing(Producer<byte[]> producer) {
           Thread publisher = new Thread(() -> {
               int sequence = 0;
               while (true) {
                   producer.sendAsync(String.valueOf(sequence++).getBytes());
               }
           });
           publisher.start();
       }

       /** Starts one thread per consumer; each thread receives and prints messages without end. */
       private static void startReceiving(List<Consumer<byte[]>> consumers) {
           for (Consumer<byte[]> consumer : consumers) {
               Thread receiver = new Thread(() -> receiveForever(consumer));
               receiver.start();
           }
       }

       /**
        * Receives and prints messages in an endless loop. Messages are intentionally never
        * acknowledged, so that the broker has to redeliver them. This is the loop in which
        * the consumer eventually can't receive messages any more.
        */
       private static void receiveForever(Consumer<byte[]> consumer) {
           while (true) {
               try {
                   Message<byte[]> received = consumer.receive();
                   System.out.println("Message Received [x] " + consumer.getConsumerName() + " --- [" + received.getMessageId() + "]" + " ---- " + new String(received.getValue()));
                   // No ack on purpose: wait for broker redelivery.
               } catch (Exception e) {
                   e.printStackTrace();
               }
           }
       }
   }
   ```
   
   After a while, you will get the following logs:
   
   ```
   Message Received [x] 78cab --- [15361:48:-1:990] ---- 0
   Message Received [x] 78cab --- [15361:48:-1:991] ---- 0
   Message Received [x] 78cab --- [15361:48:-1:992] ---- 0
   Message Received [x] 78cab --- [15361:48:-1:993] ---- 0
   Message Received [x] 78cab --- [15361:48:-1:994] ---- 0
   Message Received [x] 78cab --- [15361:48:-1:995] ---- 0
   Message Received [x] 78cab --- [15361:48:-1:996] ---- 0
   Message Received [x] 78cab --- [15361:48:-1:997] ---- 0
   Message Received [x] 78cab --- [15361:48:-1:998] ---- 0
   Message Received [x] 78cab --- [15361:48:-1:999] ---- 0
   [ConsumerBase{subscription='test', consumerName='78cab', topic='your-topic'}] 9 messages have timed-out
   [your-topic] [pulsar-api-test-14-3291] Pending messages: 1 --- Publish throughput: 7583.21 msg/s --- 0.33 Mbit/s --- Latency: med: 222.155 ms - 95pct: 587.127 ms - 99pct: 666.942 ms - 99.9pct: 766.028 ms - max: 766.102 ms --- Ack received rate: 7566.54 ack/s --- Failed messages: 0
   [your-topic] [test] [2a5ea] Prefetched messages: 0 --- Consume throughput: 16.45 msgs/s --- Throughput received: 0.00 msg/s --- 0.00 Mbit/s --- Ack sent rate: 0 ack/s --- Failed messages: 0 --- Failed acks: {}
   [your-topic] [test] [78cab] Prefetched messages: 0 --- Consume throughput: 332.48 msgs/s --- Throughput received: 0.00 msg/s --- 0.00 Mbit/s --- Ack sent rate: 0 ack/s --- Failed messages: 0 --- Failed acks: {}
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services