You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/08 05:58:16 UTC
[pulsar] branch branch-2.10 updated: [fix][broker] Fix the replicator unnecessary get schema request for bytes schema (#17523)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new d1898467212 [fix][broker] Fix the replicator unnecessary get schema request for bytes schema (#17523)
d1898467212 is described below
commit d1898467212397fd76c581f495ab32ca0e27daac
Author: Penghui Li <pe...@apache.org>
AuthorDate: Thu Sep 8 13:37:01 2022 +0800
[fix][broker] Fix the replicator unnecessary get schema request for bytes schema (#17523)
(cherry picked from commit 2ed561436deea9639d177e8e43317c37ea44152d)
---
.../apache/pulsar/broker/service/ReplicatorTest.java | 13 +++++++++++++
.../org/apache/pulsar/client/impl/ProducerImpl.java | 11 ++++++++++-
.../apache/pulsar/client/impl/ProducerImplTest.java | 19 +++++++++++++++++++
3 files changed, 42 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 72d04d95397..34606871f50 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -467,6 +467,19 @@ public class ReplicatorTest extends ReplicatorTestBase {
consumer2.acknowledge(msg2);
consumer3.acknowledge(msg3);
}
+
+ @Cleanup
+ Producer<byte[]> producerBytes = client1.newProducer()
+ .topic(topic.toString())
+ .enableBatching(false)
+ .create();
+
+ byte[] data = "Bytes".getBytes();
+ producerBytes.send(data);
+
+ assertEquals(consumer1.receive().getValue().getNativeObject(), data);
+ assertEquals(consumer2.receive().getValue().getNativeObject(), data);
+ assertEquals(consumer3.receive().getValue().getNativeObject(), data);
}
@Test
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 6a509166e8a..486891bbd0e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -647,13 +647,22 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
}
}
- private boolean populateMessageSchema(MessageImpl msg, SendCallback callback) {
+ @VisibleForTesting
+ boolean populateMessageSchema(MessageImpl msg, SendCallback callback) {
MessageMetadata msgMetadataBuilder = msg.getMessageBuilder();
if (msg.getSchemaInternal() == schema) {
schemaVersion.ifPresent(v -> msgMetadataBuilder.setSchemaVersion(v));
msg.setSchemaState(MessageImpl.SchemaState.Ready);
return true;
}
+ // If the message is from the replicator and without replicated schema
+ // Which means the message is written with BYTES schema
+ // So we don't need to replicate schema to the remote cluster
+ if (msg.hasReplicateFrom() && msg.getSchemaInfoForReplicator() == null) {
+ msg.setSchemaState(MessageImpl.SchemaState.Ready);
+ return true;
+ }
+
if (!isMultiSchemaEnabled(true)) {
PulsarClientException.InvalidMessageException e = new PulsarClientException.InvalidMessageException(
format("The producer %s of the topic %s is disabled the `MultiSchema`", producerName, topic)
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java
index 6d2d427049d..2a67a96b668 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java
@@ -18,11 +18,17 @@
*/
package org.apache.pulsar.client.impl;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
+import static org.testng.Assert.assertTrue;
import java.nio.ByteBuffer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
public class ProducerImplTest {
@Test
@@ -46,4 +52,17 @@ public class ProducerImplTest {
// check if the ctx is deallocated successfully.
Assert.assertNull(ctx.firstChunkMessageId);
}
+
+ @Test
+ public void testPopulateMessageSchema() {
+ MessageImpl<?> msg = mock(MessageImpl.class);
+ when(msg.hasReplicateFrom()).thenReturn(true);
+ when(msg.getSchemaInternal()).thenReturn(mock(Schema.class));
+ when(msg.getSchemaInfoForReplicator()).thenReturn(null);
+ ProducerImpl<?> producer = mock(ProducerImpl.class, withSettings()
+ .defaultAnswer(Mockito.CALLS_REAL_METHODS));
+ assertTrue(producer.populateMessageSchema(msg, null));
+ verify(msg).setSchemaState(MessageImpl.SchemaState.Ready);
+ }
+
}