You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/08 05:37:58 UTC
[pulsar] branch branch-2.11 updated: [fix][broker] Fix the replicator unnecessary get schema request for bytes schema (#17523)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 345417f284e [fix][broker] Fix the replicator unnecessary get schema request for bytes schema (#17523)
345417f284e is described below
commit 345417f284e31ded29b25d93c4a4921b3058c1e3
Author: Penghui Li <pe...@apache.org>
AuthorDate: Thu Sep 8 13:37:01 2022 +0800
[fix][broker] Fix the replicator unnecessary get schema request for bytes schema (#17523)
(cherry picked from commit 2ed561436deea9639d177e8e43317c37ea44152d)
---
.../apache/pulsar/broker/service/ReplicatorTest.java | 13 +++++++++++++
.../org/apache/pulsar/client/impl/ProducerImpl.java | 11 ++++++++++-
.../apache/pulsar/client/impl/ProducerImplTest.java | 19 +++++++++++++++++++
3 files changed, 42 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 3e3ef98df65..9b87b06012e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -468,6 +468,19 @@ public class ReplicatorTest extends ReplicatorTestBase {
consumer2.acknowledge(msg2);
consumer3.acknowledge(msg3);
}
+
+ @Cleanup
+ Producer<byte[]> producerBytes = client1.newProducer()
+ .topic(topic.toString())
+ .enableBatching(false)
+ .create();
+
+ byte[] data = "Bytes".getBytes();
+ producerBytes.send(data);
+
+ assertEquals(consumer1.receive().getValue().getNativeObject(), data);
+ assertEquals(consumer2.receive().getValue().getNativeObject(), data);
+ assertEquals(consumer3.receive().getValue().getNativeObject(), data);
}
@Test
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 792c8596d1b..be411cfa4bb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -702,13 +702,22 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
}
}
- private boolean populateMessageSchema(MessageImpl msg, SendCallback callback) {
+ @VisibleForTesting
+ boolean populateMessageSchema(MessageImpl msg, SendCallback callback) {
MessageMetadata msgMetadataBuilder = msg.getMessageBuilder();
if (msg.getSchemaInternal() == schema) {
schemaVersion.ifPresent(v -> msgMetadataBuilder.setSchemaVersion(v));
msg.setSchemaState(MessageImpl.SchemaState.Ready);
return true;
}
+ // If the message comes from the replicator and carries no replicated schema,
+ // it was written with the BYTES schema,
+ // so there is no schema to replicate to the remote cluster.
+ if (msg.hasReplicateFrom() && msg.getSchemaInfoForReplicator() == null) {
+ msg.setSchemaState(MessageImpl.SchemaState.Ready);
+ return true;
+ }
+
if (!isMultiSchemaEnabled(true)) {
PulsarClientException.InvalidMessageException e = new PulsarClientException.InvalidMessageException(
format("The producer %s of the topic %s is disabled the `MultiSchema`", producerName, topic)
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java
index 8de554c7516..8ca7a599695 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java
@@ -18,11 +18,17 @@
*/
package org.apache.pulsar.client.impl;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
import java.nio.ByteBuffer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.mockito.Mockito;
import org.testng.annotations.Test;
public class ProducerImplTest {
@@ -47,4 +53,17 @@ public class ProducerImplTest {
// check if the ctx is deallocated successfully.
assertNull(ctx.firstChunkMessageId);
}
+
+ @Test
+ public void testPopulateMessageSchema() {
+ MessageImpl<?> msg = mock(MessageImpl.class);
+ when(msg.hasReplicateFrom()).thenReturn(true);
+ when(msg.getSchemaInternal()).thenReturn(mock(Schema.class));
+ when(msg.getSchemaInfoForReplicator()).thenReturn(null);
+ ProducerImpl<?> producer = mock(ProducerImpl.class, withSettings()
+ .defaultAnswer(Mockito.CALLS_REAL_METHODS));
+ assertTrue(producer.populateMessageSchema(msg, null));
+ verify(msg).setSchemaState(MessageImpl.SchemaState.Ready);
+ }
+
}