You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/08 06:37:11 UTC

[pulsar] 01/03: [fix][broker] Fix schema does not replicate successfully (#17049)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 960c8a81c3406405a79f148ae71e3dde1a12d555
Author: Penghui Li <pe...@apache.org>
AuthorDate: Thu Aug 18 11:21:14 2022 +0800

    [fix][broker] Fix schema does not replicate successfully (#17049)
    
    However, there is a mistake: the returned schema state is incorrect.
    
    https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L765-L770
    
    Because the MessageImpl used by the replicator does not have a schema,
    this check returns Ready, which causes the producer to skip the schema upload.
    
    https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2147-L2149
    
    We should remove the following lines:
    
    https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L766-L768
    
    so that the correct schema state is returned.
    
    We should also provide the correct schema hash.
    If the message is used by the replicator, the schema hash should
    be based on the replicator schema. Otherwise, it should be based
    on the schema of the message.
    
    - Fixed the incorrectly returned schema state
    - Added a method to MessageImpl for getting the schema hash
    
    (cherry picked from commit 7689133adfd930a50c2690ecca1f2068cafa8bcb)
---
 build/run_unit_group.sh                            |  2 +-
 .../pulsar/broker/service/ReplicatorTest.java      | 30 ++++++++++++----------
 .../org/apache/pulsar/client/impl/MessageImpl.java | 12 ++++++---
 .../apache/pulsar/client/impl/ProducerImpl.java    |  9 +++----
 .../client/impl/MultiTopicsConsumerImplTest.java   |  2 +-
 .../pulsar/common/protocol/schema/SchemaHash.java  |  7 ++++-
 6 files changed, 37 insertions(+), 25 deletions(-)

diff --git a/build/run_unit_group.sh b/build/run_unit_group.sh
index 2d99e84eff3..8bb84679fd9 100755
--- a/build/run_unit_group.sh
+++ b/build/run_unit_group.sh
@@ -117,7 +117,7 @@ function other() {
                                                 **/ManagedLedgerTest.java,
                                                 **/TestPulsarKeyValueSchemaHandler.java,
                                                 **/PrimitiveSchemaTest.java,
-                                                BlobStoreManagedLedgerOffloaderTest.java'
+                                                BlobStoreManagedLedgerOffloaderTest.java' -DtestReuseFork=false
 
   $MVN_TEST_COMMAND -pl managed-ledger -Dinclude='**/ManagedLedgerTest.java,
                                                   **/OffloadersCacheTest.java'
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 488fbfef023..e78539f985d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -69,6 +69,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -88,7 +89,6 @@ import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.schema.Schemas;
@@ -390,19 +390,24 @@ public class ReplicatorTest extends ReplicatorTestBase {
         final String subName = "my-sub";
 
         @Cleanup
-        Producer<Schemas.PersonOne> producer1 = client1.newProducer(Schema.AVRO(Schemas.PersonOne.class))
-                .topic(topic.toString())
-                .create();
-        @Cleanup
-        Producer<Schemas.PersonOne> producer2 = client2.newProducer(Schema.AVRO(Schemas.PersonOne.class))
-                .topic(topic.toString())
-                .create();
-        @Cleanup
-        Producer<Schemas.PersonOne> producer3 = client3.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+        Producer<Schemas.PersonOne> producer = client1.newProducer(Schema.AVRO(Schemas.PersonOne.class))
                 .topic(topic.toString())
                 .create();
 
-        List<Producer<Schemas.PersonOne>> producers = Lists.newArrayList(producer1, producer2, producer3);
+        admin1.topics().createSubscription(topic.toString(), subName, MessageId.earliest);
+        admin2.topics().createSubscription(topic.toString(), subName, MessageId.earliest);
+        admin3.topics().createSubscription(topic.toString(), subName, MessageId.earliest);
+
+
+        for (int i = 0; i < 10; i++) {
+            producer.send(new Schemas.PersonOne(i));
+        }
+
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(admin1.topics().getInternalStats(topic.toString()).schemaLedgers.size() > 0);
+            assertTrue(admin2.topics().getInternalStats(topic.toString()).schemaLedgers.size() > 0);
+            assertTrue(admin3.topics().getInternalStats(topic.toString()).schemaLedgers.size() > 0);
+        });
 
         @Cleanup
         Consumer<Schemas.PersonOne> consumer1 = client1.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
@@ -422,8 +427,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
                 .subscriptionName(subName)
                 .subscribe();
 
-        for (int i = 0; i < 3; i++) {
-            producers.get(i).send(new Schemas.PersonOne(i));
+        for (int i = 0; i < 10; i++) {
             Message<Schemas.PersonOne> msg1 = consumer1.receive();
             Message<Schemas.PersonOne> msg2 = consumer2.receive();
             Message<Schemas.PersonOne> msg3 = consumer3.receive();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index fd00475de71..acdf73bed3f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -54,6 +54,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
+import org.apache.pulsar.common.protocol.schema.SchemaHash;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -66,6 +67,8 @@ public class MessageImpl<T> implements Message<T> {
     private ByteBuf payload;
 
     private Schema<T> schema;
+
+    private SchemaHash schemaHash;
     private SchemaInfo schemaInfoForReplicator;
     private SchemaState schemaState = SchemaState.None;
     private Optional<EncryptionContext> encryptionCtx = Optional.empty();
@@ -92,6 +95,7 @@ public class MessageImpl<T> implements Message<T> {
         msg.payload = Unpooled.wrappedBuffer(payload);
         msg.properties = null;
         msg.schema = schema;
+        msg.schemaHash = SchemaHash.of(schema);
         msg.uncompressedSize = payload.remaining();
         return msg;
     }
@@ -439,9 +443,14 @@ public class MessageImpl<T> implements Message<T> {
         return schema.getSchemaInfo();
     }
 
+    public SchemaHash getSchemaHash() {
+        return schemaHash == null ? SchemaHash.of(new byte[0], null) : schemaHash;
+    }
+
     public void setSchemaInfoForReplicator(SchemaInfo schemaInfo) {
         if (msgMetadata.hasReplicatedFrom()) {
             this.schemaInfoForReplicator = schemaInfo;
+            this.schemaHash = SchemaHash.of(schemaInfo);
         } else {
             throw new IllegalArgumentException("Only allowed to set schemaInfoForReplicator for a replicated message.");
         }
@@ -761,9 +770,6 @@ public class MessageImpl<T> implements Message<T> {
     }
 
     SchemaState getSchemaState() {
-        if (getSchemaInfo() == null) {
-            return SchemaState.Ready;
-        }
         return schemaState;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 0a0ba549650..711f8e38dd3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -637,8 +637,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             completeCallbackAndReleaseSemaphore(msg.getUncompressedSize(), callback, e);
             return false;
         }
-        SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal());
-        byte[] schemaVersion = schemaCache.get(schemaHash);
+        byte[] schemaVersion = schemaCache.get(msg.getSchemaHash());
         if (schemaVersion != null) {
             msgMetadataBuilder.setSchemaVersion(schemaVersion);
             msg.setSchemaState(MessageImpl.SchemaState.Ready);
@@ -647,8 +646,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
     }
 
     private boolean rePopulateMessageSchema(MessageImpl msg) {
-        SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal());
-        byte[] schemaVersion = schemaCache.get(schemaHash);
+        byte[] schemaVersion = schemaCache.get(msg.getSchemaHash());
         if (schemaVersion == null) {
             return false;
         }
@@ -679,8 +677,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 // case, we should not cache the schema version so that the schema version of the message metadata will
                 // be null, instead of an empty array.
                 if (v.length != 0) {
-                    SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal());
-                    schemaCache.putIfAbsent(schemaHash, v);
+                    schemaCache.putIfAbsent(msg.getSchemaHash(), v);
                     msg.getMessageBuilder().setSchemaVersion(v);
                 }
                 msg.setSchemaState(MessageImpl.SchemaState.Ready);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index faa621cae21..38e66807228 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -124,7 +124,7 @@ public class MultiTopicsConsumerImplTest {
     //
     // Code under tests is using CompletableFutures. Theses may hang indefinitely if code is broken.
     // That's why a test timeout is defined.
-    @Test(timeOut = 5000)
+    @Test(timeOut = 10000)
     public void testParallelSubscribeAsync() throws Exception {
         String topicName = "parallel-subscribe-async-topic";
         MultiTopicsConsumerImpl<byte[]> impl = createMultiTopicsConsumer();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java
index 40220e6047a..8bbc18fbb70 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java
@@ -54,7 +54,12 @@ public class SchemaHash {
         return of(schemaData.getData(), schemaData.getType());
     }
 
-    private static SchemaHash of(byte[] schemaBytes, SchemaType schemaType) {
+    public static SchemaHash of(SchemaInfo schemaInfo) {
+        return of(schemaInfo == null ? new byte[0] : schemaInfo.getSchema(),
+                schemaInfo == null ? null : schemaInfo.getType());
+    }
+
+    public static SchemaHash of(byte[] schemaBytes, SchemaType schemaType) {
         return new SchemaHash(hashFunction.hashBytes(schemaBytes), schemaType);
     }