You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/12/17 15:52:13 UTC

[GitHub] [pulsar] Lanayx commented on issue #5877: Shared subscription doesn't work well with partitioned topic in 2.4.2

Lanayx commented on issue #5877: Shared subscription doesn't work well with partitioned topic in 2.4.2
URL: https://github.com/apache/pulsar/issues/5877#issuecomment-566601841
 
 
   @codelipenghui I don't think so, I'm on windows, so only able to run pulsar as docker or kubernetes image. This is java code that I used to test those consumers if it helps (sorry for quality, I'm newbie in Java). 
   ```
   package jtest;
   
   import java.io.IOException;
   import java.time.Duration;
   import java.time.Instant;
   import java.util.ArrayList;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;
   import java.util.function.Function;
   import java.util.logging.LogManager;
   
   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.Producer;
   import org.apache.pulsar.client.api.Message;
   import org.apache.pulsar.client.api.MessageId;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.PulsarClientException;
   import org.apache.pulsar.client.api.Reader;
   import org.apache.pulsar.client.api.SubscriptionInitialPosition;
   import org.apache.pulsar.client.api.SubscriptionType;
   import org.apache.pulsar.client.impl.TopicMessageIdImpl;
   
   /**
    * Hello world!
    */
   public final class App {
       private static Instant start;
   
       private App() {
       }
   
       public static void run(Consumer consumer, int number) {
   
           // Wait for a message
           CompletableFuture<Message> msgF = consumer.receiveAsync();
           msgF.thenAccept(msg -> {
               // try {
               // Do something with the message
               TopicMessageIdImpl id = (TopicMessageIdImpl) msg.getMessageId();
               System.out.printf("Message received: %s %s %s\n", msg.getKey(), new String(msg.getData()), id.getTopicPartitionName());
   
               // Acknowledge the message so that it can be deleted by the message broker
               consumer.acknowledgeAsync(msg).thenAccept(action -> {
                   if (number < 5000) {
                       run(consumer, number + 1);
                   } else {
                       Instant end = Instant.now();
                       long time = Duration.between(start, end).toMillis();
                       System.out.println("End! " + time);
                   }
   
               }).exceptionally(x -> {
                   System.out.println("Exception!");
                   return x;
               });
           });
   
       }
   
       /**
        * Says hello to the world.
        *
        * @param args The arguments of the program.
        * @throws IOException
        */
       public static void main(String[] args) throws IOException {
   
           System.out.println("Hello World!");
           try {
               PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://my-pulsar-cluster:31002").build();
               Consumer consumer = client.newConsumer()
                   .topic("persistent://public/default/partitioned2")
                   .subscriptionName("test-subscription3")
                   .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
                   .subscriptionType(SubscriptionType.Shared)
                   .subscribe();
   
               Consumer consumer2 = client.newConsumer()
                   .topic("persistent://public/default/partitioned2")
                   .subscriptionName("test-subscription3")
                   .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
                   .subscriptionType(SubscriptionType.Shared)
                   .subscribe();
   
               System.out.println("Consumer created! ");
   
               run(consumer,0);
               run(consumer2,0);
   
   		} catch (PulsarClientException e) {
   			// TODO Auto-generated catch block
   			e.printStackTrace();
   		}
       }
   }
   ```
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services