You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/02/24 06:44:59 UTC
[pulsar] branch branch-2.8 updated: Fix send to deadLetterTopic not working when reach maxRedeliverCount (#14317)
This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new f1a1294 Fix send to deadLetterTopic not working when reach maxRedeliverCount (#14317)
f1a1294 is described below
commit f1a129479053fd439c08cdb7e78ad47ac7a46bfb
Author: 萧易客 <km...@live.com>
AuthorDate: Tue Feb 22 17:17:47 2022 +0800
Fix send to deadLetterTopic not working when reach maxRedeliverCount (#14317)
If a message reaches maxRedeliverCount, it should be sent to the deadLetterTopic. Since 2.8.0 this mechanism has been broken; the regression was introduced in #9970.
(cherry picked from commit 16beb9d97fdc64092c8f3fe6959d6bf20dd0aa13)
---
.../apache/pulsar/client/impl/schema/AbstractSchema.java | 7 +++----
.../java/org/apache/pulsar/client/impl/MessageTest.java | 13 ++++++++++++-
2 files changed, 15 insertions(+), 5 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
index 8cf7a05..33c2ed1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
@@ -75,14 +75,13 @@ public abstract class AbstractSchema<T> implements Schema<T> {
* @param schemaVersion the version
* @return the schema at that specific version
* @throws SchemaSerializationException in case of unknown schema version
- * @throws NullPointerException in case of null schemaVersion
+ * @throws NullPointerException in case of null schemaVersion and supportSchemaVersioning is true
*/
public Schema<?> atSchemaVersion(byte[] schemaVersion) throws SchemaSerializationException {
- Objects.requireNonNull(schemaVersion);
if (!supportSchemaVersioning()) {
return this;
- } else {
- throw new SchemaSerializationException("Not implemented for " + this.getClass());
}
+ Objects.requireNonNull(schemaVersion);
+ throw new SchemaSerializationException("Not implemented for " + this.getClass());
}
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java
index 15a4f99..27698b51 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java
@@ -22,8 +22,8 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
-
import java.nio.ByteBuffer;
+import java.util.Optional;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -81,4 +81,15 @@ public class MessageTest {
assertFalse(topicMessage.isReplicated());
assertNull(topicMessage.getReplicatedFrom());
}
+
+ @Test
+ public void testMessageImplGetReaderSchema() {
+ MessageMetadata builder = new MessageMetadata();
+ builder.hasSchemaVersion();
+ ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
+ Message<byte[]> msg = MessageImpl.create(builder, payload, Schema.BYTES);
+
+ Optional<Schema<?>> readerSchema = msg.getReaderSchema();
+ assertTrue(readerSchema.isPresent());
+ }
}