You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/05/14 10:06:49 UTC

[GitHub] [pulsar] bigbang489 opened a new issue #6960: Duplicated messages are sent to dead letter topic

bigbang489 opened a new issue #6960:
URL: https://github.com/apache/pulsar/issues/6960


   **To Reproduce**
   1. Consumer config: SubscriptionMode=Shared, SubscriptionName=Test1, ack Timeout=10000ms, enable Dead Letter Policy and setMaxRedeliveryCount=3
   2. Run 3 Consumers with the above config in 3 parallel threads. The consumers will not send the ACK
   3. Send a message to the topic which the consumers are listening to
   4. Because the ACK will not be sent, the message will be sent to the dead letter topic after the redelivery count is exceeded.
   
   **Expected behavior**
   Only one message is sent to dead-letters topic
   
   **Actual behavior**
   The message is sent to the dead-letters topic twice
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] bigbang489 edited a comment on issue #6960: Duplicated messages are sent to dead letter topic

Posted by GitBox <gi...@apache.org>.
bigbang489 edited a comment on issue #6960:
URL: https://github.com/apache/pulsar/issues/6960#issuecomment-632557593


   This is my code
   
   `package com.test;
   
   import java.util.concurrent.Executor;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   
   import org.apache.pulsar.client.api.AuthenticationFactory;
   import org.apache.pulsar.client.api.ClientBuilder;
   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.DeadLetterPolicy;
   import org.apache.pulsar.client.api.Message;
   import org.apache.pulsar.client.api.MessageListener;
   import org.apache.pulsar.client.api.Producer;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.PulsarClientException;
   import org.apache.pulsar.client.api.SubscriptionType;
   
   public class App {
   	static Executor executor = Executors.newFixedThreadPool(3);
   	
   	public static void main(String[] args) throws InterruptedException, PulsarClientException {
   		startConsumer("my-topic", false);
   		startConsumer("my-topic", false);
   		startConsumer("my-topic", false);
   		startConsumer("DLQ", true); // This consumer is for dead letter
   		Thread.sleep(2000);
   		publish(String.valueOf(System.currentTimeMillis()), "my-topic");
   	}
   	static void startConsumer(String topic, boolean ack) {
   		executor.execute(()-> {
   			try {
   				subscribe(topic, ack);
   			} catch (PulsarClientException e) {
   				e.printStackTrace();
   			}
   		});
   	}
   	
   	static Consumer<byte[]> subscribe(String topic, boolean ack) throws PulsarClientException {
   		ClientBuilder cb = PulsarClient.builder();
   		cb.serviceUrl("pulsar://pulsar:6650");
   		cb.authentication(AuthenticationFactory.token("*************"));
   		PulsarClient client = cb.build();
   		@SuppressWarnings("serial")
   		Consumer<byte[]> consumer = client.newConsumer()
   			.subscriptionName("test")
   			.subscriptionType(SubscriptionType.Shared)
   			.topic(topic)
   			.ackTimeout(12, TimeUnit.SECONDS)
   			.messageListener(new MessageListener<byte[]>() {
   
   				@Override
   				public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
   					System.out.println("\r\nReceive messaged: '" + new String(msg.getData()) + "' from topic: " + msg.getTopicName());
   					try {
   						if(ack) {
   							consumer.acknowledge(msg);
   						}
   					} catch (PulsarClientException e) {
   						e.printStackTrace();
   					}
   				}
   				
   			})
   			.deadLetterPolicy(DeadLetterPolicy.builder()
   					.deadLetterTopic("DLQ")
   					.maxRedeliverCount(2)
   					.build())
   			.subscribe();
   		System.out.println("\r\nStarted a consumer...");
   		return consumer;
   	}
   	
   	static void publish(String message, String topic) throws PulsarClientException {
   		ClientBuilder cb = PulsarClient.builder();
   		cb.serviceUrl("pulsar://pulsar:6650");
   		cb.authentication(AuthenticationFactory.token("***************"));
   		PulsarClient client = cb.build();
   		Producer<byte[]> producer = client.newProducer().topic(topic).create();
   		try {
   			System.out.println("\r\nSent message: " + producer.send(message.getBytes()));
   		}
   		finally {
   			producer.close();
   		}
   		
   	}
   }
   `
   
   POM file:
   `<dependency>
     		<groupId>org.apache.pulsar</groupId>
     		<artifactId>pulsar-client</artifactId>
     		<version>2.5.1</version>
     	</dependency>
     	<dependency>
     		<groupId>org.apache.pulsar</groupId>
     		<artifactId>pulsar-client-api</artifactId>
     		<version>2.5.1</version>
     	</dependency>`
   
   **The result** 
   Receive messaged: '1590133963111' from topic: persistent://public/default/my-topic
   
   Receive messaged: '1590133963111' from topic: persistent://public/default/my-topic
   
   Receive messaged: '1590133963111' from topic: persistent://public/default/my-topic
   
   **Receive messaged: '1590133963111' from topic: persistent://public/default/DLQ**
   
   Receive messaged: '1590133963111' from topic: persistent://public/default/my-topic
   
   **Receive messaged: '1590133963111' from topic: persistent://public/default/DLQ**
   
   As you see, the message is sent to DLQ twice.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on issue #6960: Duplicated messages are sent to dead letter topic

Posted by GitBox <gi...@apache.org>.
315157973 commented on issue #6960:
URL: https://github.com/apache/pulsar/issues/6960#issuecomment-632991118


   I am fixing this bug


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] bigbang489 commented on issue #6960: Duplicated messages are sent to dead letter topic

Posted by GitBox <gi...@apache.org>.
bigbang489 commented on issue #6960:
URL: https://github.com/apache/pulsar/issues/6960#issuecomment-631221626


   > @bigbang489 Thanks for reporting this issue, Could this be stable reproduced? It would be appreciate if you could provide your code example to reproduce it.
   
   The issue is stable, just enable the deadletter policy and start 3 consumers in 3 parallel threads.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] bigbang489 edited a comment on issue #6960: Duplicated messages are sent to dead letter topic

Posted by GitBox <gi...@apache.org>.
bigbang489 edited a comment on issue #6960:
URL: https://github.com/apache/pulsar/issues/6960#issuecomment-632557593


    @jiazhai  This is my code
   
   `package com.test;
   
   import java.util.concurrent.Executor;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   
   import org.apache.pulsar.client.api.AuthenticationFactory;
   import org.apache.pulsar.client.api.ClientBuilder;
   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.DeadLetterPolicy;
   import org.apache.pulsar.client.api.Message;
   import org.apache.pulsar.client.api.MessageListener;
   import org.apache.pulsar.client.api.Producer;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.PulsarClientException;
   import org.apache.pulsar.client.api.SubscriptionType;
   
   public class App {
   	static Executor executor = Executors.newFixedThreadPool(3);
   	
   	public static void main(String[] args) throws InterruptedException, PulsarClientException {
   		startConsumer("my-topic", false);
   		startConsumer("my-topic", false);
   		startConsumer("my-topic", false);
   		startConsumer("DLQ", true); // This consumer is for dead letter
   		Thread.sleep(2000);
   		publish(String.valueOf(System.currentTimeMillis()), "my-topic");
   	}
   	static void startConsumer(String topic, boolean ack) {
   		executor.execute(()-> {
   			try {
   				subscribe(topic, ack);
   			} catch (PulsarClientException e) {
   				e.printStackTrace();
   			}
   		});
   	}
   	
   	static Consumer<byte[]> subscribe(String topic, boolean ack) throws PulsarClientException {
   		ClientBuilder cb = PulsarClient.builder();
   		cb.serviceUrl("pulsar://pulsar:6650");
   		cb.authentication(AuthenticationFactory.token("*************"));
   		PulsarClient client = cb.build();
   		@SuppressWarnings("serial")
   		Consumer<byte[]> consumer = client.newConsumer()
   			.subscriptionName("test")
   			.subscriptionType(SubscriptionType.Shared)
   			.topic(topic)
   			.ackTimeout(12, TimeUnit.SECONDS)
   			.messageListener(new MessageListener<byte[]>() {
   
   				@Override
   				public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
   					System.out.println("\r\nReceive messaged: '" + new String(msg.getData()) + "' from topic: " + msg.getTopicName());
   					try {
   						if(ack) {
   							consumer.acknowledge(msg);
   						}
   					} catch (PulsarClientException e) {
   						e.printStackTrace();
   					}
   				}
   				
   			})
   			.deadLetterPolicy(DeadLetterPolicy.builder()
   					.deadLetterTopic("DLQ")
   					.maxRedeliverCount(2)
   					.build())
   			.subscribe();
   		System.out.println("\r\nStarted a consumer...");
   		return consumer;
   	}
   	
   	static void publish(String message, String topic) throws PulsarClientException {
   		ClientBuilder cb = PulsarClient.builder();
   		cb.serviceUrl("pulsar://pulsar:6650");
   		cb.authentication(AuthenticationFactory.token("***************"));
   		PulsarClient client = cb.build();
   		Producer<byte[]> producer = client.newProducer().topic(topic).create();
   		try {
   			System.out.println("\r\nSent message: " + producer.send(message.getBytes()));
   		}
   		finally {
   			producer.close();
   		}
   		
   	}
   }
   `
   
   POM file:
   `<dependency>
     		<groupId>org.apache.pulsar</groupId>
     		<artifactId>pulsar-client</artifactId>
     		<version>2.5.1</version>
     	</dependency>
     	<dependency>
     		<groupId>org.apache.pulsar</groupId>
     		<artifactId>pulsar-client-api</artifactId>
     		<version>2.5.1</version>
     	</dependency>`
   
   **The result** 
   Receive messaged: '1590133963111' from topic: persistent://public/default/my-topic
   
   Receive messaged: '1590133963111' from topic: persistent://public/default/my-topic
   
   Receive messaged: '1590133963111' from topic: persistent://public/default/my-topic
   
   **Receive messaged: '1590133963111' from topic: persistent://public/default/DLQ**
   
   Receive messaged: '1590133963111' from topic: persistent://public/default/my-topic
   
   **Receive messaged: '1590133963111' from topic: persistent://public/default/DLQ**
   
   As you see, the message is sent to DLQ twice.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on issue #6960: Duplicated messages are sent to dead letter topic

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on issue #6960:
URL: https://github.com/apache/pulsar/issues/6960#issuecomment-632576902


   @bigbang489 Thanks, @315157973 already working on this issue.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui closed issue #6960: Duplicated messages are sent to dead letter topic

Posted by GitBox <gi...@apache.org>.
codelipenghui closed issue #6960:
URL: https://github.com/apache/pulsar/issues/6960


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] bigbang489 commented on issue #6960: Duplicated messages are sent to dead letter topic

Posted by GitBox <gi...@apache.org>.
bigbang489 commented on issue #6960:
URL: https://github.com/apache/pulsar/issues/6960#issuecomment-632557593


   This is my code
   
   `package com.test;
   
   import java.util.concurrent.Executor;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   
   import org.apache.pulsar.client.api.AuthenticationFactory;
   import org.apache.pulsar.client.api.ClientBuilder;
   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.DeadLetterPolicy;
   import org.apache.pulsar.client.api.Message;
   import org.apache.pulsar.client.api.MessageListener;
   import org.apache.pulsar.client.api.Producer;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.PulsarClientException;
   import org.apache.pulsar.client.api.SubscriptionType;
   
   public class App {
   	static Executor executor = Executors.newFixedThreadPool(3);
   	
   	public static void main(String[] args) throws InterruptedException, PulsarClientException {
   		startConsumer("my-topic", false);
   		startConsumer("my-topic", false);
   		startConsumer("my-topic", false);
   		startConsumer("DLQ", true); // This consumer is for dead letter
   		Thread.sleep(2000);
   		publish(String.valueOf(System.currentTimeMillis()), "my-topic");
   	}
   	static void startConsumer(String topic, boolean ack) {
   		executor.execute(()-> {
   			try {
   				subscribe(topic, ack);
   			} catch (PulsarClientException e) {
   				e.printStackTrace();
   			}
   		});
   	}
   	
   	static Consumer<byte[]> subscribe(String topic, boolean ack) throws PulsarClientException {
   		ClientBuilder cb = PulsarClient.builder();
   		cb.serviceUrl("pulsar://pulsar:6650");
   		cb.authentication(AuthenticationFactory.token("*************"));
   		PulsarClient client = cb.build();
   		@SuppressWarnings("serial")
   		Consumer<byte[]> consumer = client.newConsumer()
   			.subscriptionName("test")
   			.subscriptionType(SubscriptionType.Shared)
   			.topic(topic)
   			.ackTimeout(12, TimeUnit.SECONDS)
   			.messageListener(new MessageListener<byte[]>() {
   
   				@Override
   				public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
   					System.out.println("\r\nReceive messaged: '" + new String(msg.getData()) + "' from topic: " + msg.getTopicName());
   					try {
   						if(ack) {
   							consumer.acknowledge(msg);
   						}
   					} catch (PulsarClientException e) {
   						e.printStackTrace();
   					}
   				}
   				
   			})
   			.deadLetterPolicy(DeadLetterPolicy.builder()
   					.deadLetterTopic("DLQ")
   					.maxRedeliverCount(2)
   					.build())
   			.subscribe();
   		System.out.println("\r\nStarted a consumer...");
   		return consumer;
   	}
   	
   	static void publish(String message, String topic) throws PulsarClientException {
   		ClientBuilder cb = PulsarClient.builder();
   		cb.serviceUrl("pulsar://pulsar:6650");
   		cb.authentication(AuthenticationFactory.token("***************"));
   		PulsarClient client = cb.build();
   		Producer<byte[]> producer = client.newProducer().topic(topic).create();
   		try {
   			System.out.println("\r\nSent message: " + producer.send(message.getBytes()));
   		}
   		finally {
   			producer.close();
   		}
   		
   	}
   }
   `
   
   **The result** 
   Receive messaged: '1590133963111' from topic: persistent://public/default/my-topic
   
   Receive messaged: '1590133963111' from topic: persistent://public/default/my-topic
   
   Receive messaged: '1590133963111' from topic: persistent://public/default/my-topic
   
   **Receive messaged: '1590133963111' from topic: persistent://public/default/DLQ**
   
   Receive messaged: '1590133963111' from topic: persistent://public/default/my-topic
   
   **Receive messaged: '1590133963111' from topic: persistent://public/default/DLQ**
   
   As you see, the message is sent to DLQ twice.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jiazhai commented on issue #6960: Duplicated messages are sent to dead letter topic

Posted by GitBox <gi...@apache.org>.
jiazhai commented on issue #6960:
URL: https://github.com/apache/pulsar/issues/6960#issuecomment-629902251


   @bigbang489 Thanks for reporting this issue, Could this be stable reproduced?   It would be appreciate if you could provide your code example to reproduce it. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org