You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/04/29 07:44:05 UTC

[pulsar] branch branch-2.7 updated: [Broker] Fix issue in reusing EntryBatchIndexesAcks instances (#10400)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 90c0c18  [Broker] Fix issue in reusing EntryBatchIndexesAcks instances (#10400)
90c0c18 is described below

commit 90c0c18e30ab41b1aea25cdb242e2671914f2e96
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Apr 28 00:32:33 2021 +0300

    [Broker] Fix issue in reusing EntryBatchIndexesAcks instances (#10400)
    
    * [Broker] Fix issue in reusing EntryBatchIndexesAcks instances
    
    * Make fields private
    
    * Refactor code: rename maxSize -> size and do some cleanup
    
    * Cleanup test case
    
    * Clear references before recycling
---
 .../broker/service/EntryBatchIndexesAcks.java      | 22 ++++++---
 .../broker/service/EntryBatchIndexesAcksTest.java  | 53 ++++++++++++++++++++++
 2 files changed, 68 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java
index b467c6f..75a82b1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java
@@ -25,8 +25,8 @@ import org.apache.commons.lang3.tuple.Pair;
 import java.util.BitSet;
 
 public class EntryBatchIndexesAcks {
-
-    Pair<Integer, long[]>[] indexesAcks = new Pair[100];
+    private int size = 100;
+    private Pair<Integer, long[]>[] indexesAcks = new Pair[size];
 
     public void setIndexesAcks(int entryIdx, Pair<Integer, long[]> indexesAcks) {
         this.indexesAcks[entryIdx] = indexesAcks;
@@ -39,7 +39,8 @@ public class EntryBatchIndexesAcks {
 
     public int getTotalAckedIndexCount() {
         int count = 0;
-        for (Pair<Integer, long[]> pair : indexesAcks) {
+        for (int i = 0; i < size; i++) {
+            Pair<Integer, long[]> pair = indexesAcks[i];
             if (pair != null) {
                 count += pair.getLeft() - BitSet.valueOf(pair.getRight()).cardinality();
             }
@@ -48,15 +49,22 @@ public class EntryBatchIndexesAcks {
     }
 
     public void recycle() {
+        for (int i = 0; i < size; i++) {
+            indexesAcks[i] = null;
+        }
         handle.recycle(this);
     }
 
+    private void ensureCapacityAndSetSize(int entriesListSize) {
+        size = entriesListSize;
+        if (indexesAcks.length < size) {
+            indexesAcks = new Pair[size];
+        }
+    }
+
     public static EntryBatchIndexesAcks get(int entriesListSize) {
         EntryBatchIndexesAcks ebi = RECYCLER.get();
-
-        if (ebi.indexesAcks.length < entriesListSize) {
-            ebi.indexesAcks = new Pair[entriesListSize];
-        }
+        ebi.ensureCapacityAndSetSize(entriesListSize);
         return ebi;
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcksTest.java
new file mode 100644
index 0000000..a19bb4e
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcksTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.testng.annotations.Test;
+
+public class EntryBatchIndexesAcksTest {
+
+    @Test
+    void shouldResetStateBeforeReusing() {
+        // given
+        // a bitset with 95 bits set
+        BitSetRecyclable bitSet = BitSetRecyclable.create();
+        bitSet.set(0, 95);
+        long[] nintyFiveBitsSet = bitSet.toLongArray();
+        // and a EntryBatchIndexesAcks for the size of 10
+        EntryBatchIndexesAcks acks = EntryBatchIndexesAcks.get(10);
+
+        // when setting 2 indexes with 95/100 bits set in each (5 "acked" in each)
+        acks.setIndexesAcks(8, Pair.of(100, nintyFiveBitsSet));
+        acks.setIndexesAcks(9, Pair.of(100, nintyFiveBitsSet));
+
+        // then the totalAckedIndexCount should be 10
+        assertEquals(acks.getTotalAckedIndexCount(), 10);
+
+        // when recycled and used again
+        acks.recycle();
+        acks = EntryBatchIndexesAcks.get(2);
+
+        // then there should be no previous state and totalAckedIndexCount should be 0
+        assertEquals(acks.getTotalAckedIndexCount(), 0);
+    }
+
+}
\ No newline at end of file