Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/09/21 04:03:45 UTC

[GitHub] [pulsar] merlimat commented on a change in pull request #11884: [Client] Fix ConcurrentModificationException in sendAsync

merlimat commented on a change in pull request #11884:
URL: https://github.com/apache/pulsar/pull/11884#discussion_r712677063



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1291,6 +1289,50 @@ protected OpSendMsg newObject(Handle<OpSendMsg> handle) {
         };
     }
 
+    /**
+     * Queue implementation that is used as the pending messages queue.
+     *
+     * This implementation postpones adding of new OpSendMsg entries that happen
+     * while the forEach call is in progress. This is needed for preventing
+     * ConcurrentModificationExceptions that would occur when the forEach action
+     * calls the add method via a callback in user code.
+     *
+     * This queue is not thread safe.
+     */
+    protected static class OpSendMsgQueue extends ArrayDeque<OpSendMsg> {

Review comment:
       Would it make sense here to use composition instead of inheritance? That way we could make sure that only a few methods can be called, instead of exposing the whole interface of `ArrayDeque`.
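
       A rough sketch of what the composition variant could look like. This is not the code from the pull request: the class name `PostponingQueue`, the generic parameter, and the choice of exposed methods (`add`, `forEach`, `peek`, `poll`, `size`) are all assumptions made for illustration. Like the original, it is not thread safe.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: wraps an ArrayDeque instead of extending it,
// so callers can only reach the few methods declared here.
final class PostponingQueue<T> {
    private final ArrayDeque<T> delegate = new ArrayDeque<>();
    private final List<T> postponed = new ArrayList<>();
    private int forEachDepth = 0;

    // Iterates over the queue; adds made by the action are postponed
    // until the outermost forEach call has finished.
    public void forEach(Consumer<? super T> action) {
        forEachDepth++;
        try {
            delegate.forEach(action);
        } finally {
            forEachDepth--;
            if (forEachDepth == 0 && !postponed.isEmpty()) {
                delegate.addAll(postponed);
                postponed.clear();
            }
        }
    }

    // Adds directly, or postpones the add while a forEach is in progress.
    public boolean add(T item) {
        if (forEachDepth > 0) {
            return postponed.add(item);
        }
        return delegate.add(item);
    }

    public T peek() {
        return delegate.peek();
    }

    public T poll() {
        return delegate.poll();
    }

    public int size() {
        return delegate.size();
    }
}
```

       With this shape, methods such as `addFirst` or `iterator` are simply not reachable from outside, so they cannot bypass the postponing logic.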

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1291,6 +1289,50 @@ protected OpSendMsg newObject(Handle<OpSendMsg> handle) {
         };
     }
 
+    /**
+     * Queue implementation that is used as the pending messages queue.
+     *
+     * This implementation postpones adding of new OpSendMsg entries that happen
+     * while the forEach call is in progress. This is needed for preventing
+     * ConcurrentModificationExceptions that would occur when the forEach action
+     * calls the add method via a callback in user code.
+     *
+     * This queue is not thread safe.
+     */
+    protected static class OpSendMsgQueue extends ArrayDeque<OpSendMsg> {
+        // SpotBugs requires this class to follow rules for serializable classes
+        private static final long serialVersionUID = 1L;
+        private int forEachDepth = 0;
+        private final transient List<OpSendMsg> postponedOpSendMgs = new ArrayList<>();

Review comment:
       Maybe we could keep this as `null` and initialize it only if needed?
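
       For illustration, lazy initialization could look roughly like the following. The class `LazyPostponedList` and its methods `postpone`, `drain`, and `isAllocated` are hypothetical names for this sketch, not part of the pull request.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: the backing list stays null until an entry
// actually has to be postponed, so the common case allocates nothing.
final class LazyPostponedList<T> {
    private List<T> postponed; // null until first needed

    void postpone(T item) {
        if (postponed == null) {
            postponed = new ArrayList<>();
        }
        postponed.add(item);
    }

    // Returns the postponed entries and resets the field back to null.
    List<T> drain() {
        if (postponed == null) {
            return Collections.emptyList();
        }
        List<T> result = postponed;
        postponed = null;
        return result;
    }

    boolean isAllocated() {
        return postponed != null;
    }
}
```

       The trade-off is an extra null check on each access in exchange for skipping the `ArrayList` allocation when no add happens during `forEach`.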



