You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ti...@apache.org on 2022/12/27 13:53:52 UTC
[pulsar] branch master updated: [fix][client] retry letter producer respect auto schema (#19051)
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 28772212e43 [fix][client] retry letter producer respect auto schema (#19051)
28772212e43 is described below
commit 28772212e4302ae4c2ff06e8ec9495218395650f
Author: tison <wa...@gmail.com>
AuthorDate: Tue Dec 27 21:53:43 2022 +0800
[fix][client] retry letter producer respect auto schema (#19051)
Signed-off-by: tison <wa...@gmail.com>
Co-authored-by: congbobo184 <co...@github.com>
---
.../apache/pulsar/client/api/RetryTopicTest.java | 126 ++++++++++++++++++++-
.../apache/pulsar/client/impl/ConsumerImpl.java | 20 ++--
.../src/main/resources/findbugsExclude.xml | 5 +
3 files changed, 141 insertions(+), 10 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
index 3028392ada1..7012cb0d698 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
@@ -19,7 +19,9 @@
package org.apache.pulsar.client.api;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.lang.reflect.Field;
import java.util.HashMap;
@@ -29,6 +31,10 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import lombok.Data;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.reflect.Nullable;
+import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.util.RetryMessageUtil;
@@ -103,7 +109,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
int totalInDeadLetter = 0;
do {
- Message message = deadLetterConsumer.receive();
+ Message<byte[]> message = deadLetterConsumer.receive();
log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
deadLetterConsumer.acknowledge(message);
totalInDeadLetter++;
@@ -128,6 +134,124 @@ public class RetryTopicTest extends ProducerConsumerBase {
checkConsumer.close();
}
+ @Data
+ public static class Foo {
+ @Nullable
+ private String field1;
+ @Nullable
+ private String field2;
+ }
+
+ @Data
+ public static class FooV2 {
+ @Nullable
+ private String field1;
+ @Nullable
+ private String field2;
+ @Nullable
+ private String field3;
+ }
+
+ @Test(timeOut = 20000)
+ public void testAutoConsumeSchemaRetryLetter() throws Exception {
+ final String topic = "persistent://my-property/my-ns/retry-letter-topic";
+ final String subName = "my-subscription";
+ final String retrySubName = "my-subscription" + "-RETRY";
+ final int sendMessages = 10;
+ final String retryTopic = topic + "-RETRY";
+
+ admin.topics().createNonPartitionedTopic(topic);
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
+ .topic(topic)
+ .enableBatching(false)
+ .create();
+ @Cleanup
+ Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+ .topic(topic)
+ .subscriptionName(subName)
+ .isAckReceiptEnabled(true)
+ .subscriptionType(SubscriptionType.Shared)
+ .enableRetry(true)
+ .deadLetterPolicy(DeadLetterPolicy.builder().retryLetterTopic(retryTopic)
+ .maxRedeliverCount(Integer.MAX_VALUE).build())
+ .subscribe();
+ @Cleanup
+ Consumer<GenericRecord> retryTopicConsumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+ .topic(retryTopic)
+ .subscriptionName(retrySubName)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+ Set<MessageId> messageIds = new HashSet<>();
+ for (int i = 0; i < sendMessages; i++) {
+ if (i % 2 == 0) {
+ Foo foo = new Foo();
+ foo.field1 = i + "";
+ foo.field2 = i + "";
+ messageIds.add(producer.newMessage(Schema.AVRO(Foo.class)).value(foo).send());
+ } else {
+ FooV2 foo = new FooV2();
+ foo.field1 = i + "";
+ foo.field2 = i + "";
+ foo.field3 = i + "";
+ messageIds.add(producer.newMessage(Schema.AVRO(FooV2.class)).value(foo).send());
+ }
+ }
+ producer.close();
+
+ int totalReceived = 0;
+ do {
+ Message<GenericRecord> message = consumer.receive();
+ log.info(
+ "consumer received message (schema={}) : {} {}",
+ message.getReaderSchema().get(), message.getMessageId(), new String(message.getData()));
+ consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
+ assertTrue(messageIds.contains(message.getMessageId()));
+ totalReceived++;
+ } while (totalReceived < sendMessages);
+
+ // the original consumer receives the retried messages
+ Set<MessageId> retryTopicMessageIds = new HashSet<>();
+ do {
+ Message<GenericRecord> message = consumer.receive();
+ log.info(
+ "consumer received retry message (schema={}) : {} {}",
+ message.getReaderSchema().get(), message.getMessageId(), new String(message.getData()));
+ consumer.acknowledge(message);
+ retryTopicMessageIds.add(message.getMessageId());
+ assertFalse(messageIds.contains(message.getMessageId()));
+ totalReceived++;
+ } while (totalReceived - sendMessages < sendMessages);
+
+ Message<GenericRecord> message = consumer.receive(2, TimeUnit.SECONDS);
+ assertNull(message);
+
+ totalReceived = 0;
+
+ // retryTopicConsumer receives the retry messages
+ do {
+ Message<GenericRecord> retryTopicMessage = retryTopicConsumer.receive();
+ assertTrue(retryTopicMessageIds.contains(retryTopicMessage.getMessageId()));
+ assertEquals(retryTopicMessage.getValue().getField("field1"), totalReceived + "");
+ assertEquals(retryTopicMessage.getValue().getField("field2"), totalReceived + "");
+ if (totalReceived % 2 == 0) {
+ try {
+ retryTopicMessage.getValue().getField("field3");
+ } catch (Exception e) {
+ assertTrue(e instanceof AvroRuntimeException);
+ assertEquals(e.getMessage(), "Not a valid schema field: field3");
+ }
+ } else {
+ assertEquals(retryTopicMessage.getValue().getField("field3"), totalReceived + "");
+ }
+ totalReceived++;
+ } while (totalReceived < sendMessages);
+
+ message = retryTopicConsumer.receive(2, TimeUnit.SECONDS);
+ assertNull(message);
+ }
+
@Test(timeOut = 60000)
public void testRetryTopicProperties() throws Exception {
final String topic = "persistent://my-property/my-ns/retry-topic";
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 2680e707030..8fef7399836 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -182,7 +182,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private volatile CompletableFuture<Producer<byte[]>> deadLetterProducer;
- private volatile Producer<T> retryLetterProducer;
+ private volatile Producer<byte[]> retryLetterProducer;
private final ReadWriteLock createProducerLock = new ReentrantReadWriteLock();
protected volatile boolean paused;
@@ -611,7 +611,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
createProducerLock.writeLock().lock();
try {
if (retryLetterProducer == null) {
- retryLetterProducer = client.newProducer(schema)
+ retryLetterProducer = client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema))
.topic(this.deadLetterPolicy.getRetryLetterTopic())
.enableBatching(false)
.blockIfQueueFull(false)
@@ -636,18 +636,18 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
if (customProperties != null) {
propertiesMap.putAll(customProperties);
}
- int reconsumetimes = 1;
+ int reconsumeTimes = 1;
if (propertiesMap.containsKey(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
- reconsumetimes = Integer.parseInt(
+ reconsumeTimes = Integer.parseInt(
propertiesMap.get(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES));
- reconsumetimes = reconsumetimes + 1;
+ reconsumeTimes = reconsumeTimes + 1;
}
- propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, String.valueOf(reconsumetimes));
+ propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, String.valueOf(reconsumeTimes));
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME,
String.valueOf(unit.toMillis(delayTime)));
MessageId finalMessageId = messageId;
- if (reconsumetimes > this.deadLetterPolicy.getMaxRedeliverCount()
+ if (reconsumeTimes > this.deadLetterPolicy.getMaxRedeliverCount()
&& StringUtils.isNotBlank(deadLetterPolicy.getDeadLetterTopic())) {
initDeadLetterProducerIfNeeded();
deadLetterProducer.thenAccept(dlqProducer -> {
@@ -672,8 +672,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
return null;
});
} else {
- TypedMessageBuilder<T> typedMessageBuilderNew = retryLetterProducer.newMessage()
- .value(retryMessage.getValue())
+ assert retryMessage != null;
+ TypedMessageBuilder<byte[]> typedMessageBuilderNew = retryLetterProducer
+ .newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
+ .value(retryMessage.getData())
.properties(propertiesMap);
if (delayTime > 0) {
typedMessageBuilderNew.deliverAfter(delayTime, unit);
diff --git a/pulsar-client/src/main/resources/findbugsExclude.xml b/pulsar-client/src/main/resources/findbugsExclude.xml
index a37c886466e..39ed71bb2d6 100644
--- a/pulsar-client/src/main/resources/findbugsExclude.xml
+++ b/pulsar-client/src/main/resources/findbugsExclude.xml
@@ -91,6 +91,11 @@
<Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD"/>
</Match>
+ <Match>
+ <Class name="~org.apache.pulsar.client.impl.ConsumerImpl.*"/>
+ <Bug pattern="REC_CATCH_EXCEPTION"/>
+ </Match>
+
<Match>
<Class name="org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl"/>
<Bug pattern="SE_BAD_FIELD"/>