Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/08/18 07:12:25 UTC

[GitHub] [pulsar] codelipenghui opened a new pull request, #17154: [fix][broker] Fix out of order data replication

codelipenghui opened a new pull request, #17154:
URL: https://github.com/apache/pulsar/pull/17154

   ### Motivation
   
   Schema replication can break the order of replicated data while the replicator
   fetches the schema from the local cluster.
   
   https://github.com/apache/pulsar/blob/8a6ecd7d4c9399bb7ce5a224ca854e4a71db79b1/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java#L366-L369
   
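   `getSchemaInfo()` is asynchronous, so a message whose schema has to be fetched can be sent after later messages whose schema is already available, which can reverse the order in which messages are written to the remote cluster.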
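A minimal, self-contained illustration of the reordering, assuming two messages read in the order `m1`, `m2`, where `m1` needs a schema that is not yet cached and `m2` uses a cached one. `ReorderDemo` and the string "schemas" are hypothetical stand-ins, not Pulsar code; the callbacks only mirror the pre-fix `getSchemaInfo(msg).thenAccept(...)` pattern.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical demo: per-message async callbacks can reorder writes.
class ReorderDemo {
    static List<String> replicate() {
        List<String> sentToRemote = new ArrayList<>();

        // "m1" needs a schema that must be fetched, so its future is still pending.
        CompletableFuture<String> m1Schema = new CompletableFuture<>();
        // "m2" uses a cached schema, so its future is already complete.
        CompletableFuture<String> m2Schema = CompletableFuture.completedFuture("schema-v1");

        // Same shape as the pre-fix code: send each message once its schema is available.
        m1Schema.thenAccept(schema -> sentToRemote.add("m1"));
        m2Schema.thenAccept(schema -> sentToRemote.add("m2")); // runs immediately

        // The schema fetch finishes later; only now is m1 sent.
        m1Schema.complete("schema-v2");
        return sentToRemote;
    }
}
```

`ReorderDemo.replicate()` returns `[m2, m1]`: the callback on the already-completed future runs immediately, while `m1` is only sent when its fetch completes.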
   
   ### Modification
   
   Added a new replicator state, `fetchSchemaInProgress`, which indicates that the
   replicator has detected a new schema and needs to fetch its schema info
   from the local cluster. While the schema is being fetched, the replicator
   pauses data replication and resumes once the schema has been fetched.
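A rough sketch of the pause-and-resume idea under simplifying assumptions: the ledger is an in-memory list, the cursor is a list index, and each schema version maps to a `CompletableFuture` standing in for `getSchemaInfo()`. `PausingReplicator`, `Entry`, and `readMoreEntries()` are hypothetical names; the actual replicator rewinds a managed-ledger cursor and sends through a producer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of "pause while fetching the schema"; not Pulsar's PersistentReplicator.
class PausingReplicator {
    record Entry(String id, int schemaVersion) {}

    private final List<Entry> log;                                        // stands in for the ledger
    private final Map<Integer, CompletableFuture<String>> schemaFetches;  // stands in for getSchemaInfo()
    final List<String> sentToRemote = new ArrayList<>();
    private int readPosition = 0;                                         // stands in for the cursor
    volatile boolean fetchSchemaInProgress = false;

    PausingReplicator(List<Entry> log, Map<Integer, CompletableFuture<String>> schemaFetches) {
        this.log = log;
        this.schemaFetches = schemaFetches;
    }

    // Replicates entries in order, pausing at the first entry whose schema is not yet known.
    void readMoreEntries() {
        if (fetchSchemaInProgress) {
            return; // paused: no new reads while a schema fetch is outstanding
        }
        while (readPosition < log.size()) {
            Entry entry = log.get(readPosition);
            CompletableFuture<String> schema = schemaFetches.get(entry.schemaVersion());
            if (schema.isCompletedExceptionally()) {
                // A real implementation would retry the fetch; this sketch just stops here.
                return;
            }
            if (!schema.isDone()) {
                // Keep the read position on this entry (the "rewind") and resume after the fetch.
                fetchSchemaInProgress = true;
                schema.whenComplete((info, err) -> {
                    fetchSchemaInProgress = false;
                    readMoreEntries(); // re-read starting from the blocked entry
                });
                return;
            }
            sentToRemote.add(entry.id());
            readPosition++;
        }
    }
}
```

Because the read position stays on the blocked entry, everything from that entry onward is read again after the fetch, which is why re-reads can produce duplicates unless deduplication is enabled.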
   
   ### Verification
   
   Updated the test to check the order in which messages are read.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17154: [fix][broker] Fix out of order data replication

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17154:
URL: https://github.com/apache/pulsar/pull/17154#discussion_r948967827


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java:
##########
@@ -364,16 +378,31 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
 
                 headersAndPayload.retain();
 
-                getSchemaInfo(msg).thenAccept(schemaInfo -> {
-                    msg.setSchemaInfoForReplicator(schemaInfo);
+                CompletableFuture<SchemaInfo> schemaFuture = getSchemaInfo(msg);
+                if (!schemaFuture.isDone() || schemaFuture.isCompletedExceptionally()) {
+                    // Mark the replicator is fetching the schema for now and rewind the cursor
+                    // and trigger the next read after complete the schema fetching.
+                    fetchSchemaInProgress = true;
+                    skipRemainingMessages = true;

Review Comment:
   Oh, nice catch!
   Yes, we need to release the first entry.
   





[GitHub] [pulsar] eolivelli commented on a diff in pull request #17154: [fix][broker] Fix out of order data replication

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #17154:
URL: https://github.com/apache/pulsar/pull/17154#discussion_r948879024


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java:
##########
@@ -384,8 +385,11 @@ public void testReplication(String namespace) throws Exception {
         consumer3.receive(1);
     }
 
-    @Test
+    @Test(invocationCount = 5)
     public void testReplicationWithSchema() throws Exception {
+        config1.setBrokerDeduplicationEnabled(true);
+        config2.setBrokerDeduplicationEnabled(true);
+        config3.setBrokerDeduplicationEnabled(true);

Review Comment:
   I agree





[GitHub] [pulsar] gaoran10 commented on a diff in pull request #17154: [fix][broker] Fix out of order data replication

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on code in PR #17154:
URL: https://github.com/apache/pulsar/pull/17154#discussion_r948949580


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java:
##########
@@ -364,16 +378,31 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
 
                 headersAndPayload.retain();
 
-                getSchemaInfo(msg).thenAccept(schemaInfo -> {
-                    msg.setSchemaInfoForReplicator(schemaInfo);
+                CompletableFuture<SchemaInfo> schemaFuture = getSchemaInfo(msg);
+                if (!schemaFuture.isDone() || schemaFuture.isCompletedExceptionally()) {
+                    // Mark the replicator is fetching the schema for now and rewind the cursor
+                    // and trigger the next read after complete the schema fetching.
+                    fetchSchemaInProgress = true;
+                    skipRemainingMessages = true;

Review Comment:
    Do we need to release the first entry here? Or maybe we can break the loop.
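One way to address the question, sketched with a hypothetical ref-counted `FakeEntry` type rather than Pulsar's real entry class: once an entry blocks on a schema fetch, break out of the loop and release that entry and every entry after it, since the cursor rewind means they will be read again. The `needsSchemaFetch` predicate is an assumption standing in for the `schemaFuture` check in the diff.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical ref-counted entry; the count starts at 1 when the entry is read.
class FakeEntry {
    final String id;
    int refCnt = 1;

    FakeEntry(String id) {
        this.id = id;
    }

    void release() {
        refCnt--;
    }
}

class BatchProcessor {
    // Sends entries in order until one needs a schema fetch, then breaks out of the loop.
    // Returns the number of entries sent.
    static int processBatch(List<FakeEntry> entries, Predicate<FakeEntry> needsSchemaFetch) {
        int sent = 0;
        for (int i = 0; i < entries.size(); i++) {
            FakeEntry entry = entries.get(i);
            if (needsSchemaFetch.test(entry)) {
                // Release the blocked entry and everything after it; the cursor rewind
                // means they will be read again once the schema is available.
                for (int j = i; j < entries.size(); j++) {
                    entries.get(j).release();
                }
                break;
            }
            // In this sketch, sending an entry also releases it.
            entry.release();
            sent++;
        }
        return sent;
    }
}
```

Breaking out of the loop keeps the release logic in one place; skipping the remaining entries with a flag works too, as long as each skipped entry is still released.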





[GitHub] [pulsar] codelipenghui merged pull request #17154: [fix][broker] Fix out of order data replication

Posted by GitBox <gi...@apache.org>.
codelipenghui merged PR #17154:
URL: https://github.com/apache/pulsar/pull/17154




[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17154: [fix][broker] Fix out of order data replication

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17154:
URL: https://github.com/apache/pulsar/pull/17154#discussion_r948736894


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java:
##########
@@ -384,8 +385,11 @@ public void testReplication(String namespace) throws Exception {
         consumer3.receive(1);
     }
 
-    @Test
+    @Test(invocationCount = 5)
     public void testReplicationWithSchema() throws Exception {
+        config1.setBrokerDeduplicationEnabled(true);
+        config2.setBrokerDeduplicationEnabled(true);
+        config3.setBrokerDeduplicationEnabled(true);

Review Comment:
   I thought deduplication was enabled for geo-replication by default, but it actually isn't.
   Without this configuration, the test might receive duplicate messages, because the cursor is rewound
   after the schema is fetched.
   
   This is also a problem for geo-replication in general: the producer can disconnect from the remote
   cluster and introduce duplicate messages.
   
   IMO, we should enable data deduplication by default for geo-replication, but that should be discussed on the
   mailing list first, so I don't want to make this change in this PR.
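To see why deduplication hides the duplicates caused by a rewind, here is a simplified model of sequence-ID-based deduplication, assuming the receiving topic tracks the highest sequence ID it has persisted for each producer name. `DedupTopic` is hypothetical; in the test above, the real mechanism is switched on with `setBrokerDeduplicationEnabled(true)`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of per-producer, sequence-ID-based deduplication on the receiving side.
class DedupTopic {
    private final Map<String, Long> highestSequenceId = new HashMap<>();
    final List<String> persisted = new ArrayList<>();

    // Returns true if the message was persisted, false if it was dropped as a duplicate.
    boolean publish(String producerName, long sequenceId, String payload) {
        long last = highestSequenceId.getOrDefault(producerName, -1L);
        if (sequenceId <= last) {
            return false; // already persisted, e.g. resent after a cursor rewind or reconnect
        }
        highestSequenceId.put(producerName, sequenceId);
        persisted.add(payload);
        return true;
    }
}
```

Resending sequence ID 1 after a rewind is rejected, so the persisted stream stays `a, b, c` without gaps or repeats.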







[GitHub] [pulsar] codelipenghui commented on pull request #17154: [fix][broker] Fix out of order data replication

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #17154:
URL: https://github.com/apache/pulsar/pull/17154#issuecomment-1219286097

   > I'm just wondering if we have another easy way to make CompletableFuture execute sequentially and don't require many states.
   
   With that approach, we would still need to maintain a shared Future. And if we don't want to create a Future for each message, we would also need to check the Future's state to decide whether to call `thenCompose()` or use it directly.
   
   We should also avoid issuing subsequent reads while the schema is being fetched. If 10k topics on a broker run into this state, too many messages would be pending, which could use up the broker's memory.
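For comparison, the alternative raised in the quoted question can be sketched by chaining every send onto a shared tail future with `thenCompose()`, with a fast path that sends directly when nothing is queued and the schema is already available. `ChainedSender` and `pendingMessages` are hypothetical; error handling is omitted, so a failed fetch would stall the chain.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the alternative: serialize sends by chaining on a shared tail future.
class ChainedSender {
    private CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);
    final List<String> sentToRemote = new ArrayList<>();
    int pendingMessages = 0;

    void submit(String msgId, CompletableFuture<String> schemaFuture) {
        // Fast path: nothing queued ahead and the schema is already known, so send directly.
        if (tail.isDone() && schemaFuture.isDone() && !schemaFuture.isCompletedExceptionally()) {
            sentToRemote.add(msgId);
            return;
        }
        // Slow path: wait for everything queued ahead, then for this message's schema.
        pendingMessages++;
        tail = tail.thenCompose(v -> schemaFuture).thenAccept(schema -> {
            sentToRemote.add(msgId);
            pendingMessages--;
        });
    }
}
```

Ordering is preserved, but every message read while a fetch is outstanding stays queued in the chain (`pendingMessages` grows), which is the memory concern raised above for brokers with many topics in this state.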



