You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/19 01:17:57 UTC

[pulsar] branch master updated: [fix][broker] Fix out of order data replication (#17154)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 39c1ee1aebc [fix][broker] Fix out of order data replication (#17154)
39c1ee1aebc is described below

commit 39c1ee1aebc06c85b7dcb203b8cfa16fe035ae27
Author: Penghui Li <pe...@apache.org>
AuthorDate: Fri Aug 19 09:17:48 2022 +0800

    [fix][broker] Fix out of order data replication (#17154)
    
    * [fix][broker] Fix out of order data replication
    
    ### Motivation
    
    Schema replication can break the data replication order while the schema
    is being fetched from the local cluster.
    
    https://github.com/apache/pulsar/blob/8a6ecd7d4c9399bb7ce5a224ca854e4a71db79b1/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java#L366-L369
    
    The method getSchemaInfo() is asynchronous, which can reverse the order in which messages are written.
    
    ### Modification
    
    Added a new replicator state, `fetchSchemaInProgress`, which indicates that
    the replicator has detected a new schema and needs to fetch the schema info
    from the local cluster. While the schema is being fetched, the replicator
    pauses data replication and resumes it once the schema fetch completes.
---
 .../service/persistent/PersistentReplicator.java   | 50 ++++++++++++++++++----
 .../pulsar/broker/service/ReplicatorTest.java      | 50 ++++++++++++++++------
 2 files changed, 79 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 953300e823f..2e7dbb2fbf1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -105,6 +105,8 @@ public class PersistentReplicator extends AbstractReplicator
 
     private final ReplicatorStatsImpl stats = new ReplicatorStatsImpl();
 
+    private volatile boolean fetchSchemaInProgress = false;
+
     public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster,
                                 BrokerService brokerService, PulsarClientImpl replicationClient)
             throws PulsarServerException {
@@ -220,6 +222,11 @@ public class PersistentReplicator extends AbstractReplicator
     }
 
     protected void readMoreEntries() {
+        if (fetchSchemaInProgress) {
+            log.info("[{}][{} -> {}] Skip the reading due to new detected schema",
+                    topicName, localCluster, remoteCluster);
+            return;
+        }
         int availablePermits = getAvailablePermits();
 
         if (availablePermits > 0) {
@@ -290,8 +297,15 @@ public class PersistentReplicator extends AbstractReplicator
             // This flag is set to true when we skip atleast one local message,
             // in order to skip remaining local messages.
             boolean isLocalMessageSkippedOnce = false;
+            boolean skipRemainingMessages = false;
             for (int i = 0; i < entries.size(); i++) {
                 Entry entry = entries.get(i);
+                // Skip the messages since the replicator need to fetch the schema info to replicate the schema to the
+                // remote cluster. Rewind the cursor first and continue the message read after fetched the schema.
+                if (skipRemainingMessages) {
+                    entry.release();
+                    continue;
+                }
                 int length = entry.getLength();
                 ByteBuf headersAndPayload = entry.getDataBuffer();
                 MessageImpl msg;
@@ -364,16 +378,34 @@ public class PersistentReplicator extends AbstractReplicator
 
                 headersAndPayload.retain();
 
-                getSchemaInfo(msg).thenAccept(schemaInfo -> {
-                    msg.setSchemaInfoForReplicator(schemaInfo);
+                CompletableFuture<SchemaInfo> schemaFuture = getSchemaInfo(msg);
+                if (!schemaFuture.isDone() || schemaFuture.isCompletedExceptionally()) {
+                    entry.release();
+                    headersAndPayload.release();
+                    msg.recycle();
+                    // Mark the replicator is fetching the schema for now and rewind the cursor
+                    // and trigger the next read after complete the schema fetching.
+                    fetchSchemaInProgress = true;
+                    skipRemainingMessages = true;
+                    cursor.cancelPendingReadRequest();
+                    log.info("[{}][{} -> {}] Pause the data replication due to new detected schema", topicName,
+                            localCluster, remoteCluster);
+                    schemaFuture.whenComplete((__, e) -> {
+                       if (e != null) {
+                           log.warn("[{}][{} -> {}] Failed to get schema from local cluster, will try in the next loop",
+                                   topicName, localCluster, remoteCluster, e);
+                       }
+                       log.info("[{}][{} -> {}] Resume the data replication after the schema fetching done", topicName,
+                               localCluster, remoteCluster);
+                       cursor.rewind();
+                       fetchSchemaInProgress = false;
+                       readMoreEntries();
+                    });
+                } else {
+                    msg.setSchemaInfoForReplicator(schemaFuture.get());
                     producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
-                }).exceptionally(ex -> {
-                    log.error("[{}][{} -> {}] Failed to get schema from local cluster", topicName,
-                            localCluster, remoteCluster, ex);
-                    return null;
-                });
-
-                atLeastOneMessageSentForReplication = true;
+                    atLeastOneMessageSentForReplication = true;
+                }
             }
         } catch (Exception e) {
             log.error("[{}][{} -> {}] Unexpected exception: {}", topicName, localCluster, remoteCluster, e.getMessage(),
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index be232760b0b..3e3ef98df65 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -80,6 +80,7 @@ import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -384,8 +385,11 @@ public class ReplicatorTest extends ReplicatorTestBase {
         consumer3.receive(1);
     }
 
-    @Test
+    @Test(invocationCount = 5)
     public void testReplicationWithSchema() throws Exception {
+        config1.setBrokerDeduplicationEnabled(true);
+        config2.setBrokerDeduplicationEnabled(true);
+        config3.setBrokerDeduplicationEnabled(true);
         PulsarClient client1 = pulsar1.getClient();
         PulsarClient client2 = pulsar2.getClient();
         PulsarClient client3 = pulsar3.getClient();
@@ -395,17 +399,29 @@ public class ReplicatorTest extends ReplicatorTestBase {
         final String subName = "my-sub";
 
         @Cleanup
-        Producer<Schemas.PersonOne> producer = client1.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+        Producer<Schemas.PersonOne> producer1 = client1.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+                .topic(topic.toString())
+                .enableBatching(false)
+                .create();
+
+        @Cleanup
+        Producer<Schemas.PersonThree> producer2 = client1.newProducer(Schema.AVRO(Schemas.PersonThree.class))
                 .topic(topic.toString())
+                .enableBatching(false)
                 .create();
 
         admin1.topics().createSubscription(topic.toString(), subName, MessageId.earliest);
         admin2.topics().createSubscription(topic.toString(), subName, MessageId.earliest);
         admin3.topics().createSubscription(topic.toString(), subName, MessageId.earliest);
 
+        final int totalMessages = 1000;
 
-        for (int i = 0; i < 10; i++) {
-            producer.send(new Schemas.PersonOne(i));
+        for (int i = 0; i < totalMessages / 2; i++) {
+            producer1.sendAsync(new Schemas.PersonOne(i));
+        }
+
+        for (int i = 500; i < totalMessages; i++) {
+            producer2.sendAsync(new Schemas.PersonThree(i, "name-" + i));
         }
 
         Awaitility.await().untilAsserted(() -> {
@@ -415,29 +431,39 @@ public class ReplicatorTest extends ReplicatorTestBase {
         });
 
         @Cleanup
-        Consumer<Schemas.PersonOne> consumer1 = client1.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
+        Consumer<GenericRecord> consumer1 = client1.newConsumer(Schema.AUTO_CONSUME())
                 .topic(topic.toString())
                 .subscriptionName(subName)
                 .subscribe();
 
         @Cleanup
-        Consumer<Schemas.PersonOne> consumer2 = client2.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
+        Consumer<GenericRecord> consumer2 = client2.newConsumer(Schema.AUTO_CONSUME())
                 .topic(topic.toString())
                 .subscriptionName(subName)
                 .subscribe();
 
         @Cleanup
-        Consumer<Schemas.PersonOne> consumer3 = client3.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
+        Consumer<GenericRecord> consumer3 = client3.newConsumer(Schema.AUTO_CONSUME())
                 .topic(topic.toString())
                 .subscriptionName(subName)
                 .subscribe();
 
-        for (int i = 0; i < 10; i++) {
-            Message<Schemas.PersonOne> msg1 = consumer1.receive();
-            Message<Schemas.PersonOne> msg2 = consumer2.receive();
-            Message<Schemas.PersonOne> msg3 = consumer3.receive();
+        int lastId = -1;
+        for (int i = 0; i < totalMessages; i++) {
+            Message<GenericRecord> msg1 = consumer1.receive();
+            Message<GenericRecord> msg2 = consumer2.receive();
+            Message<GenericRecord> msg3 = consumer3.receive();
             assertTrue(msg1 != null && msg2 != null && msg3 != null);
-            assertTrue(msg1.getValue().equals(msg2.getValue()) && msg2.getValue().equals(msg3.getValue()));
+            GenericRecord record1 = msg1.getValue();
+            GenericRecord record2 = msg2.getValue();
+            GenericRecord record3 = msg3.getValue();
+            int id1 = (int) record1.getField("id");
+            int id2 = (int) record2.getField("id");
+            int id3 = (int) record3.getField("id");
+            log.info("Received ids, id1: {}, id2: {}, id3: {}, lastId: {}", id1, id2, id3, lastId);
+            assertTrue(id1 == id2 && id2 == id3);
+            assertTrue(id1 > lastId);
+            lastId = id1;
             consumer1.acknowledge(msg1);
             consumer2.acknowledge(msg2);
             consumer3.acknowledge(msg3);