You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/08/29 14:37:57 UTC

[GitHub] [pulsar] kimcs opened a new issue #5073: java-client: Consumer closed after using seek(timestamp)

kimcs opened a new issue #5073: java-client: Consumer closed after using seek(timestamp)
URL: https://github.com/apache/pulsar/issues/5073
 
 
   **Describe the bug**
   Consumer is closed after a seek(timestamp) operation.
   
   **To Reproduce**
   ```java
   package sample;
   
   import org.apache.pulsar.client.admin.PulsarAdmin;
   import org.apache.pulsar.client.admin.PulsarAdminException;
   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.MessageId;
   import org.apache.pulsar.client.api.Producer;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.PulsarClientException;
   import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
   import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
   import org.testng.annotations.Test;
   
   import java.util.List;
   
   /**
    * Reproducer for the reported problem: after {@code consumer.seek(timestamp)}
    * the subsequent {@code consumer.unsubscribe()} fails with
    * "Not connected to broker".
    *
    * The test talks to a broker at {@code pulsar://localhost:6650} and to the
    * admin endpoint at {@code http://localhost:8080}.
    */
   public class SeekTimestampTest {

       static final String topic = "public/default/seek-test-timestamp-magic-h509db3";

       /**
        * Deletes {@code topic} through the admin API when any topic listed in the
        * {@code public/default} namespace contains the given name.
        *
        * @param topic name of the topic to remove
        * @throws PulsarClientException declared by the admin client constructor
        * @throws RuntimeException wrapping any {@link PulsarAdminException}
        */
       void deleteTopic(String topic) throws PulsarClientException {
           final String adminUrl = "http://localhost:8080";
           ClientConfigurationData adminConf = new ClientConfigurationData();
           adminConf.setAuthentication(new AuthenticationDisabled());
           adminConf.setServiceUrl(adminUrl);

           try (PulsarAdmin admin = new PulsarAdmin(adminUrl, adminConf)) {
               List<String> existing = admin.namespaces().getTopics("public/default");
               boolean present = existing.stream().anyMatch(name -> name.contains(topic));
               if (present) {
                   // Delete at most once, using the caller-supplied name.
                   admin.topics().delete(topic);
               }
           } catch (PulsarAdminException e) {
               throw new RuntimeException(e);
           }
       }

       /**
        * Sends {@code n} single-byte messages (payload values 1..n, truncated to a
        * byte) to {@link #topic} and prints the id of each one.
        *
        * @param client client used to create a short-lived producer
        * @param n number of messages to send
        */
       void produceMessages(PulsarClient client, int n) throws PulsarClientException {
           try (Producer<byte[]> sender = client.newProducer().topic(topic).create()) {
               for (int index = 0; index < n; index++) {
                   byte[] payload = {(byte) (1 + index)};
                   MessageId sentId = sender.send(payload);
                   System.out.printf("Produced message: %s%n", sentId);
               }
           }
       }

       /**
        * Captures a timestamp, produces ten messages, seeks the consumer back to
        * that timestamp, and then unsubscribes. The unsubscribe call is where the
        * reported failure shows up.
        */
       @Test
       public void thatSeekTimestampDoesNotCloseConsumer() throws PulsarClientException, InterruptedException {
           deleteTopic(topic);

           // Resources are closed in reverse order: consumer first, then client.
           try (PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
                Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName("sub1").subscribe()) {
               try {
                   // Timestamp is taken before producing so the seek target precedes all messages.
                   long beforeProduce = System.currentTimeMillis();
                   produceMessages(client, 10);
                   consumer.seek(beforeProduce);
               } finally {
                   consumer.unsubscribe(); // FAILS with "Not connected to broker"
               }
           }
       }

   }
   ```
   
   **Expected behavior**
   Calling seek(timestamp) should never close the consumer.
   
   **Additional context**
   In attempting to reproduce this bug I have also seen the consumer fail before calling unsubscribe on it, e.g. when calling receive on it, but that was harder to reproduce consistently.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services