You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/11/12 22:55:05 UTC

[GitHub] [pulsar] michaeljmarshall opened a new pull request #12779: [Java Client] Use epoch to version producer's cnx to prevent early de…

michaeljmarshall opened a new pull request #12779:
URL: https://github.com/apache/pulsar/pull/12779


   …livery of messages
   
   ### Motivation
   
   I discovered a race condition in Pulsar’s Java Client `ProducerImpl` that can lead to messages persisted out-of-order for a single producer sending to a non-partitioned topic. This PR removes the race condition by keeping track of the current `cnx` and introducing a new update to the state while in a lock on the `ProducerImpl.this`.
   
   ### Reproducing the issue
   I can consistently reproduce the issue by interrupting an active producer sending messages to Pulsar. I interrupt the producer by restarting the broker hosting the producer's topic or by unloading the producer's topic/namespace. Because of the nature of this race condition, the out of order issue does not happen every time, but it does happen frequently enough to have noticed it. In order to increase the probability of the error, my test set up included producing to 100 topics on a 3 broker pulsar cluster at a total rate of 50k msgs/sec. I determined that the messages were out of order in two ways. First, by producing messages from a single thread and putting my own, monotonically increasing sequence id as the messages payload. Second, by inspecting the message's sequence id assigned by the pulsar java client. Inspecting the messages using the reader api revealed messages in sequences like `1,4,2,3,4`. The 4 in that sequence is duplicated because when the client receives an ack fo
 r an unexpected message, it re-sends all pending messages.
   
   ### Description of the problem
   The problem comes from the way that the pulsar producer changes state when a connection is closed and when a connection is opened. Here are the state changes that take place when a connection closes and when one opens:
   
   #### Connection closed:
   1. Set `cnx` to `null`, as long as the current `cnx` is the connection being closed.
   2. Set the state to `Connecting`.
   3. Schedule a runnable to get a new `cnx`.
   4. Once the task from step 3 is run, asynchronously get a `cnx` and then call `ProducerImpl#connectionOpened`.
   
   #### ProducerImpl#connectionOpened:
   1. Set `cnx` to the new connection.
   2. Send `Producer` command to the broker to register the new producer.
   3. Once the producer is registered, schedule a runnable to redeliver `pendingMessages`.
   4. Once the task from step 3 is run, go to `Ready`, as long as the current state is `Uninitialized`, `Connecting`, or `RegisteringSchema`.
   
   There is nothing that prevents a connection from being closed while another connection is currently being established in the `connectionOpened` method. This is exactly what exposes the race condition fixed by this PR. In the race, a connection is established and we call `connectionOpened` and successfully register the producer with the broker. Then, that connection is closed before step 4 of `connectionOpened` and the state changes from `Connecting` to `Connecting`. Because step 4 only checks that the current state is `Uninitialized`, `Connecting`, or `RegisteringSchema`, it updates the state to `Ready`. When adding some extra logging, I could see that the we changed the state to `Ready` while `cnx()` returned `null`. At this point, messages wouldn't yet deliver. When the new connection is established, the `connectionOpened` is called, and we set the new `cnx`. Since our state is still `Ready`, we start delivering any new messages received by the client. These are out of order. We
  asynchronously register the producer with the broker and then messages start to persist.
   
   ### Modifications
   
   1. Update where the producer’s `epoch` value is incremented. It was previously updated before getting a connection. However, this seems prone to races. By updating it when we have the connection and within a lock on the producer, we ensure that the connection gets the next highest epoch number.
   2. Set the state to `Connecting` in the `connectionOpened` method. This is important because the connection could have been closed after the check for `cnx() != null` in the `recoverProcessOpSendMsgFrom` method but before that method gets to the point of setting state to `Ready`. Since we update the state within the lock on `ProducerImpl.this`, we won't delivery any messages. There is still a chance that the broker will have state `Ready` and `cnx() == null`.
   3. Use the producer's `epoch` value to ensure the `cnx` reference passed to `recoverProcessOpSendMsgFrom` is still the current `cnx`. Ensure that `cnx() != null`. These checks ensure that it is safe to update state to `Ready`.
   
   ### Alternatives
   Instead of using the `epoch` value, we could have checked that `cnx() == cnx` in the `recoverProcessOpSendMsgFrom`. This check would be valid in all cases except where the next connection is the same connection. I think this would happen if a topic were unloaded from a broker and then loaded back on to the same broker. The epoch value gives a consistent way to know that the `cnx` is the current `cnx`.
   
   We could have added a new state called `RegisteringProducer`. I investigated this option, but it seemed more complicated and less elegant than this solution. I chose the simpler solution here.
   
   ### Verifying this change
   
   I tried to implement a unit test that would consistently reproduce this issue. I was only able to do so when I introduced a 50 millisecond delay in the scheduling of the runnable within `resendMessages`. Using that method, this PR prevented the race. I also built a custom client with this PR, and I can confirm that I observed the log line indicating `Producer epoch mismatch or the current connection is null.`. This log confirms that the race would have happened but was avoided.
   
   Also, I provided extra detail in this PR description since I wasn't able to add a new test to specifically verify this change. I think the change is pretty straightforward, and I try to add enough context to make the change easy to understand.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API: no
     - The schema: no
     - The default values of configurations: no
     - The wire protocol: no
     - The rest endpoints: no
     - The admin cli options: no
     - Anything that affects deployment: no
   
   ### Documentation
   
   - [x] `no-need-doc` 
     
     This is an internal change to the client. We should make sure to include this fix in the release notes, but no documentation changes need to be made.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli merged pull request #12779: [Java Client] Use epoch to version producer's cnx to prevent early de…

Posted by GitBox <gi...@apache.org>.
eolivelli merged pull request #12779:
URL: https://github.com/apache/pulsar/pull/12779


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] michaeljmarshall commented on pull request #12779: [Java Client] Use epoch to version producer's cnx to prevent early de…

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on pull request #12779:
URL: https://github.com/apache/pulsar/pull/12779#issuecomment-967728503


   I sent a note to the mailing list to discuss this change: https://lists.apache.org/thread/6y6jfdx432j2gqxgk9cnhdw48fq1m6b1.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 commented on pull request #12779: [Java Client] Use epoch to version producer's cnx to prevent early de…

Posted by GitBox <gi...@apache.org>.
Jason918 commented on pull request #12779:
URL: https://github.com/apache/pulsar/pull/12779#issuecomment-971306951


   > > Can you add a test to cover this case?
   > > Not sure if it's easy to do.
   > 
   > @Jason918 - as I mentioned in the PR description, I was only able to get a unit test to consistently reproduce the underlying issue by modifying the client code (scheduling a delay for one of the callbacks). I am pretty sure we have tests that verify producer (re)connection, which will verify that this code path works for the happy path. Also, I verified that this change correctly removes the race condition by testing in the k8s environment when I discovered the race. At this point, I'm not exactly sure how to add a test, but I am open to suggestions.
   
   Great. Thanks for the detailed explanation. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] michaeljmarshall commented on pull request #12779: [Java Client] Use epoch to version producer's cnx to prevent early de…

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on pull request #12779:
URL: https://github.com/apache/pulsar/pull/12779#issuecomment-971205083


   > Can you add a test to cover this case?
   Not sure if it's easy to do.
   
   @Jason918 - as I mentioned in the PR description, I was only able to get a unit test to consistently reproduce the underlying issue by modifying the client code (scheduling a delay for one of the callbacks). I am pretty sure we have tests that verify producer (re)connection, which will verify that this code path works for the happy path. Also, I verified that this change correctly removes the race condition by testing in the k8s environment when I discovered the race. At this point, I'm not exactly sure how to add a test, but I am open to suggestions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 commented on a change in pull request #12779: [Java Client] Use epoch to version producer's cnx to prevent early de…

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #12779:
URL: https://github.com/apache/pulsar/pull/12779#discussion_r748801509



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
##########
@@ -37,7 +37,8 @@
     protected final Backoff backoff;
     private static final AtomicLongFieldUpdater<ConnectionHandler> EPOCH_UPDATER = AtomicLongFieldUpdater
             .newUpdater(ConnectionHandler.class, "epoch");
-    private volatile long epoch = 0L;
+    // Start with -1L because it gets incremented before sending on the first connection
+    private volatile long epoch = -1L;

Review comment:
       OK, I see. I am OK with this new starting value.
   But IMHO, it's better to assert that the epoch value increased after a single producer reconnect to avoid more flaky test.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] michaeljmarshall commented on a change in pull request #12779: [Java Client] Use epoch to version producer's cnx to prevent early de…

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on a change in pull request #12779:
URL: https://github.com/apache/pulsar/pull/12779#discussion_r748797878



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
##########
@@ -37,7 +37,8 @@
     protected final Backoff backoff;
     private static final AtomicLongFieldUpdater<ConnectionHandler> EPOCH_UPDATER = AtomicLongFieldUpdater
             .newUpdater(ConnectionHandler.class, "epoch");
-    private volatile long epoch = 0L;
+    // Start with -1L because it gets incremented before sending on the first connection
+    private volatile long epoch = -1L;

Review comment:
       I don't believe the starting value inherently means anything. However, the test `ProducerCreationTest#testGeneratedNameProducerReconnect` asserts that the `epoch` value is 2 after a single producer reconnect. I could have updated the test or the starting value. I chose to update the starting value here to maintain the original behavior.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] michaeljmarshall commented on pull request #12779: [Java Client] Use epoch to version producer's cnx to prevent early de…

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on pull request #12779:
URL: https://github.com/apache/pulsar/pull/12779#issuecomment-977142363


   @merlimat @ivankelly @eolivelli @BewareMyPower @codelipenghui PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] michaeljmarshall commented on pull request #12779: [Java Client] Use epoch to version producer's cnx to prevent early de…

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on pull request #12779:
URL: https://github.com/apache/pulsar/pull/12779#issuecomment-967774285


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 commented on a change in pull request #12779: [Java Client] Use epoch to version producer's cnx to prevent early de…

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #12779:
URL: https://github.com/apache/pulsar/pull/12779#discussion_r748715376



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
##########
@@ -37,7 +37,8 @@
     protected final Backoff backoff;
     private static final AtomicLongFieldUpdater<ConnectionHandler> EPOCH_UPDATER = AtomicLongFieldUpdater
             .newUpdater(ConnectionHandler.class, "epoch");
-    private volatile long epoch = 0L;
+    // Start with -1L because it gets incremented before sending on the first connection
+    private volatile long epoch = -1L;

Review comment:
       Does the starting value of epoch mean anything? In my understanding, it's just used for comparison, right?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org