You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/04/29 07:44:05 UTC
[pulsar] branch branch-2.7 updated: [Broker] Fix issue in reusing
EntryBatchIndexesAcks instances (#10400)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 90c0c18 [Broker] Fix issue in reusing EntryBatchIndexesAcks instances (#10400)
90c0c18 is described below
commit 90c0c18e30ab41b1aea25cdb242e2671914f2e96
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Apr 28 00:32:33 2021 +0300
[Broker] Fix issue in reusing EntryBatchIndexesAcks instances (#10400)
* [Broker] Fix issue in reusing EntryBatchIndexesAcks instances
* Make fields private
* Refactor code: rename maxSize -> size and do some cleanup
* Cleanup test case
* Clear references before recycling
---
.../broker/service/EntryBatchIndexesAcks.java | 22 ++++++---
.../broker/service/EntryBatchIndexesAcksTest.java | 53 ++++++++++++++++++++++
2 files changed, 68 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java
index b467c6f..75a82b1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java
@@ -25,8 +25,8 @@ import org.apache.commons.lang3.tuple.Pair;
import java.util.BitSet;
public class EntryBatchIndexesAcks {
-
- Pair<Integer, long[]>[] indexesAcks = new Pair[100];
+ private int size = 100;
+ private Pair<Integer, long[]>[] indexesAcks = new Pair[size];
public void setIndexesAcks(int entryIdx, Pair<Integer, long[]> indexesAcks) {
this.indexesAcks[entryIdx] = indexesAcks;
@@ -39,7 +39,8 @@ public class EntryBatchIndexesAcks {
public int getTotalAckedIndexCount() {
int count = 0;
- for (Pair<Integer, long[]> pair : indexesAcks) {
+ for (int i = 0; i < size; i++) {
+ Pair<Integer, long[]> pair = indexesAcks[i];
if (pair != null) {
count += pair.getLeft() - BitSet.valueOf(pair.getRight()).cardinality();
}
@@ -48,15 +49,22 @@ public class EntryBatchIndexesAcks {
}
public void recycle() {
+ for (int i = 0; i < size; i++) {
+ indexesAcks[i] = null;
+ }
handle.recycle(this);
}
+ private void ensureCapacityAndSetSize(int entriesListSize) {
+ size = entriesListSize;
+ if (indexesAcks.length < size) {
+ indexesAcks = new Pair[size];
+ }
+ }
+
public static EntryBatchIndexesAcks get(int entriesListSize) {
EntryBatchIndexesAcks ebi = RECYCLER.get();
-
- if (ebi.indexesAcks.length < entriesListSize) {
- ebi.indexesAcks = new Pair[entriesListSize];
- }
+ ebi.ensureCapacityAndSetSize(entriesListSize);
return ebi;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcksTest.java
new file mode 100644
index 0000000..a19bb4e
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcksTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.testng.annotations.Test;
+
+public class EntryBatchIndexesAcksTest {
+
+ @Test
+ void shouldResetStateBeforeReusing() {
+ // given
+ // a bitset with 95 bits set
+ BitSetRecyclable bitSet = BitSetRecyclable.create();
+ bitSet.set(0, 95);
+ long[] nintyFiveBitsSet = bitSet.toLongArray();
+ // and a EntryBatchIndexesAcks for the size of 10
+ EntryBatchIndexesAcks acks = EntryBatchIndexesAcks.get(10);
+
+ // when setting 2 indexes with 95/100 bits set in each (5 "acked" in each)
+ acks.setIndexesAcks(8, Pair.of(100, nintyFiveBitsSet));
+ acks.setIndexesAcks(9, Pair.of(100, nintyFiveBitsSet));
+
+ // then the totalAckedIndexCount should be 10
+ assertEquals(acks.getTotalAckedIndexCount(), 10);
+
+ // when recycled and used again
+ acks.recycle();
+ acks = EntryBatchIndexesAcks.get(2);
+
+ // then there should be no previous state and totalAckedIndexCount should be 0
+ assertEquals(acks.getTotalAckedIndexCount(), 0);
+ }
+
+}
\ No newline at end of file