You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ti...@apache.org on 2022/12/27 13:53:52 UTC

[pulsar] branch master updated: [fix][client] retry letter producer respect auto schema (#19051)

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 28772212e43 [fix][client] retry letter producer respect auto schema (#19051)
28772212e43 is described below

commit 28772212e4302ae4c2ff06e8ec9495218395650f
Author: tison <wa...@gmail.com>
AuthorDate: Tue Dec 27 21:53:43 2022 +0800

    [fix][client] retry letter producer respect auto schema (#19051)
    
    Signed-off-by: tison <wa...@gmail.com>
    Co-authored-by: congbobo184 <co...@github.com>
---
 .../apache/pulsar/client/api/RetryTopicTest.java   | 126 ++++++++++++++++++++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  20 ++--
 .../src/main/resources/findbugsExclude.xml         |   5 +
 3 files changed, 141 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
index 3028392ada1..7012cb0d698 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
@@ -19,7 +19,9 @@
 package org.apache.pulsar.client.api;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import java.lang.reflect.Field;
 import java.util.HashMap;
@@ -29,6 +31,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
+import lombok.Data;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.reflect.Nullable;
+import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.util.RetryMessageUtil;
@@ -103,7 +109,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
 
         int totalInDeadLetter = 0;
         do {
-            Message message = deadLetterConsumer.receive();
+            Message<byte[]> message = deadLetterConsumer.receive();
             log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
             deadLetterConsumer.acknowledge(message);
             totalInDeadLetter++;
@@ -128,6 +134,124 @@ public class RetryTopicTest extends ProducerConsumerBase {
         checkConsumer.close();
     }
 
+    @Data
+    public static class Foo {
+        @Nullable
+        private String field1;
+        @Nullable
+        private String field2;
+    }
+
+    @Data
+    public static class FooV2 {
+        @Nullable
+        private String field1;
+        @Nullable
+        private String field2;
+        @Nullable
+        private String field3;
+    }
+
+    @Test(timeOut = 20000)
+    public void testAutoConsumeSchemaRetryLetter() throws Exception {
+        final String topic = "persistent://my-property/my-ns/retry-letter-topic";
+        final String subName = "my-subscription";
+        final String retrySubName = "my-subscription" + "-RETRY";
+        final int sendMessages = 10;
+        final String retryTopic = topic + "-RETRY";
+
+        admin.topics().createNonPartitionedTopic(topic);
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+        @Cleanup
+        Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic)
+                .subscriptionName(subName)
+                .isAckReceiptEnabled(true)
+                .subscriptionType(SubscriptionType.Shared)
+                .enableRetry(true)
+                .deadLetterPolicy(DeadLetterPolicy.builder().retryLetterTopic(retryTopic)
+                        .maxRedeliverCount(Integer.MAX_VALUE).build())
+                .subscribe();
+        @Cleanup
+        Consumer<GenericRecord> retryTopicConsumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(retryTopic)
+                .subscriptionName(retrySubName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+        Set<MessageId> messageIds = new HashSet<>();
+        for (int i = 0; i < sendMessages; i++) {
+            if (i % 2 == 0) {
+                Foo foo = new Foo();
+                foo.field1 = i + "";
+                foo.field2 = i + "";
+                messageIds.add(producer.newMessage(Schema.AVRO(Foo.class)).value(foo).send());
+            } else {
+                FooV2 foo = new FooV2();
+                foo.field1 = i + "";
+                foo.field2 = i + "";
+                foo.field3 = i + "";
+                messageIds.add(producer.newMessage(Schema.AVRO(FooV2.class)).value(foo).send());
+            }
+        }
+        producer.close();
+
+        int totalReceived = 0;
+        do {
+            Message<GenericRecord> message = consumer.receive();
+            log.info(
+                    "consumer received message (schema={}) : {} {}",
+                    message.getReaderSchema().get(), message.getMessageId(), new String(message.getData()));
+            consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
+            assertTrue(messageIds.contains(message.getMessageId()));
+            totalReceived++;
+        } while (totalReceived < sendMessages);
+
+        // the original consumer then receives the retried messages
+        Set<MessageId> retryTopicMessageIds = new HashSet<>();
+        do {
+            Message<GenericRecord> message = consumer.receive();
+            log.info(
+                    "consumer received retry message (schema={}) : {} {}",
+                    message.getReaderSchema().get(), message.getMessageId(), new String(message.getData()));
+            consumer.acknowledge(message);
+            retryTopicMessageIds.add(message.getMessageId());
+            assertFalse(messageIds.contains(message.getMessageId()));
+            totalReceived++;
+        } while (totalReceived - sendMessages < sendMessages);
+
+        Message<GenericRecord> message = consumer.receive(2, TimeUnit.SECONDS);
+        assertNull(message);
+
+        totalReceived = 0;
+
+        // retryTopicConsumer receives the same retry messages as well
+        do {
+            Message<GenericRecord> retryTopicMessage = retryTopicConsumer.receive();
+            assertTrue(retryTopicMessageIds.contains(retryTopicMessage.getMessageId()));
+            assertEquals(retryTopicMessage.getValue().getField("field1"), totalReceived + "");
+            assertEquals(retryTopicMessage.getValue().getField("field2"), totalReceived + "");
+            if (totalReceived % 2 == 0) {
+                try {
+                    retryTopicMessage.getValue().getField("field3");
+                } catch (Exception e) {
+                    assertTrue(e instanceof AvroRuntimeException);
+                    assertEquals(e.getMessage(), "Not a valid schema field: field3");
+                }
+            } else {
+                assertEquals(retryTopicMessage.getValue().getField("field3"), totalReceived + "");
+            }
+            totalReceived++;
+        } while (totalReceived < sendMessages);
+
+        message = retryTopicConsumer.receive(2, TimeUnit.SECONDS);
+        assertNull(message);
+    }
+
     @Test(timeOut = 60000)
     public void testRetryTopicProperties() throws Exception {
         final String topic = "persistent://my-property/my-ns/retry-topic";
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 2680e707030..8fef7399836 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -182,7 +182,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     private volatile CompletableFuture<Producer<byte[]>> deadLetterProducer;
 
-    private volatile Producer<T> retryLetterProducer;
+    private volatile Producer<byte[]> retryLetterProducer;
     private final ReadWriteLock createProducerLock = new ReentrantReadWriteLock();
 
     protected volatile boolean paused;
@@ -611,7 +611,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             createProducerLock.writeLock().lock();
             try {
                 if (retryLetterProducer == null) {
-                    retryLetterProducer = client.newProducer(schema)
+                    retryLetterProducer = client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema))
                             .topic(this.deadLetterPolicy.getRetryLetterTopic())
                             .enableBatching(false)
                             .blockIfQueueFull(false)
@@ -636,18 +636,18 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 if (customProperties != null) {
                     propertiesMap.putAll(customProperties);
                 }
-                int reconsumetimes = 1;
+                int reconsumeTimes = 1;
                 if (propertiesMap.containsKey(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
-                    reconsumetimes = Integer.parseInt(
+                    reconsumeTimes = Integer.parseInt(
                             propertiesMap.get(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES));
-                    reconsumetimes = reconsumetimes + 1;
+                    reconsumeTimes = reconsumeTimes + 1;
                 }
-                propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, String.valueOf(reconsumetimes));
+                propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, String.valueOf(reconsumeTimes));
                 propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME,
                         String.valueOf(unit.toMillis(delayTime)));
 
                 MessageId finalMessageId = messageId;
-                if (reconsumetimes > this.deadLetterPolicy.getMaxRedeliverCount()
+                if (reconsumeTimes > this.deadLetterPolicy.getMaxRedeliverCount()
                         && StringUtils.isNotBlank(deadLetterPolicy.getDeadLetterTopic())) {
                     initDeadLetterProducerIfNeeded();
                     deadLetterProducer.thenAccept(dlqProducer -> {
@@ -672,8 +672,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                         return null;
                     });
                 } else {
-                    TypedMessageBuilder<T> typedMessageBuilderNew = retryLetterProducer.newMessage()
-                            .value(retryMessage.getValue())
+                    assert retryMessage != null;
+                    TypedMessageBuilder<byte[]> typedMessageBuilderNew = retryLetterProducer
+                            .newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
+                            .value(retryMessage.getData())
                             .properties(propertiesMap);
                     if (delayTime > 0) {
                         typedMessageBuilderNew.deliverAfter(delayTime, unit);
diff --git a/pulsar-client/src/main/resources/findbugsExclude.xml b/pulsar-client/src/main/resources/findbugsExclude.xml
index a37c886466e..39ed71bb2d6 100644
--- a/pulsar-client/src/main/resources/findbugsExclude.xml
+++ b/pulsar-client/src/main/resources/findbugsExclude.xml
@@ -91,6 +91,11 @@
         <Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD"/>
     </Match>
 
+    <Match>
+        <Class name="~org.apache.pulsar.client.impl.ConsumerImpl.*"/>
+        <Bug pattern="REC_CATCH_EXCEPTION"/>
+    </Match>
+
     <Match>
         <Class name="org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl"/>
         <Bug pattern="SE_BAD_FIELD"/>