You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/08 05:37:58 UTC

[pulsar] branch branch-2.11 updated: [fix][broker] Fix the replicator unnecessary get schema request for bytes schema (#17523)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 345417f284e [fix][broker] Fix the replicator unnecessary get schema request for bytes schema (#17523)
345417f284e is described below

commit 345417f284e31ded29b25d93c4a4921b3058c1e3
Author: Penghui Li <pe...@apache.org>
AuthorDate: Thu Sep 8 13:37:01 2022 +0800

    [fix][broker] Fix the replicator unnecessary get schema request for bytes schema (#17523)
    
    (cherry picked from commit 2ed561436deea9639d177e8e43317c37ea44152d)
---
 .../apache/pulsar/broker/service/ReplicatorTest.java  | 13 +++++++++++++
 .../org/apache/pulsar/client/impl/ProducerImpl.java   | 11 ++++++++++-
 .../apache/pulsar/client/impl/ProducerImplTest.java   | 19 +++++++++++++++++++
 3 files changed, 42 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 3e3ef98df65..9b87b06012e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -468,6 +468,19 @@ public class ReplicatorTest extends ReplicatorTestBase {
             consumer2.acknowledge(msg2);
             consumer3.acknowledge(msg3);
         }
+
+        @Cleanup
+        Producer<byte[]> producerBytes = client1.newProducer()
+                .topic(topic.toString())
+                .enableBatching(false)
+                .create();
+
+        byte[] data = "Bytes".getBytes();
+        producerBytes.send(data);
+
+        assertEquals(consumer1.receive().getValue().getNativeObject(), data);
+        assertEquals(consumer2.receive().getValue().getNativeObject(), data);
+        assertEquals(consumer3.receive().getValue().getNativeObject(), data);
     }
 
     @Test
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 792c8596d1b..be411cfa4bb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -702,13 +702,22 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         }
     }
 
-    private boolean populateMessageSchema(MessageImpl msg, SendCallback callback) {
+    @VisibleForTesting
+    boolean populateMessageSchema(MessageImpl msg, SendCallback callback) {
         MessageMetadata msgMetadataBuilder = msg.getMessageBuilder();
         if (msg.getSchemaInternal() == schema) {
             schemaVersion.ifPresent(v -> msgMetadataBuilder.setSchemaVersion(v));
             msg.setSchemaState(MessageImpl.SchemaState.Ready);
             return true;
         }
+        // If the message comes from the replicator and carries no replicated schema,
+        // it was written with the BYTES schema,
+        // so there is no need to replicate a schema to the remote cluster.
+        if (msg.hasReplicateFrom() && msg.getSchemaInfoForReplicator() == null) {
+            msg.setSchemaState(MessageImpl.SchemaState.Ready);
+            return true;
+        }
+
         if (!isMultiSchemaEnabled(true)) {
             PulsarClientException.InvalidMessageException e = new PulsarClientException.InvalidMessageException(
                     format("The producer %s of the topic %s is disabled the `MultiSchema`", producerName, topic)
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java
index 8de554c7516..8ca7a599695 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java
@@ -18,11 +18,17 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 import java.nio.ByteBuffer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
 public class ProducerImplTest {
@@ -47,4 +53,17 @@ public class ProducerImplTest {
         // check if the ctx is deallocated successfully.
         assertNull(ctx.firstChunkMessageId);
     }
+
+    @Test
+    public void testPopulateMessageSchema() {
+        MessageImpl<?> msg = mock(MessageImpl.class);
+        when(msg.hasReplicateFrom()).thenReturn(true);
+        when(msg.getSchemaInternal()).thenReturn(mock(Schema.class));
+        when(msg.getSchemaInfoForReplicator()).thenReturn(null);
+        ProducerImpl<?> producer = mock(ProducerImpl.class, withSettings()
+                .defaultAnswer(Mockito.CALLS_REAL_METHODS));
+        assertTrue(producer.populateMessageSchema(msg, null));
+        verify(msg).setSchemaState(MessageImpl.SchemaState.Ready);
+    }
+
 }