Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/09 02:25:17 UTC

[GitHub] [pulsar] merlimat opened a new pull request, #16490: Fixed error when delayed messages trackers state grows to >1.5GB

merlimat opened a new pull request, #16490:
URL: https://github.com/apache/pulsar/pull/16490

   ### Motivation
   
   The delayed messages tracker uses a `ByteBuf` in direct memory to store the priority queue. When the number of tracked messages grows large, the buffer keeps doubling in size until it reaches 1.5 GB. The next doubling fails because 3.0 GB exceeds the 2 GB maximum size of a `ByteBuf`, which is indexed by `int`.
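
The arithmetic behind the failure can be checked with a small sketch. The class and method names below are hypothetical and only illustrate the size limit; they are not part of the PR.

```java
public class ByteBufDoublingLimit {
    // Returns true when doubling a buffer of the given size (in bytes)
    // would need more bytes than an int index can address.
    public static boolean doublingExceedsIntRange(long currentSizeBytes) {
        return currentSizeBytes * 2 > Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        long oneAndAHalfGiB = 3L * 512 * 1024 * 1024; // 1.5 GiB in bytes
        // Size requested by the next doubling, next to the largest int value.
        System.out.println(oneAndAHalfGiB * 2);
        System.out.println(Integer.MAX_VALUE);
        System.out.println(doublingExceedsIntRange(oneAndAHalfGiB));
    }
}
```

The computation uses `long` on purpose: doing the same multiplication in `int` would overflow silently instead of exposing the limit.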
   
   ### Modification
   
   Instead of using a single `ByteBuf`, use a list of buffers with a maximum segment size.
   
   The first buffer is expanded as usual, while all other buffers always have a fixed size.
   
   This approach has multiple benefits:
    1. The tracker for a single topic is no longer limited to 2 GB.
    2. Much less memory is wasted, since we don't have to double the buffer each time but only add more segments.
    3. Expanding and shrinking the tracker becomes much more efficient, since we only add or remove segments instead of copying the entire data set.
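
A minimal sketch of the idea, under some loud assumptions: heap `long[]` segments stand in for the direct-memory `ByteBuf` segments, the class name `SegmentedLongsSketch` and the `maxSegmentSize` constructor parameter are made up for illustration, and the initial capacity is restricted to a single segment. It is not the code of this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: long[] segments replace the ByteBuf segments of the PR.
public class SegmentedLongsSketch {
    private final int maxSegmentSize; // longs per full segment (hypothetical parameter)
    private final List<long[]> segments = new ArrayList<>();
    private long capacity;

    public SegmentedLongsSketch(int initialCapacity, int maxSegmentSize) {
        if (initialCapacity <= 0 || initialCapacity > maxSegmentSize) {
            throw new IllegalArgumentException("sketch expects 0 < initialCapacity <= maxSegmentSize");
        }
        this.maxSegmentSize = maxSegmentSize;
        segments.add(new long[initialCapacity]);
        capacity = initialCapacity;
    }

    public long getCapacity() {
        return capacity;
    }

    public int getSegmentCount() {
        return segments.size();
    }

    // A global offset is split into a segment index and an index inside that segment.
    public void writeLong(long offset, long value) {
        segments.get((int) (offset / maxSegmentSize))[(int) (offset % maxSegmentSize)] = value;
    }

    public long readLong(long offset) {
        return segments.get((int) (offset / maxSegmentSize))[(int) (offset % maxSegmentSize)];
    }

    public void increaseCapacity() {
        if (capacity < maxSegmentSize) {
            // Only the first segment exists: double it, capped at one full segment.
            int newSize = (int) Math.min(capacity * 2, maxSegmentSize);
            segments.set(0, Arrays.copyOf(segments.get(0), newSize));
            capacity = newSize;
        } else {
            // The first segment is full-size: append a fixed-size segment without copying old data.
            segments.add(new long[maxSegmentSize]);
            capacity += maxSegmentSize;
        }
    }
}
```

With `new SegmentedLongsSketch(4, 8)`, the first `increaseCapacity()` call copies 4 values into an 8-slot segment, and every later call only appends another 8-slot segment, so the cost of growing no longer depends on how much data is already stored.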


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] merlimat commented on pull request #16490: Fixed error when delayed messages trackers state grows to >1.5GB

Posted by GitBox <gi...@apache.org>.
merlimat commented on PR #16490:
URL: https://github.com/apache/pulsar/pull/16490#issuecomment-1181924262

   > Overall looks good to me.
   > Would you please add a unit test for `SegmentedLongArray`?
   
   👍 Added




[GitHub] [pulsar] merlimat commented on a diff in pull request #16490: Fixed error when delayed messages trackers state grows to >1.5GB

Posted by GitBox <gi...@apache.org>.
merlimat commented on code in PR #16490:
URL: https://github.com/apache/pulsar/pull/16490#discussion_r917273349


##########
pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/SegmentedLongArray.java:
##########
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.concurrent.NotThreadSafe;
+import lombok.Getter;
+
+@NotThreadSafe
+public class SegmentedLongArray implements AutoCloseable {
+
+    private static final int SIZE_OF_LONG = 8;
+
+    private static final int MAX_SEGMENT_SIZE = 2 * 1024 * 1024; // 2M longs -> 16 MB
+    private final List<ByteBuf> buffers = new ArrayList<>();
+
+    @Getter
+    private final long initialCapacity;
+
+    @Getter
+    private long capacity;
+
+    public SegmentedLongArray(long initialCapacity) {
+        long remainingToAdd = initialCapacity;
+
+        while (remainingToAdd > 0) {
+            int sizeToAdd = (int) Math.min(remainingToAdd, MAX_SEGMENT_SIZE);
+            ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(sizeToAdd * SIZE_OF_LONG);
+            buffer.writerIndex(sizeToAdd * SIZE_OF_LONG);
+            buffers.add(buffer);
+            remainingToAdd -= sizeToAdd;
+        }
+
+        this.initialCapacity = initialCapacity;
+        this.capacity = this.initialCapacity;
+    }
+
+    public void writeLong(long offset, long value) {
+        int bufferIdx = (int) (offset / MAX_SEGMENT_SIZE);
+        int internalIdx = (int) (offset % MAX_SEGMENT_SIZE);
+        buffers.get(bufferIdx).setLong(internalIdx * SIZE_OF_LONG, value);
+    }
+
+    public long readLong(long offset) {
+        int bufferIdx = (int) (offset / MAX_SEGMENT_SIZE);
+        int internalIdx = (int) (offset % MAX_SEGMENT_SIZE);
+        return buffers.get(bufferIdx).getLong(internalIdx * SIZE_OF_LONG);
+    }
+
+    public void increaseCapacity() {
+        if (capacity < MAX_SEGMENT_SIZE) {

Review Comment:
   All segments after the first one are created directly at `MAX_SEGMENT_SIZE`. They are never expanded or shrunk, only added or removed.





[GitHub] [pulsar] merlimat commented on a diff in pull request #16490: Fixed error when delayed messages trackers state grows to >1.5GB

Posted by GitBox <gi...@apache.org>.
merlimat commented on code in PR #16490:
URL: https://github.com/apache/pulsar/pull/16490#discussion_r917277413


##########
pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/SegmentedLongArray.java:
##########
@@ -0,0 +1,121 @@
+    public void increaseCapacity() {
+        if (capacity < MAX_SEGMENT_SIZE) {

Review Comment:
   Fixed





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16490: Fixed error when delayed messages trackers state grows to >1.5GB

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16490:
URL: https://github.com/apache/pulsar/pull/16490#discussion_r917274078


##########
pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/SegmentedLongArray.java:
##########
@@ -0,0 +1,121 @@
+    public void increaseCapacity() {
+        if (capacity < MAX_SEGMENT_SIZE) {

Review Comment:
   I think a segment will be added here? https://github.com/apache/pulsar/pull/16490/files#diff-ee5391a05253205e8b9bb0f52faa7397326ec0687513204b3bf1b5244d2b8504R45-R50





[GitHub] [pulsar] merlimat commented on a diff in pull request #16490: Fixed error when delayed messages trackers state grows to >1.5GB

Posted by GitBox <gi...@apache.org>.
merlimat commented on code in PR #16490:
URL: https://github.com/apache/pulsar/pull/16490#discussion_r917275497


##########
pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/SegmentedLongArray.java:
##########
@@ -0,0 +1,121 @@
+    public void increaseCapacity() {
+        if (capacity < MAX_SEGMENT_SIZE) {

Review Comment:
   Ah, that's true! I need to fix the constructor
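
One possible shape for such a constructor fix, as a sketch only: the helper below plans the segment sizes so that a segment can be shorter than the maximum only when it is the sole segment. The class `SegmentPlan`, the method `planSegmentSizes`, and the `maxSegmentSize` parameter are hypothetical, and this is not necessarily the change that was eventually pushed to the PR.

```java
import java.util.ArrayList;
import java.util.List;

public class SegmentPlan {
    // Returns the number of longs to allocate for each segment.
    public static List<Integer> planSegmentSizes(long initialCapacity, int maxSegmentSize) {
        if (initialCapacity <= 0 || maxSegmentSize <= 0) {
            throw new IllegalArgumentException("capacity and segment size must be positive");
        }
        List<Integer> sizes = new ArrayList<>();
        long remaining = initialCapacity;

        // First segment: smaller than a full segment only if it is the sole segment.
        int first = (int) Math.min(remaining, maxSegmentSize);
        sizes.add(first);
        remaining -= first;

        // Every further segment is allocated at full size, which rounds the total capacity up.
        while (remaining > 0) {
            sizes.add(maxSegmentSize);
            remaining -= maxSegmentSize;
        }
        return sizes;
    }
}
```

The trade-off is that the resulting capacity can be larger than the requested one: a request for 10 longs with a segment size of 8 yields two full segments, or 16 longs.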





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16490: Fixed error when delayed messages trackers state grows to >1.5GB

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16490:
URL: https://github.com/apache/pulsar/pull/16490#discussion_r917252418


##########
pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/SegmentedLongArray.java:
##########
@@ -0,0 +1,121 @@
+    public void increaseCapacity() {
+        if (capacity < MAX_SEGMENT_SIZE) {

Review Comment:
   A last segment smaller than `MAX_SEGMENT_SIZE` will also affect the `writeLong` and `readLong` methods, because we always use `MAX_SEGMENT_SIZE` to calculate the index.
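
A worked example of the concern, using small stand-in numbers: assume a segment size of 8, a constructor that created segments of 8 and 2 longs, and a later capacity increase that appended a full segment of 8. The class `ShortSegmentIndexing` and its `locate` helper are hypothetical and only reproduce the division/modulo mapping used by `readLong` and `writeLong`.

```java
public class ShortSegmentIndexing {
    // Maps a global offset to {segment index, index within segment},
    // assuming every segment before the target holds segmentSize longs.
    public static int[] locate(long offset, int segmentSize) {
        return new int[] {(int) (offset / segmentSize), (int) (offset % segmentSize)};
    }

    public static void main(String[] args) {
        int segmentSize = 8;           // stand-in for MAX_SEGMENT_SIZE
        int[] actualSizes = {8, 2, 8}; // the middle segment was created short
        long offset = 10;              // 11th element overall

        int[] pos = locate(offset, segmentSize);
        // The mapping selects segment 1 at index 2, but that segment has only 2 slots.
        boolean inBounds = pos[1] < actualSizes[pos[0]];
        System.out.println("segment=" + pos[0] + " index=" + pos[1] + " inBounds=" + inBounds);
    }
}
```

Once a short segment is no longer the last one, every offset past it resolves to the wrong position, which is why the mapping needs all segments before the last to be full-size.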



##########
pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/SegmentedLongArray.java:
##########
@@ -0,0 +1,121 @@
+    public void increaseCapacity() {
+        if (capacity < MAX_SEGMENT_SIZE) {

Review Comment:
   The last segment might also be smaller than `MAX_SEGMENT_SIZE`. Do we need to consider updating the capacity of that buffer?





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16490: Fixed error when delayed messages trackers state grows to >1.5GB

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16490:
URL: https://github.com/apache/pulsar/pull/16490#discussion_r917274078


##########
pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/SegmentedLongArray.java:
##########
@@ -0,0 +1,121 @@
+    public void increaseCapacity() {
+        if (capacity < MAX_SEGMENT_SIZE) {

Review Comment:
   I think it is possible to add a buffer here that is not `MAX_SEGMENT_SIZE` in size? https://github.com/apache/pulsar/pull/16490/files#diff-ee5391a05253205e8b9bb0f52faa7397326ec0687513204b3bf1b5244d2b8504R45-R50





[GitHub] [pulsar] merlimat merged pull request #16490: [fix][broker] Fixed error when delayed messages trackers state grows to >1.5GB

Posted by GitBox <gi...@apache.org>.
merlimat merged PR #16490:
URL: https://github.com/apache/pulsar/pull/16490

