You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/02/25 07:58:25 UTC

[pulsar] 06/13: Fix send to deadLetterTopic not working when reach maxRedeliverCount (#14317)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8d1d78e1eecc082fe016da7452f5235b55126555
Author: 萧易客 <km...@live.com>
AuthorDate: Tue Feb 22 17:17:47 2022 +0800

    Fix send to deadLetterTopic not working when reach maxRedeliverCount (#14317)
    
    When a message reaches maxRedeliverCount, it should be sent to the deadLetterTopic. This mechanism has been broken since 2.8.0; the regression was introduced in #9970.
    
    (cherry picked from commit 16beb9d97fdc64092c8f3fe6959d6bf20dd0aa13)
---
 .../apache/pulsar/client/impl/schema/AbstractSchema.java    |  7 +++----
 .../java/org/apache/pulsar/client/impl/MessageTest.java     | 13 ++++++++++++-
 2 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
index f1e6a2d..c9fd683 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
@@ -74,14 +74,13 @@ public abstract class AbstractSchema<T> implements Schema<T> {
      * @param schemaVersion the version
      * @return the schema at that specific version
      * @throws SchemaSerializationException in case of unknown schema version
-     * @throws NullPointerException in case of null schemaVersion
+     * @throws NullPointerException if schemaVersion is null and supportSchemaVersioning() returns true
      */
     public Schema<?> atSchemaVersion(byte[] schemaVersion) throws SchemaSerializationException {
-        Objects.requireNonNull(schemaVersion);
         if (!supportSchemaVersioning()) {
             return this;
-        } else {
-            throw new SchemaSerializationException("Not implemented for " + this.getClass());
         }
+        Objects.requireNonNull(schemaVersion);
+        throw new SchemaSerializationException("Not implemented for " + this.getClass());
     }
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java
index 6d633e7..13cf4f6 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java
@@ -22,8 +22,8 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
-
 import java.nio.ByteBuffer;
+import java.util.Optional;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -81,4 +81,15 @@ public class MessageTest {
         assertFalse(topicMessage.isReplicated());
         assertNull(topicMessage.getReplicatedFrom());
     }
+
+    @Test
+    public void testMessageImplGetReaderSchema() {
+        MessageMetadata builder = new MessageMetadata();
+        builder.hasSchemaVersion();
+        ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
+        Message<byte[]> msg = MessageImpl.create(builder, payload, Schema.BYTES, null);
+
+        Optional<Schema<?>> readerSchema = msg.getReaderSchema();
+        assertTrue(readerSchema.isPresent());
+    }
 }