You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pulsar.apache.org by gi...@git.apache.org on 2017/08/02 00:13:12 UTC

[GitHub] jai1 commented on a change in pull request #619: make batch-ack thread-safe

jai1 commented on a change in pull request #619: make batch-ack thread-safe
URL: https://github.com/apache/incubator-pulsar/pull/619#discussion_r130760041
 
 

 ##########
 File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
 ##########
 @@ -338,15 +338,25 @@ boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageId, AckType ackTyp
             return true;
         }
         int batchIndex = batchMessageId.getBatchIndex();
-        int batchSize = bitSet.length();
-        if (ackType == AckType.Individual) {
-            bitSet.clear(batchIndex);
-        } else {
-            // +1 since to argument is exclusive
-            bitSet.clear(0, batchIndex + 1);
+        // bitset is not thread-safe and requires external synchronization
+        int batchSize = 0;
+        boolean isAllMsgsAcked = false;
+        lock.writeLock().lock();
+        try {
+            batchSize = bitSet.length();
+            if (ackType == AckType.Individual) {
+                bitSet.clear(batchIndex);
+            } else {
+                // +1 since to argument is exclusive
+                bitSet.clear(0, batchIndex + 1);
+            }
+            isAllMsgsAcked = bitSet.isEmpty();
+        } finally {
+            lock.writeLock().unlock();
         }
+        
         // all messages in this batch have been acked
-        if (bitSet.isEmpty()) {
+        if (isAllMsgsAcked) {
 
 Review comment:
   Do we need a lock on line 380 as well?
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services