You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/07/15 22:11:30 UTC

[GitHub] [pulsar] Lanayx commented on a change in pull request #6449: Support Consumers Set Custom Retry Delay

Lanayx commented on a change in pull request #6449:
URL: https://github.com/apache/pulsar/pull/6449#discussion_r455394658



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -511,6 +527,129 @@ boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageId, AckType ackTyp
         return sendAcknowledge(messageId, ackType, properties, txnImpl);
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType ackType,
+                                                       Map<String,Long> properties, 
+                                                       long delayTime,
+                                                       TimeUnit unit) {
+        MessageId messageId = message.getMessageId();
+        if(messageId instanceof TopicMessageIdImpl) {
+            messageId = ((TopicMessageIdImpl)messageId).getInnerMessageId();
+        }
+        checkArgument(messageId instanceof MessageIdImpl);
+        if (getState() != State.Ready && getState() != State.Connecting) {
+            stats.incrementNumAcksFailed();
+            PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + getState());
+            if (AckType.Individual.equals(ackType)) {
+                onAcknowledge(messageId, exception);
+            } else if (AckType.Cumulative.equals(ackType)) {
+                onAcknowledgeCumulative(messageId, exception);
+            }
+            return FutureUtil.failedFuture(exception);
+        }
+        if (delayTime < 0) {
+            delayTime = 0;
+        }
+        if (retryLetterProducer == null) {
+            try {
+                createProducerLock.writeLock().lock();
+                if (retryLetterProducer == null) {
+                    retryLetterProducer = client.newProducer(schema)
+                            .topic(this.deadLetterPolicy.getRetryLetterTopic())
+                            .enableBatching(false)
+                            .blockIfQueueFull(false)
+                            .create();
+                }
+            } catch (Exception e) {
+                log.error("Create retry letter producer exception with topic: {}", deadLetterPolicy.getRetryLetterTopic(), e);
+            } finally {
+                createProducerLock.writeLock().unlock();
+            }
+        }
+        if (retryLetterProducer != null) {
+            try {
+                MessageImpl<T> retryMessage = null;
+                String originMessageIdStr = null;
+                String originTopicNameStr = null;
+                if (message instanceof TopicMessageImpl) {
+                    retryMessage = (MessageImpl<T>) ((TopicMessageImpl<T>) message).getMessage();
+                    originMessageIdStr = ((TopicMessageIdImpl) message.getMessageId()).getInnerMessageId().toString();
+                    originTopicNameStr = ((TopicMessageIdImpl) message.getMessageId()).getTopicName();
+                } else if (message instanceof MessageImpl) {
+                    retryMessage = (MessageImpl<T>) message;
+                    originMessageIdStr = ((MessageImpl<T>) message).getMessageId().toString();
+                    originTopicNameStr =  ((MessageImpl<T>) message).getTopicName();
+                }
+                SortedMap<String, String> propertiesMap = new TreeMap<>();
+                int reconsumetimes = 1;
+                if (message.getProperties() != null) {
+                    propertiesMap.putAll(message.getProperties());
+                }
+
+                if (propertiesMap.containsKey(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
+                    reconsumetimes = Integer.valueOf(propertiesMap.get(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES));
+                    reconsumetimes = reconsumetimes + 1;
+                   
+                } else {
+                    propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, originTopicNameStr);
+                    propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr);
+                }
+
+                propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, String.valueOf(reconsumetimes));
+                propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME, String.valueOf(unit.toMillis(delayTime)));
+                
+               if (reconsumetimes > this.deadLetterPolicy.getMaxRedeliverCount()) {
+                   processPossibleToDLQ((MessageIdImpl)messageId);

Review comment:
       Does anyone know what the point of this line is? The logic for sending the message to the dead letter consumer, as well as the message acknowledgment, is already implemented more than twenty lines below. Why is it done twice?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org