You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/08 06:37:10 UTC

[pulsar] branch branch-2.9 updated (9fd542efc8f -> 22eb42f728a)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 9fd542efc8f Bump dependency check and spring version to avoid potential FP (#15408)
     new 960c8a81c34 [fix][broker] Fix schema does not replicate successfully (#17049)
     new 8d293a2b5fc [fix][broker] Fix out of order data replication (#17154)
     new 22eb42f728a [fix][broker] Fix the replicator unnecessary get schema request for bytes schema (#17523)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 build/run_unit_group.sh                            |  2 +-
 .../service/persistent/PersistentReplicator.java   | 50 +++++++++++---
 .../pulsar/broker/service/ReplicatorTest.java      | 78 +++++++++++++++++-----
 .../org/apache/pulsar/client/impl/MessageImpl.java | 12 +++-
 .../apache/pulsar/client/impl/ProducerImpl.java    | 20 ++++--
 .../client/impl/MultiTopicsConsumerImplTest.java   |  2 +-
 .../pulsar/client/impl/ProducerImplTest.java       | 38 +++++------
 .../pulsar/common/protocol/schema/SchemaHash.java  |  7 +-
 8 files changed, 149 insertions(+), 60 deletions(-)
 copy pulsar-broker-common/src/test/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletWithClassLoaderTest.java => pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java (55%)


[pulsar] 02/03: [fix][broker] Fix out of order data replication (#17154)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8d293a2b5fcfe44c78fc40022ab40e11387e2fd4
Author: Penghui Li <pe...@apache.org>
AuthorDate: Fri Aug 19 09:17:48 2022 +0800

    [fix][broker] Fix out of order data replication (#17154)
    
    * [fix][broker] Fix out of order data replication
    
    The schema replication will break the data replication order while fetching
    the schema from the local cluster.
    
    https://github.com/apache/pulsar/blob/8a6ecd7d4c9399bb7ce5a224ca854e4a71db79b1/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java#L366-L369
    
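    The method getSchemaInfo() is asynchronous: a message is sent only after its schema future completes, so a message that waits for a schema fetch can be sent after later messages, changing the order in which messages are written.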
    
    Added a new replicator state, `fetchSchemaInProgress`, which indicates that
    the replicator has detected a new schema and needs to fetch its schema info
    from the local cluster. While the schema is being fetched, the replicator
    pauses data replication; it resumes after the schema fetch has completed.
    
    (cherry picked from commit 39c1ee1aebc06c85b7dcb203b8cfa16fe035ae27)
---
 .../service/persistent/PersistentReplicator.java   | 50 +++++++++++++++++----
 .../pulsar/broker/service/ReplicatorTest.java      | 51 +++++++++++++++++-----
 2 files changed, 80 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 1db685ccbec..c10b70df6a8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -104,6 +104,8 @@ public class PersistentReplicator extends AbstractReplicator
 
     private final ReplicatorStatsImpl stats = new ReplicatorStatsImpl();
 
+    private volatile boolean fetchSchemaInProgress = false;
+
     public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster,
                                 BrokerService brokerService, PulsarClientImpl replicationClient)
             throws PulsarServerException {
@@ -219,6 +221,11 @@ public class PersistentReplicator extends AbstractReplicator
     }
 
     protected void readMoreEntries() {
+        if (fetchSchemaInProgress) {
+            log.info("[{}][{} -> {}] Skip the reading due to new detected schema",
+                    topicName, localCluster, remoteCluster);
+            return;
+        }
         int availablePermits = getAvailablePermits();
 
         if (availablePermits > 0) {
@@ -289,8 +296,15 @@ public class PersistentReplicator extends AbstractReplicator
             // This flag is set to true when we skip atleast one local message,
             // in order to skip remaining local messages.
             boolean isLocalMessageSkippedOnce = false;
+            boolean skipRemainingMessages = false;
             for (int i = 0; i < entries.size(); i++) {
                 Entry entry = entries.get(i);
+                // Skip the messages since the replicator need to fetch the schema info to replicate the schema to the
+                // remote cluster. Rewind the cursor first and continue the message read after fetched the schema.
+                if (skipRemainingMessages) {
+                    entry.release();
+                    continue;
+                }
                 int length = entry.getLength();
                 ByteBuf headersAndPayload = entry.getDataBuffer();
                 MessageImpl msg;
@@ -363,16 +377,34 @@ public class PersistentReplicator extends AbstractReplicator
 
                 headersAndPayload.retain();
 
-                getSchemaInfo(msg).thenAccept(schemaInfo -> {
-                    msg.setSchemaInfoForReplicator(schemaInfo);
+                CompletableFuture<SchemaInfo> schemaFuture = getSchemaInfo(msg);
+                if (!schemaFuture.isDone() || schemaFuture.isCompletedExceptionally()) {
+                    entry.release();
+                    headersAndPayload.release();
+                    msg.recycle();
+                    // Mark the replicator is fetching the schema for now and rewind the cursor
+                    // and trigger the next read after complete the schema fetching.
+                    fetchSchemaInProgress = true;
+                    skipRemainingMessages = true;
+                    cursor.cancelPendingReadRequest();
+                    log.info("[{}][{} -> {}] Pause the data replication due to new detected schema", topicName,
+                            localCluster, remoteCluster);
+                    schemaFuture.whenComplete((__, e) -> {
+                       if (e != null) {
+                           log.warn("[{}][{} -> {}] Failed to get schema from local cluster, will try in the next loop",
+                                   topicName, localCluster, remoteCluster, e);
+                       }
+                       log.info("[{}][{} -> {}] Resume the data replication after the schema fetching done", topicName,
+                               localCluster, remoteCluster);
+                       cursor.rewind();
+                       fetchSchemaInProgress = false;
+                       readMoreEntries();
+                    });
+                } else {
+                    msg.setSchemaInfoForReplicator(schemaFuture.get());
                     producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
-                }).exceptionally(ex -> {
-                    log.error("[{}][{} -> {}] Failed to get schema from local cluster", topicName,
-                            localCluster, remoteCluster, ex);
-                    return null;
-                });
-
-                atLeastOneMessageSentForReplication = true;
+                    atLeastOneMessageSentForReplication = true;
+                }
             }
         } catch (Exception e) {
             log.error("[{}][{} -> {}] Unexpected exception: {}", topicName, localCluster, remoteCluster, e.getMessage(),
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index e78539f985d..9cb6ec45487 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -78,6 +78,8 @@ import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -379,8 +381,11 @@ public class ReplicatorTest extends ReplicatorTestBase {
         consumer3.receive(1);
     }
 
-    @Test
+    @Test(invocationCount = 5)
     public void testReplicationWithSchema() throws Exception {
+        config1.setBrokerDeduplicationEnabled(true);
+        config2.setBrokerDeduplicationEnabled(true);
+        config3.setBrokerDeduplicationEnabled(true);
         PulsarClient client1 = pulsar1.getClient();
         PulsarClient client2 = pulsar2.getClient();
         PulsarClient client3 = pulsar3.getClient();
@@ -390,17 +395,29 @@ public class ReplicatorTest extends ReplicatorTestBase {
         final String subName = "my-sub";
 
         @Cleanup
-        Producer<Schemas.PersonOne> producer = client1.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+        Producer<Schemas.PersonOne> producer1 = client1.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+                .topic(topic.toString())
+                .enableBatching(false)
+                .create();
+
+        @Cleanup
+        Producer<Schemas.PersonThree> producer2 = client1.newProducer(Schema.AVRO(Schemas.PersonThree.class))
                 .topic(topic.toString())
+                .enableBatching(false)
                 .create();
 
         admin1.topics().createSubscription(topic.toString(), subName, MessageId.earliest);
         admin2.topics().createSubscription(topic.toString(), subName, MessageId.earliest);
         admin3.topics().createSubscription(topic.toString(), subName, MessageId.earliest);
 
+        final int totalMessages = 1000;
 
-        for (int i = 0; i < 10; i++) {
-            producer.send(new Schemas.PersonOne(i));
+        for (int i = 0; i < totalMessages / 2; i++) {
+            producer1.sendAsync(new Schemas.PersonOne(i));
+        }
+
+        for (int i = 500; i < totalMessages; i++) {
+            producer2.sendAsync(new Schemas.PersonThree(i, "name-" + i));
         }
 
         Awaitility.await().untilAsserted(() -> {
@@ -410,29 +427,39 @@ public class ReplicatorTest extends ReplicatorTestBase {
         });
 
         @Cleanup
-        Consumer<Schemas.PersonOne> consumer1 = client1.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
+        Consumer<GenericRecord> consumer1 = client1.newConsumer(Schema.AUTO_CONSUME())
                 .topic(topic.toString())
                 .subscriptionName(subName)
                 .subscribe();
 
         @Cleanup
-        Consumer<Schemas.PersonOne> consumer2 = client2.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
+        Consumer<GenericRecord> consumer2 = client2.newConsumer(Schema.AUTO_CONSUME())
                 .topic(topic.toString())
                 .subscriptionName(subName)
                 .subscribe();
 
         @Cleanup
-        Consumer<Schemas.PersonOne> consumer3 = client3.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
+        Consumer<GenericRecord> consumer3 = client3.newConsumer(Schema.AUTO_CONSUME())
                 .topic(topic.toString())
                 .subscriptionName(subName)
                 .subscribe();
 
-        for (int i = 0; i < 10; i++) {
-            Message<Schemas.PersonOne> msg1 = consumer1.receive();
-            Message<Schemas.PersonOne> msg2 = consumer2.receive();
-            Message<Schemas.PersonOne> msg3 = consumer3.receive();
+        int lastId = -1;
+        for (int i = 0; i < totalMessages; i++) {
+            Message<GenericRecord> msg1 = consumer1.receive();
+            Message<GenericRecord> msg2 = consumer2.receive();
+            Message<GenericRecord> msg3 = consumer3.receive();
             assertTrue(msg1 != null && msg2 != null && msg3 != null);
-            assertTrue(msg1.getValue().equals(msg2.getValue()) && msg2.getValue().equals(msg3.getValue()));
+            GenericRecord record1 = msg1.getValue();
+            GenericRecord record2 = msg2.getValue();
+            GenericRecord record3 = msg3.getValue();
+            int id1 = (int) record1.getField("id");
+            int id2 = (int) record2.getField("id");
+            int id3 = (int) record3.getField("id");
+            log.info("Received ids, id1: {}, id2: {}, id3: {}, lastId: {}", id1, id2, id3, lastId);
+            assertTrue(id1 == id2 && id2 == id3);
+            assertTrue(id1 > lastId);
+            lastId = id1;
             consumer1.acknowledge(msg1);
             consumer2.acknowledge(msg2);
             consumer3.acknowledge(msg3);


[pulsar] 03/03: [fix][broker] Fix the replicator unnecessary get schema request for bytes schema (#17523)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 22eb42f728aee529be0c71198ce95f6b91a04718
Author: Penghui Li <pe...@apache.org>
AuthorDate: Thu Sep 8 13:37:01 2022 +0800

    [fix][broker] Fix the replicator unnecessary get schema request for bytes schema (#17523)
    
    (cherry picked from commit 2ed561436deea9639d177e8e43317c37ea44152d)
---
 .../pulsar/broker/service/ReplicatorTest.java      | 13 +++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    | 11 +++++-
 .../pulsar/client/impl/ProducerImplTest.java       | 43 ++++++++++++++++++++++
 3 files changed, 66 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 9cb6ec45487..d9ac04c3fc2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -464,6 +464,19 @@ public class ReplicatorTest extends ReplicatorTestBase {
             consumer2.acknowledge(msg2);
             consumer3.acknowledge(msg3);
         }
+
+        @Cleanup
+        Producer<byte[]> producerBytes = client1.newProducer()
+                .topic(topic.toString())
+                .enableBatching(false)
+                .create();
+
+        byte[] data = "Bytes".getBytes();
+        producerBytes.send(data);
+
+        assertEquals(consumer1.receive().getValue().getNativeObject(), data);
+        assertEquals(consumer2.receive().getValue().getNativeObject(), data);
+        assertEquals(consumer3.receive().getValue().getNativeObject(), data);
     }
 
     @Test
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 711f8e38dd3..9c77d8b4ae6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -623,13 +623,22 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         }
     }
 
-    private boolean populateMessageSchema(MessageImpl msg, SendCallback callback) {
+    @VisibleForTesting
+    boolean populateMessageSchema(MessageImpl msg, SendCallback callback) {
         MessageMetadata msgMetadataBuilder = msg.getMessageBuilder();
         if (msg.getSchemaInternal() == schema) {
             schemaVersion.ifPresent(v -> msgMetadataBuilder.setSchemaVersion(v));
             msg.setSchemaState(MessageImpl.SchemaState.Ready);
             return true;
         }
+        // If the message is from the replicator and without replicated schema
+        // Which means the message is written with BYTES schema
+        // So we don't need to replicate schema to the remote cluster
+        if (msg.hasReplicateFrom() && msg.getSchemaInfoForReplicator() == null) {
+            msg.setSchemaState(MessageImpl.SchemaState.Ready);
+            return true;
+        }
+
         if (!isMultiSchemaEnabled(true)) {
             PulsarClientException.InvalidMessageException e = new PulsarClientException.InvalidMessageException(
                     format("The producer %s of the topic %s is disabled the `MultiSchema`", producerName, topic)
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java
new file mode 100644
index 00000000000..4db3cd0843b
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
+import static org.testng.Assert.assertTrue;
+import org.apache.pulsar.client.api.Schema;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+public class ProducerImplTest {
+    @Test
+    public void testPopulateMessageSchema() {
+        MessageImpl<?> msg = mock(MessageImpl.class);
+        when(msg.hasReplicateFrom()).thenReturn(true);
+        when(msg.getSchemaInternal()).thenReturn(mock(Schema.class));
+        when(msg.getSchemaInfoForReplicator()).thenReturn(null);
+        ProducerImpl<?> producer = mock(ProducerImpl.class, withSettings()
+                .defaultAnswer(Mockito.CALLS_REAL_METHODS));
+        assertTrue(producer.populateMessageSchema(msg, null));
+        verify(msg).setSchemaState(MessageImpl.SchemaState.Ready);
+    }
+
+}


[pulsar] 01/03: [fix][broker] Fix schema does not replicate successfully (#17049)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 960c8a81c3406405a79f148ae71e3dde1a12d555
Author: Penghui Li <pe...@apache.org>
AuthorDate: Thu Aug 18 11:21:14 2022 +0800

    [fix][broker] Fix schema does not replicate successfully (#17049)
    
    The schema state returned by MessageImpl#getSchemaState() is incorrect.
    
    https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L765-L770
    
    The MessageImpl used by the replicator does not have a schema, so its schema
    state is reported as Ready, which causes the producer to skip the schema upload.
    
    https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2147-L2149
    
    We should remove
    
    https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L766-L768
    
    To return the correct schema state.
    
    We should also provide the correct schema hash.
    If the message is used by the replicator, the schema hash should
    be based on the replicator schema. Otherwise, it should be based
    on the schema of the message.
    
    - Fixed the incorrectly returned schema state
    - Provide the method for getting schema hash for MessageImpl
    
    (cherry picked from commit 7689133adfd930a50c2690ecca1f2068cafa8bcb)
---
 build/run_unit_group.sh                            |  2 +-
 .../pulsar/broker/service/ReplicatorTest.java      | 30 ++++++++++++----------
 .../org/apache/pulsar/client/impl/MessageImpl.java | 12 ++++++---
 .../apache/pulsar/client/impl/ProducerImpl.java    |  9 +++----
 .../client/impl/MultiTopicsConsumerImplTest.java   |  2 +-
 .../pulsar/common/protocol/schema/SchemaHash.java  |  7 ++++-
 6 files changed, 37 insertions(+), 25 deletions(-)

diff --git a/build/run_unit_group.sh b/build/run_unit_group.sh
index 2d99e84eff3..8bb84679fd9 100755
--- a/build/run_unit_group.sh
+++ b/build/run_unit_group.sh
@@ -117,7 +117,7 @@ function other() {
                                                 **/ManagedLedgerTest.java,
                                                 **/TestPulsarKeyValueSchemaHandler.java,
                                                 **/PrimitiveSchemaTest.java,
-                                                BlobStoreManagedLedgerOffloaderTest.java'
+                                                BlobStoreManagedLedgerOffloaderTest.java' -DtestReuseFork=false
 
   $MVN_TEST_COMMAND -pl managed-ledger -Dinclude='**/ManagedLedgerTest.java,
                                                   **/OffloadersCacheTest.java'
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 488fbfef023..e78539f985d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -69,6 +69,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -88,7 +89,6 @@ import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.schema.Schemas;
@@ -390,19 +390,24 @@ public class ReplicatorTest extends ReplicatorTestBase {
         final String subName = "my-sub";
 
         @Cleanup
-        Producer<Schemas.PersonOne> producer1 = client1.newProducer(Schema.AVRO(Schemas.PersonOne.class))
-                .topic(topic.toString())
-                .create();
-        @Cleanup
-        Producer<Schemas.PersonOne> producer2 = client2.newProducer(Schema.AVRO(Schemas.PersonOne.class))
-                .topic(topic.toString())
-                .create();
-        @Cleanup
-        Producer<Schemas.PersonOne> producer3 = client3.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+        Producer<Schemas.PersonOne> producer = client1.newProducer(Schema.AVRO(Schemas.PersonOne.class))
                 .topic(topic.toString())
                 .create();
 
-        List<Producer<Schemas.PersonOne>> producers = Lists.newArrayList(producer1, producer2, producer3);
+        admin1.topics().createSubscription(topic.toString(), subName, MessageId.earliest);
+        admin2.topics().createSubscription(topic.toString(), subName, MessageId.earliest);
+        admin3.topics().createSubscription(topic.toString(), subName, MessageId.earliest);
+
+
+        for (int i = 0; i < 10; i++) {
+            producer.send(new Schemas.PersonOne(i));
+        }
+
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(admin1.topics().getInternalStats(topic.toString()).schemaLedgers.size() > 0);
+            assertTrue(admin2.topics().getInternalStats(topic.toString()).schemaLedgers.size() > 0);
+            assertTrue(admin3.topics().getInternalStats(topic.toString()).schemaLedgers.size() > 0);
+        });
 
         @Cleanup
         Consumer<Schemas.PersonOne> consumer1 = client1.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
@@ -422,8 +427,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
                 .subscriptionName(subName)
                 .subscribe();
 
-        for (int i = 0; i < 3; i++) {
-            producers.get(i).send(new Schemas.PersonOne(i));
+        for (int i = 0; i < 10; i++) {
             Message<Schemas.PersonOne> msg1 = consumer1.receive();
             Message<Schemas.PersonOne> msg2 = consumer2.receive();
             Message<Schemas.PersonOne> msg3 = consumer3.receive();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index fd00475de71..acdf73bed3f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -54,6 +54,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
+import org.apache.pulsar.common.protocol.schema.SchemaHash;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -66,6 +67,8 @@ public class MessageImpl<T> implements Message<T> {
     private ByteBuf payload;
 
     private Schema<T> schema;
+
+    private SchemaHash schemaHash;
     private SchemaInfo schemaInfoForReplicator;
     private SchemaState schemaState = SchemaState.None;
     private Optional<EncryptionContext> encryptionCtx = Optional.empty();
@@ -92,6 +95,7 @@ public class MessageImpl<T> implements Message<T> {
         msg.payload = Unpooled.wrappedBuffer(payload);
         msg.properties = null;
         msg.schema = schema;
+        msg.schemaHash = SchemaHash.of(schema);
         msg.uncompressedSize = payload.remaining();
         return msg;
     }
@@ -439,9 +443,14 @@ public class MessageImpl<T> implements Message<T> {
         return schema.getSchemaInfo();
     }
 
+    public SchemaHash getSchemaHash() {
+        return schemaHash == null ? SchemaHash.of(new byte[0], null) : schemaHash;
+    }
+
     public void setSchemaInfoForReplicator(SchemaInfo schemaInfo) {
         if (msgMetadata.hasReplicatedFrom()) {
             this.schemaInfoForReplicator = schemaInfo;
+            this.schemaHash = SchemaHash.of(schemaInfo);
         } else {
             throw new IllegalArgumentException("Only allowed to set schemaInfoForReplicator for a replicated message.");
         }
@@ -761,9 +770,6 @@ public class MessageImpl<T> implements Message<T> {
     }
 
     SchemaState getSchemaState() {
-        if (getSchemaInfo() == null) {
-            return SchemaState.Ready;
-        }
         return schemaState;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 0a0ba549650..711f8e38dd3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -637,8 +637,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             completeCallbackAndReleaseSemaphore(msg.getUncompressedSize(), callback, e);
             return false;
         }
-        SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal());
-        byte[] schemaVersion = schemaCache.get(schemaHash);
+        byte[] schemaVersion = schemaCache.get(msg.getSchemaHash());
         if (schemaVersion != null) {
             msgMetadataBuilder.setSchemaVersion(schemaVersion);
             msg.setSchemaState(MessageImpl.SchemaState.Ready);
@@ -647,8 +646,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
     }
 
     private boolean rePopulateMessageSchema(MessageImpl msg) {
-        SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal());
-        byte[] schemaVersion = schemaCache.get(schemaHash);
+        byte[] schemaVersion = schemaCache.get(msg.getSchemaHash());
         if (schemaVersion == null) {
             return false;
         }
@@ -679,8 +677,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 // case, we should not cache the schema version so that the schema version of the message metadata will
                 // be null, instead of an empty array.
                 if (v.length != 0) {
-                    SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal());
-                    schemaCache.putIfAbsent(schemaHash, v);
+                    schemaCache.putIfAbsent(msg.getSchemaHash(), v);
                     msg.getMessageBuilder().setSchemaVersion(v);
                 }
                 msg.setSchemaState(MessageImpl.SchemaState.Ready);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index faa621cae21..38e66807228 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -124,7 +124,7 @@ public class MultiTopicsConsumerImplTest {
     //
     // Code under tests is using CompletableFutures. Theses may hang indefinitely if code is broken.
     // That's why a test timeout is defined.
-    @Test(timeOut = 5000)
+    @Test(timeOut = 10000)
     public void testParallelSubscribeAsync() throws Exception {
         String topicName = "parallel-subscribe-async-topic";
         MultiTopicsConsumerImpl<byte[]> impl = createMultiTopicsConsumer();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java
index 40220e6047a..8bbc18fbb70 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java
@@ -54,7 +54,12 @@ public class SchemaHash {
         return of(schemaData.getData(), schemaData.getType());
     }
 
-    private static SchemaHash of(byte[] schemaBytes, SchemaType schemaType) {
+    public static SchemaHash of(SchemaInfo schemaInfo) {
+        return of(schemaInfo == null ? new byte[0] : schemaInfo.getSchema(),
+                schemaInfo == null ? null : schemaInfo.getType());
+    }
+
+    public static SchemaHash of(byte[] schemaBytes, SchemaType schemaType) {
         return new SchemaHash(hashFunction.hashBytes(schemaBytes), schemaType);
     }