You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/18 11:37:03 UTC

[pulsar] branch branch-2.11 updated: [fix][broker] Fix schema does not replicate successfully (#17049)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 511a5cc4a50 [fix][broker] Fix schema does not replicate successfully (#17049)
511a5cc4a50 is described below

commit 511a5cc4a50ffc7011f23eb95372582c37dd14ea
Author: Penghui Li <pe...@apache.org>
AuthorDate: Thu Aug 18 11:21:14 2022 +0800

    [fix][broker] Fix schema does not replicate successfully (#17049)
    
    ### Motivation
    
    #11441 supports replicate schema to remote clusters.
    However, there is a mistake: the returned schema state is incorrect.
    
    https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L765-L770
    
    Because the MessageImpl used by the replicator does not have a schema,
    getSchemaState() returns Ready, which causes the producer to skip the schema upload.
    
    https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2147-L2149
    
    We should remove
    
    https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L766-L768
    
    so that the correct schema state is returned.
    
    We should also provide the correct schema hash.
    If the message is used by the replicator, the schema hash should
    be based on the replicator schema. Otherwise, it should be based
    on the schema of the message.
    
    ### Modification
    
    - Fixed the incorrect returned schema state
    - Add a method to MessageImpl for getting the schema hash
---
 build/run_unit_group.sh                            |  2 +-
 .../pulsar/broker/service/ReplicatorTest.java      | 57 ++++++++++++----------
 .../org/apache/pulsar/client/impl/MessageImpl.java | 12 +++--
 .../apache/pulsar/client/impl/ProducerImpl.java    |  9 ++--
 .../client/impl/MultiTopicsConsumerImplTest.java   |  2 +-
 .../pulsar/common/protocol/schema/SchemaHash.java  |  7 ++-
 6 files changed, 52 insertions(+), 37 deletions(-)

diff --git a/build/run_unit_group.sh b/build/run_unit_group.sh
index 65b2a5baec7..f011fe6bfc8 100755
--- a/build/run_unit_group.sh
+++ b/build/run_unit_group.sh
@@ -134,7 +134,7 @@ function test_group_other() {
            -Dexclude='**/ManagedLedgerTest.java,
                    **/OffloadersCacheTest.java
                   **/PrimitiveSchemaTest.java,
-                  BlobStoreManagedLedgerOffloaderTest.java'
+                  BlobStoreManagedLedgerOffloaderTest.java' -DtestReuseFork=false
 
   mvn_test -pl managed-ledger -Dinclude='**/ManagedLedgerTest.java,
                                                   **/OffloadersCacheTest.java'
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index c8c257992cf..be232760b0b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -62,6 +62,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.State;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
@@ -70,6 +71,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -78,6 +80,7 @@ import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -392,19 +395,24 @@ public class ReplicatorTest extends ReplicatorTestBase {
         final String subName = "my-sub";
 
         @Cleanup
-        Producer<Schemas.PersonOne> producer1 = client1.newProducer(Schema.AVRO(Schemas.PersonOne.class))
-                .topic(topic.toString())
-                .create();
-        @Cleanup
-        Producer<Schemas.PersonOne> producer2 = client2.newProducer(Schema.AVRO(Schemas.PersonOne.class))
-                .topic(topic.toString())
-                .create();
-        @Cleanup
-        Producer<Schemas.PersonOne> producer3 = client3.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+        Producer<Schemas.PersonOne> producer = client1.newProducer(Schema.AVRO(Schemas.PersonOne.class))
                 .topic(topic.toString())
                 .create();
 
-        List<Producer<Schemas.PersonOne>> producers = Lists.newArrayList(producer1, producer2, producer3);
+        admin1.topics().createSubscription(topic.toString(), subName, MessageId.earliest);
+        admin2.topics().createSubscription(topic.toString(), subName, MessageId.earliest);
+        admin3.topics().createSubscription(topic.toString(), subName, MessageId.earliest);
+
+
+        for (int i = 0; i < 10; i++) {
+            producer.send(new Schemas.PersonOne(i));
+        }
+
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(admin1.topics().getInternalStats(topic.toString()).schemaLedgers.size() > 0);
+            assertTrue(admin2.topics().getInternalStats(topic.toString()).schemaLedgers.size() > 0);
+            assertTrue(admin3.topics().getInternalStats(topic.toString()).schemaLedgers.size() > 0);
+        });
 
         @Cleanup
         Consumer<Schemas.PersonOne> consumer1 = client1.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
@@ -424,8 +432,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
                 .subscriptionName(subName)
                 .subscribe();
 
-        for (int i = 0; i < 3; i++) {
-            producers.get(i).send(new Schemas.PersonOne(i));
+        for (int i = 0; i < 10; i++) {
             Message<Schemas.PersonOne> msg1 = consumer1.receive();
             Message<Schemas.PersonOne> msg2 = consumer2.receive();
             Message<Schemas.PersonOne> msg3 = consumer3.receive();
@@ -1395,15 +1402,21 @@ public class ReplicatorTest extends ReplicatorTestBase {
 
         PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(dest.toString(), false)
                 .getNow(null).get();
+        MessageIdImpl lastMessageId = (MessageIdImpl) topic.getLastMessageId().get();
+        Position lastPosition = PositionImpl.get(lastMessageId.getLedgerId(), lastMessageId.getEntryId());
         ConcurrentOpenHashMap<String, Replicator> replicators = topic.getReplicators();
         PersistentReplicator replicator = (PersistentReplicator) replicators.get("r2");
 
-        Awaitility.await().timeout(50, TimeUnit.SECONDS)
+        Awaitility.await().pollInterval(1, TimeUnit.SECONDS).timeout(30, TimeUnit.SECONDS)
                 .untilAsserted(() -> assertEquals(org.apache.pulsar.broker.service.AbstractReplicator.State.Started,
                         replicator.getState()));
 
         assertEquals(replicator.getState(), org.apache.pulsar.broker.service.AbstractReplicator.State.Started);
         ManagedCursorImpl cursor = (ManagedCursorImpl) replicator.getCursor();
+
+        // Make sure all the data has replicated to the remote cluster before close the cursor.
+        Awaitility.await().untilAsserted(() -> assertEquals(cursor.getMarkDeletedPosition(), lastPosition));
+
         cursor.setState(State.Closed);
 
         Field field = ManagedCursorImpl.class.getDeclaredField("state");
@@ -1412,22 +1425,16 @@ public class ReplicatorTest extends ReplicatorTestBase {
 
         producer1.produce(10);
 
-        Position deletedPos = cursor.getMarkDeletedPosition();
-        Position readPos = cursor.getReadPosition();
-
-        Awaitility.await().timeout(30, TimeUnit.SECONDS).until(
-                () -> cursor.getMarkDeletedPosition().getEntryId() != (cursor.getReadPosition().getEntryId() - 1));
-
-        assertNotEquals((readPos.getEntryId() - 1), deletedPos.getEntryId());
+        // The cursor is closed, so the mark delete position will not move forward.
+        assertEquals(cursor.getMarkDeletedPosition(), lastPosition);
 
         field.set(cursor, State.Open);
 
         Awaitility.await().timeout(30, TimeUnit.SECONDS).until(
-                () -> cursor.getMarkDeletedPosition().getEntryId() == (cursor.getReadPosition().getEntryId() - 1));
-
-        deletedPos = cursor.getMarkDeletedPosition();
-        readPos = cursor.getReadPosition();
-        assertEquals((readPos.getEntryId() - 1), deletedPos.getEntryId());
+                () -> {
+                    log.info("++++++++++++ {}, {}", cursor.getMarkDeletedPosition(), cursor.getReadPosition());
+                    return cursor.getMarkDeletedPosition().getEntryId() == (cursor.getReadPosition().getEntryId() - 1);
+                });
     }
 
     private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 4970f95b7fc..6d1d586b49b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -52,6 +52,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
+import org.apache.pulsar.common.protocol.schema.SchemaHash;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -64,6 +65,8 @@ public class MessageImpl<T> implements Message<T> {
     private ByteBuf payload;
 
     private Schema<T> schema;
+
+    private SchemaHash schemaHash;
     private SchemaInfo schemaInfoForReplicator;
     private SchemaState schemaState = SchemaState.None;
     private Optional<EncryptionContext> encryptionCtx = Optional.empty();
@@ -91,6 +94,7 @@ public class MessageImpl<T> implements Message<T> {
         msg.payload = Unpooled.wrappedBuffer(payload);
         msg.properties = null;
         msg.schema = schema;
+        msg.schemaHash = SchemaHash.of(schema);
         msg.uncompressedSize = payload.remaining();
         return msg;
     }
@@ -431,9 +435,14 @@ public class MessageImpl<T> implements Message<T> {
         return schema.getSchemaInfo();
     }
 
+    public SchemaHash getSchemaHash() {
+        return schemaHash == null ? SchemaHash.of(new byte[0], null) : schemaHash;
+    }
+
     public void setSchemaInfoForReplicator(SchemaInfo schemaInfo) {
         if (msgMetadata.hasReplicatedFrom()) {
             this.schemaInfoForReplicator = schemaInfo;
+            this.schemaHash = SchemaHash.of(schemaInfo);
         } else {
             throw new IllegalArgumentException("Only allowed to set schemaInfoForReplicator for a replicated message.");
         }
@@ -763,9 +772,6 @@ public class MessageImpl<T> implements Message<T> {
     }
 
     SchemaState getSchemaState() {
-        if (getSchemaInfo() == null) {
-            return SchemaState.Ready;
-        }
         return schemaState;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 64ed61bd995..792c8596d1b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -716,8 +716,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             completeCallbackAndReleaseSemaphore(msg.getUncompressedSize(), callback, e);
             return false;
         }
-        SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal());
-        byte[] schemaVersion = schemaCache.get(schemaHash);
+        byte[] schemaVersion = schemaCache.get(msg.getSchemaHash());
         if (schemaVersion != null) {
             msgMetadataBuilder.setSchemaVersion(schemaVersion);
             msg.setSchemaState(MessageImpl.SchemaState.Ready);
@@ -726,8 +725,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
     }
 
     private boolean rePopulateMessageSchema(MessageImpl msg) {
-        SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal());
-        byte[] schemaVersion = schemaCache.get(schemaHash);
+        byte[] schemaVersion = schemaCache.get(msg.getSchemaHash());
         if (schemaVersion == null) {
             return false;
         }
@@ -758,8 +756,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 // case, we should not cache the schema version so that the schema version of the message metadata will
                 // be null, instead of an empty array.
                 if (v.length != 0) {
-                    SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal());
-                    schemaCache.putIfAbsent(schemaHash, v);
+                    schemaCache.putIfAbsent(msg.getSchemaHash(), v);
                     msg.getMessageBuilder().setSchemaVersion(v);
                 }
                 msg.setSchemaState(MessageImpl.SchemaState.Ready);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 51fba75cd21..15288cecb1c 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -126,7 +126,7 @@ public class MultiTopicsConsumerImplTest {
     //
     // Code under tests is using CompletableFutures. Theses may hang indefinitely if code is broken.
     // That's why a test timeout is defined.
-    @Test(timeOut = 5000)
+    @Test(timeOut = 10000)
     public void testParallelSubscribeAsync() throws Exception {
         String topicName = "parallel-subscribe-async-topic";
         MultiTopicsConsumerImpl<byte[]> impl = createMultiTopicsConsumer();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java
index 40220e6047a..8bbc18fbb70 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java
@@ -54,7 +54,12 @@ public class SchemaHash {
         return of(schemaData.getData(), schemaData.getType());
     }
 
-    private static SchemaHash of(byte[] schemaBytes, SchemaType schemaType) {
+    public static SchemaHash of(SchemaInfo schemaInfo) {
+        return of(schemaInfo == null ? new byte[0] : schemaInfo.getSchema(),
+                schemaInfo == null ? null : schemaInfo.getType());
+    }
+
+    public static SchemaHash of(byte[] schemaBytes, SchemaType schemaType) {
         return new SchemaHash(hashFunction.hashBytes(schemaBytes), schemaType);
     }