Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/07 00:48:02 UTC

[GitHub] [pulsar] merlimat opened a new pull request, #15955: PIP-174: New managed ledger entry cache implementation

merlimat opened a new pull request, #15955:
URL: https://github.com/apache/pulsar/pull/15955

   ### Motivation
   
   PIP-174: #15954
   
   Provide a new `SharedEntryCacheManagerImpl` implementation.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] merlimat commented on a diff in pull request #15955: PIP-174: New managed ledger entry cache implementation

Posted by GitBox <gi...@apache.org>.
merlimat commented on code in PR #15955:
URL: https://github.com/apache/pulsar/pull/15955#discussion_r903958286


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedCacheSegment.java:
##########
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import io.netty.buffer.ByteBuf;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
+
+class SharedCacheSegment implements AutoCloseable {
+
+    private final ByteBuf cacheBuffer;
+    private final AtomicInteger currentOffset = new AtomicInteger();
+    private final ConcurrentLongLongPairHashMap index;
+    private final int segmentSize;
+
+    private static final int ALIGN_64_MASK = ~(64 - 1);
+
+    SharedCacheSegment(int segmentSize) {
+        this.segmentSize = segmentSize;
+        this.cacheBuffer = PulsarByteBufAllocator.DEFAULT.buffer(segmentSize, segmentSize);
+        this.index = ConcurrentLongLongPairHashMap.newBuilder()
+                // We are going to often clear() the map, with the expectation that it's going to get filled again
+                // immediately after. In these conditions it does not make sense to shrink it each time.
+                .autoShrink(false)
+                .concurrencyLevel(Runtime.getRuntime().availableProcessors() * 2)
+                .build();
+    }
+
+    boolean insert(long ledgerId, long entryId, ByteBuf entry) {
+        int entrySize = entry.readableBytes();
+        int alignedSize = align64(entrySize);
+        int offset = currentOffset.getAndAdd(alignedSize);
+
+        if (offset + entrySize > segmentSize) {
+            // The segment is full
+            return false;
+        } else {
+            // Copy entry into read cache segment
+            cacheBuffer.setBytes(offset, entry, entry.readerIndex(), entry.readableBytes());
+            long value = offset << 32 | entrySize;
+            index.put(ledgerId, entryId, value, 0);
+            return true;
+        }
+    }
+
+    public ByteBuf get(long ledgerId, long entryId) {
+        long value = index.getFirstValue(ledgerId, entryId);
+        if (value >= 0) {
+            int offset = (int) (value >> 32);
+            int entryLen = (int) value;
+
+            ByteBuf entry = PulsarByteBufAllocator.DEFAULT.buffer(entryLen, entryLen);

Review Comment:
   Yes, we could return a "retained slice" (a `ByteBuf` that increments the ref-count and points to a portion of the original buffer) and avoid the copy on the read path.
   The problem is that this buffer could stay alive for an indefinite amount of time if some consumer connections are slow. We would be retaining a whole 1 GB segment buffer even if only one small message is pending on a TCP connection, and we could not simply overwrite the old cache segment when rotating, because a reader could still be using it.
   
   Since we already have a flag to control whether entries are copied into the cache, another approach I was thinking of is to keep maps of the original `ByteBuf` instances (so that we also eliminate the copy on insertion into the cache).
   Eviction would still be based on rotating the segments, where each segment has its own hash map.
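
The trade-off described in this comment can be sketched without Netty. The following is only a minimal model under stated assumptions, not code from the PR: `RefCountedSegment` and all of its methods are hypothetical names, and `java.nio.ByteBuffer` stands in for the segment's `ByteBuf`. It shows that a zero-copy view keeps the whole segment from being recycled until the reader releases it, while a copying read leaves the segment free to be rotated.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a ref-counted cache segment (not the Pulsar or Netty API).
final class RefCountedSegment {
    private final ByteBuffer memory;
    // Starts at 1: the reference held by the cache itself.
    private final AtomicInteger refCount = new AtomicInteger(1);

    RefCountedSegment(int size) {
        this.memory = ByteBuffer.allocate(size);
    }

    void write(int offset, byte[] data) {
        for (int i = 0; i < data.length; i++) {
            memory.put(offset + i, data[i]);
        }
    }

    // Zero-copy read: returns a view over the segment memory and bumps the ref-count.
    ByteBuffer retainedView(int offset, int length) {
        refCount.incrementAndGet();
        ByteBuffer view = memory.duplicate();
        view.position(offset);
        view.limit(offset + length);
        return view.slice();
    }

    // Copying read: the returned bytes are independent of the segment.
    byte[] copy(int offset, int length) {
        byte[] out = new byte[length];
        for (int i = 0; i < length; i++) {
            out[i] = memory.get(offset + i);
        }
        return out;
    }

    // Called by a reader once it is done with a retained view.
    void release() {
        refCount.decrementAndGet();
    }

    // The segment may be overwritten only when the cache holds the sole reference.
    boolean canRecycle() {
        return refCount.get() == 1;
    }
}
```

In this model a single slow reader holding a view of a few bytes makes `canRecycle()` return false for the entire segment, which is the retention problem the comment points out.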





[GitHub] [pulsar] mattisonchao commented on pull request #15955: PIP-174: New managed ledger entry cache implementation

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #15955:
URL: https://github.com/apache/pulsar/pull/15955#issuecomment-1204921351

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on a diff in pull request #15955: PIP-174: New managed ledger entry cache implementation

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #15955:
URL: https://github.com/apache/pulsar/pull/15955#discussion_r937320243


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedCacheSegmentBufferRefCount.java:
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.IllegalReferenceCountException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pulsar.common.util.collections.ConcurrentLongPairObjectHashMap;
+
+class SharedCacheSegmentBufferRefCount implements SharedCacheSegment {
+
+    private final AtomicInteger currentSize = new AtomicInteger();
+    private final ConcurrentLongPairObjectHashMap<ByteBuf> index;
+    private final int segmentSize;
+
+    SharedCacheSegmentBufferRefCount(int segmentSize) {
+        this.segmentSize = segmentSize;
+        this.index = ConcurrentLongPairObjectHashMap.<ByteBuf>newBuilder()
+                // We are going to often clear() the map, with the expectation that it's going to get filled again
+                // immediately after. In these conditions it does not make sense to shrink it each time.
+                .autoShrink(false)
+                .concurrencyLevel(Runtime.getRuntime().availableProcessors() * 2)
+                .build();
+    }
+
+    @Override
+    public boolean insert(long ledgerId, long entryId, ByteBuf entry) {
+        int newSize = currentSize.addAndGet(entry.readableBytes());
+
+        if (newSize > segmentSize) {
+            // The segment is full
+            return false;
+        } else {
+            // Insert entry into read cache segment
+            ByteBuf oldValue = index.putIfAbsent(ledgerId, entryId, entry.retain());

Review Comment:
   This `ByteBuf` is at risk of being modified by other operations.





[GitHub] [pulsar] github-actions[bot] commented on pull request #15955: PIP-174: New managed ledger entry cache implementation

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15955:
URL: https://github.com/apache/pulsar/pull/15955#issuecomment-1148077237

   @merlimat: Thanks for your contribution. For this PR, do we need to update docs?
   (The [PR template contains info about doc](https://github.com/apache/pulsar/blob/master/.github/PULL_REQUEST_TEMPLATE.md#documentation), which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)




[GitHub] [pulsar] asafm commented on a diff in pull request #15955: PIP-174: New managed ledger entry cache implementation

Posted by GitBox <gi...@apache.org>.
asafm commented on code in PR #15955:
URL: https://github.com/apache/pulsar/pull/15955#discussion_r901138209


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedCacheSegment.java:
##########
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import io.netty.buffer.ByteBuf;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
+
+class SharedCacheSegment implements AutoCloseable {
+
+    private final ByteBuf cacheBuffer;
+    private final AtomicInteger currentOffset = new AtomicInteger();
+    private final ConcurrentLongLongPairHashMap index;
+    private final int segmentSize;
+
+    private static final int ALIGN_64_MASK = ~(64 - 1);
+
+    SharedCacheSegment(int segmentSize) {
+        this.segmentSize = segmentSize;
+        this.cacheBuffer = PulsarByteBufAllocator.DEFAULT.buffer(segmentSize, segmentSize);
+        this.index = ConcurrentLongLongPairHashMap.newBuilder()
+                // We are going to often clear() the map, with the expectation that it's going to get filled again
+                // immediately after. In these conditions it does not make sense to shrink it each time.
+                .autoShrink(false)
+                .concurrencyLevel(Runtime.getRuntime().availableProcessors() * 2)
+                .build();
+    }
+
+    boolean insert(long ledgerId, long entryId, ByteBuf entry) {
+        int entrySize = entry.readableBytes();
+        int alignedSize = align64(entrySize);
+        int offset = currentOffset.getAndAdd(alignedSize);
+
+        if (offset + entrySize > segmentSize) {
+            // The segment is full
+            return false;
+        } else {
+            // Copy entry into read cache segment
+            cacheBuffer.setBytes(offset, entry, entry.readerIndex(), entry.readableBytes());
+            long value = offset << 32 | entrySize;
+            index.put(ledgerId, entryId, value, 0);
+            return true;
+        }
+    }
+
+    public ByteBuf get(long ledgerId, long entryId) {
+        long value = index.getFirstValue(ledgerId, entryId);
+        if (value >= 0) {
+            int offset = (int) (value >> 32);
+            int entryLen = (int) value;
+
+            ByteBuf entry = PulsarByteBufAllocator.DEFAULT.buffer(entryLen, entryLen);

Review Comment:
   Since it's a draft PR, I'm writing down an idea that crossed my mind.
   On each get, we pay the penalty of creating a `ByteBuf` (both a heap object and a direct memory allocation) and then copying into it.
   What if we returned a `ByteBuf` that is a linked view of the original `ByteBuf`?
   It stays valid as long as we don't call `clear()`.
   Perhaps we can maintain an ever-increasing version number and increment it on each clear.
   We can return a `CachedByteBuf`, which has a link to the cache and the version it was cut from. If the cache's version has grown since then, the view is invalidated and can't be used anymore.
   `CachedByteBuf` instances could also be pooled if needed, since each is just a long and a `ByteBuf`.
   Just an idea.
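
To make the version-number idea concrete, here is a rough sketch under stated assumptions. None of these names exist in Pulsar: `VersionedSegment` and `CachedView` are hypothetical, `CachedView` plays the role of the proposed `CachedByteBuf`, and `java.nio.ByteBuffer` stands in for the segment buffer. A view remembers the version it was cut from and refuses to return data once the segment has been cleared.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical segment that stamps every view with the version it was cut from.
final class VersionedSegment {
    private final ByteBuffer memory;
    private final AtomicLong version = new AtomicLong();

    VersionedSegment(int size) {
        this.memory = ByteBuffer.allocate(size);
    }

    void write(int offset, byte[] data) {
        for (int i = 0; i < data.length; i++) {
            memory.put(offset + i, data[i]);
        }
    }

    // Bumping the version invalidates every view handed out so far.
    void clear() {
        version.incrementAndGet();
    }

    long currentVersion() {
        return version.get();
    }

    byte byteAt(int index) {
        return memory.get(index);
    }

    CachedView view(int offset, int length) {
        return new CachedView(this, version.get(), offset, length);
    }
}

// Hypothetical counterpart of the proposed CachedByteBuf: a version plus a window.
final class CachedView {
    private final VersionedSegment segment;
    private final long version;
    private final int offset;
    private final int length;

    CachedView(VersionedSegment segment, long version, int offset, int length) {
        this.segment = segment;
        this.version = version;
        this.offset = offset;
        this.length = length;
    }

    boolean isValid() {
        return segment.currentVersion() == version;
    }

    byte get(int index) {
        if (index < 0 || index >= length) {
            throw new IndexOutOfBoundsException("index " + index);
        }
        byte value = segment.byteAt(offset + index);
        // Check after reading: the segment may have been cleared and reused meanwhile.
        if (!isValid()) {
            throw new IllegalStateException("view invalidated by clear()");
        }
        return value;
    }
}
```

One consequence visible in the sketch is that every consumer of a view has to validate it after reading, so the check cannot be hidden from code that treats the result as an ordinary buffer.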
   





[GitHub] [pulsar] merlimat commented on a diff in pull request #15955: PIP-174: New managed ledger entry cache implementation

Posted by GitBox <gi...@apache.org>.
merlimat commented on code in PR #15955:
URL: https://github.com/apache/pulsar/pull/15955#discussion_r903997873


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedCacheSegment.java:
##########
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import io.netty.buffer.ByteBuf;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
+
+class SharedCacheSegment implements AutoCloseable {
+
+    private final ByteBuf cacheBuffer;
+    private final AtomicInteger currentOffset = new AtomicInteger();
+    private final ConcurrentLongLongPairHashMap index;
+    private final int segmentSize;
+
+    private static final int ALIGN_64_MASK = ~(64 - 1);
+
+    SharedCacheSegment(int segmentSize) {
+        this.segmentSize = segmentSize;
+        this.cacheBuffer = PulsarByteBufAllocator.DEFAULT.buffer(segmentSize, segmentSize);
+        this.index = ConcurrentLongLongPairHashMap.newBuilder()
+                // We are going to often clear() the map, with the expectation that it's going to get filled again
+                // immediately after. In these conditions it does not make sense to shrink it each time.
+                .autoShrink(false)
+                .concurrencyLevel(Runtime.getRuntime().availableProcessors() * 2)
+                .build();
+    }
+
+    boolean insert(long ledgerId, long entryId, ByteBuf entry) {
+        int entrySize = entry.readableBytes();
+        int alignedSize = align64(entrySize);
+        int offset = currentOffset.getAndAdd(alignedSize);
+
+        if (offset + entrySize > segmentSize) {
+            // The segment is full
+            return false;
+        } else {
+            // Copy entry into read cache segment
+            cacheBuffer.setBytes(offset, entry, entry.readerIndex(), entry.readableBytes());
+            long value = offset << 32 | entrySize;
+            index.put(ledgerId, entryId, value, 0);
+            return true;
+        }
+    }
+
+    public ByteBuf get(long ledgerId, long entryId) {
+        long value = index.getFirstValue(ledgerId, entryId);
+        if (value >= 0) {
+            int offset = (int) (value >> 32);
+            int entryLen = (int) value;
+
+            ByteBuf entry = PulsarByteBufAllocator.DEFAULT.buffer(entryLen, entryLen);

Review Comment:
   Yes, just incrementing the ref-count. It's similar to what we are currently doing, though without the overly-complex logic for cache eviction.





[GitHub] [pulsar] mattisonchao commented on pull request #15955: PIP-174: New managed ledger entry cache implementation

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #15955:
URL: https://github.com/apache/pulsar/pull/15955#issuecomment-1204877704

   The test `testPulsarSinkDLQ` is flaky. It works fine in my local environment, so we can try re-running `broker group2`.
   




Re: [PR] PIP-174: New managed ledger entry cache implementation [pulsar]

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on code in PR #15955:
URL: https://github.com/apache/pulsar/pull/15955#discussion_r1553034084


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedEntryCacheImpl.java:
##########
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.BKException;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
+import org.apache.commons.lang3.tuple.Pair;
+
+@Slf4j
+class SharedEntryCacheImpl implements EntryCache {
+
+    private final SharedEntryCacheManagerImpl entryCacheManager;
+    private final ManagedLedgerImpl ml;
+    private final ManagedLedgerInterceptor interceptor;
+
+    SharedEntryCacheImpl(ManagedLedgerImpl ml, SharedEntryCacheManagerImpl entryCacheManager) {
+        this.ml = ml;
+        this.entryCacheManager = entryCacheManager;
+        this.interceptor = ml.getManagedLedgerInterceptor();
+    }
+
+    @Override
+    public String getName() {
+        return ml.getName();
+    }
+
+    @Override
+    public boolean insert(EntryImpl entry) {
+        return entryCacheManager.insert(entry);
+    }
+
+    @Override
+    public void invalidateEntries(PositionImpl lastPosition) {
+        // No-Op. The cache invalidation is based only on rotating the segment buffers
+    }
+
+    @Override
+    public void invalidateEntriesBeforeTimestamp(long timestamp) {
+        // No-Op. The cache invalidation is based only on rotating the segment buffers
+    }
+
+    @Override
+    public void invalidateAllEntries(long ledgerId) {
+        // No-Op. The cache invalidation is based only on rotating the segment buffers
+    }
+
+    @Override
+    public void clear() {
+        // No-Op. The cache invalidation is based only on rotating the segment buffers
+    }
+
+    private static final Pair<Integer, Long> NO_EVICTION = Pair.of(0, 0L);
+
+    @Override
+    public Pair<Integer, Long> evictEntries(long sizeToFree) {
+        return NO_EVICTION;
+    }
+
+    @Override
+    public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
+                               AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
+        final long ledgerId = lh.getId();
+        final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Reading entries range ledger {}: {} to {}", ml.getName(), ledgerId, firstEntry, lastEntry);
+        }
+
+        List<Entry> cachedEntries = new ArrayList<>(entriesToRead);
+        long totalCachedSize = entryCacheManager.getRange(ledgerId, firstEntry, lastEntry, cachedEntries);
+
+        if (cachedEntries.size() == entriesToRead) {
+            final List<Entry> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead);
+            // All entries found in cache
+            for (Entry entry : cachedEntries) {
+                entriesToReturn.add(EntryImpl.create((EntryImpl) entry));
+                entry.release();
+            }
+            // All entries found in cache
+            entryCacheManager.getFactoryMBean().recordCacheHits(entriesToReturn.size(), totalCachedSize);
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Ledger {} -- Found in cache entries: {}-{}", ml.getName(), ledgerId, firstEntry,
+                        lastEntry);
+            }
+            callback.readEntriesComplete(entriesToReturn, ctx);
+
+        } else {
+            if (!cachedEntries.isEmpty()) {
+                cachedEntries.forEach(entry -> entry.release());
+            }
+
+            // Read all the entries from bookkeeper
+            lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
+                    ledgerEntries -> {
+                        requireNonNull(ml.getName());
+                        requireNonNull(ml.getExecutor());
+

Review Comment:
   I guess `PendingReadsManager` should be adapted and used here? It was introduced by #17241 and resulted in huge improvements.





[GitHub] [pulsar] Jason918 commented on a diff in pull request #15955: PIP-174: New managed ledger entry cache implementation

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #15955:
URL: https://github.com/apache/pulsar/pull/15955#discussion_r937552499


##########
pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongLongPairHashMap.java:
##########
@@ -204,6 +204,12 @@ public LongPair get(long key1, long key2) {
         return getSection(h).get(key1, key2, (int) h);
     }
 
+    public long getFirstValue(long key1, long key2) {

Review Comment:
   Better to add some doc here. The method name is a bit confusing.
   ```suggestion
       /**
        * @return get(key1, key2).first;
        */
       public long getFirstValue(long key1, long key2) {
   ```



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedCacheSegmentBufferRefCount.java:
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.IllegalReferenceCountException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pulsar.common.util.collections.ConcurrentLongPairObjectHashMap;
+
+class SharedCacheSegmentBufferRefCount implements SharedCacheSegment {
+
+    private final AtomicInteger currentSize = new AtomicInteger();
+    private final ConcurrentLongPairObjectHashMap<ByteBuf> index;
+    private final int segmentSize;
+
+    SharedCacheSegmentBufferRefCount(int segmentSize) {
+        this.segmentSize = segmentSize;
+        this.index = ConcurrentLongPairObjectHashMap.<ByteBuf>newBuilder()
+                // We are going to often clear() the map, with the expectation that it's going to get filled again
+                // immediately after. In these conditions it does not make sense to shrink it each time.
+                .autoShrink(false)
+                .concurrencyLevel(Runtime.getRuntime().availableProcessors() * 2)
+                .build();
+    }
+
+    @Override
+    public boolean insert(long ledgerId, long entryId, ByteBuf entry) {
+        int newSize = currentSize.addAndGet(entry.readableBytes());
+
+        if (newSize > segmentSize) {
+            // The segment is full

Review Comment:
   We should reduce `currentSize` by `entry.readableBytes()` here, since this insertion failed.
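
A minimal sketch of the suggested fix, isolated from the rest of the segment class. `SegmentSizeTracker` and `tryReserve` are hypothetical names used only for illustration; the accounting mirrors the `addAndGet` call in the hunk above and adds the rollback on the rejected path.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical size accounting for one cache segment.
final class SegmentSizeTracker {
    private final AtomicInteger currentSize = new AtomicInteger();
    private final int segmentSize;

    SegmentSizeTracker(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    // Returns true if entrySize bytes were reserved, false if the segment is full.
    boolean tryReserve(int entrySize) {
        int newSize = currentSize.addAndGet(entrySize);
        if (newSize > segmentSize) {
            // Undo the reservation so a rejected entry does not inflate the size.
            currentSize.addAndGet(-entrySize);
            return false;
        }
        return true;
    }

    int size() {
        return currentSize.get();
    }
}
```

Without the rollback, each rejected insertion would keep adding to `currentSize`, so the reported size could drift above `segmentSize` and a smaller entry that still fits would be rejected as well.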



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedEntryCacheManagerImpl.java:
##########
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.StampedLock;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryMBeanImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+
+@Slf4j
+public class SharedEntryCacheManagerImpl implements EntryCacheManager {
+
+    private final ManagedLedgerFactoryConfig config;
+    private final ManagedLedgerFactoryMBeanImpl factoryMBean;
+    private final List<SharedCacheSegment> segments = new ArrayList<>();
+    private int currentSegmentIdx = 0;
+    private final int segmentSize;
+    private final int segmentsCount;
+
+    private final StampedLock lock = new StampedLock();
+
+    private static final int DEFAULT_MAX_SEGMENT_SIZE = 1 * 1024 * 1024 * 1024;
+
+    public SharedEntryCacheManagerImpl(ManagedLedgerFactoryImpl factory) {
+        this.config = factory.getConfig();
+        this.factoryMBean = factory.getMbean();
+        long maxCacheSize = config.getMaxCacheSize();
+        if (maxCacheSize > 0) {
+            this.segmentsCount = Math.max(2, (int) (maxCacheSize / DEFAULT_MAX_SEGMENT_SIZE));
+            this.segmentSize = (int) (maxCacheSize / segmentsCount);
+
+            for (int i = 0; i < segmentsCount; i++) {
+                if (config.isCopyEntriesInCache()) {
+                    segments.add(new SharedCacheSegmentBufferCopy(segmentSize));
+                } else {
+                    segments.add(new SharedCacheSegmentBufferRefCount(segmentSize));
+                }
+            }
+        } else {
+            this.segmentsCount = 0;
+            this.segmentSize = 0;
+        }
+    }
+
+    ManagedLedgerFactoryMBeanImpl getFactoryMBean() {
+        return factoryMBean;
+    }
+
+    @Override
+    public EntryCache getEntryCache(ManagedLedgerImpl ml) {
+        if (getMaxSize() > 0) {
+            return new SharedEntryCacheImpl(ml, this);
+        } else {
+            return new EntryCacheDisabled(ml);
+        }
+    }
+
+    @Override
+    public void removeEntryCache(String name) {
+        // no-op
+    }
+
+    @Override
+    public long getSize() {
+        long totalSize = 0;
+        for (int i = 0; i < segmentsCount; i++) {
+            totalSize += segments.get(i).getSize();
+        }
+        return totalSize;
+    }
+
+    @Override
+    public long getMaxSize() {
+        return config.getMaxCacheSize();
+    }
+
+    @Override
+    public void clear() {
+        segments.forEach(SharedCacheSegment::clear);
+    }
+
+    @Override
+    public void close() {
+        segments.forEach(SharedCacheSegment::close);
+    }
+
+    @Override
+    public void updateCacheSizeAndThreshold(long maxSize) {
+

Review Comment:
   This method should be supported when a user updates `managedLedgerCacheSizeMB`. We can add an error log here to let the user know about this.
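
One way to follow this suggestion, sketched under assumptions: the cache keeps the size it was built with and logs an error when a resize is requested. `FixedSizeCacheManager` is a hypothetical stand-in for the class in the diff, and `java.util.logging` is used only to keep the sketch self-contained (the PR itself uses Lombok's `@Slf4j`).

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for a cache manager whose segments are sized once at construction.
class FixedSizeCacheManager {
    private static final Logger LOG = Logger.getLogger(FixedSizeCacheManager.class.getName());

    private final long maxCacheSize;

    FixedSizeCacheManager(long maxCacheSize) {
        this.maxCacheSize = maxCacheSize;
    }

    long getMaxSize() {
        return maxCacheSize;
    }

    // Resizing is not supported in this sketch: report the request instead of silently ignoring it.
    void updateCacheSizeAndThreshold(long maxSize) {
        if (maxSize != maxCacheSize) {
            LOG.log(Level.SEVERE,
                    "Dynamic cache resize from {0} to {1} bytes is not supported; keeping the current size",
                    new Object[] {maxCacheSize, maxSize});
        }
    }

    public static void main(String[] args) {
        FixedSizeCacheManager manager = new FixedSizeCacheManager(1024);
        manager.updateCacheSizeAndThreshold(2048); // logs an error, size is unchanged
        System.out.println(manager.getMaxSize());
    }
}
```

Logging at error level makes the unsupported update visible to operators, while the configured size stays the single source of truth for the segments that were already allocated.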



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedCacheSegmentBufferCopy.java:
##########
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import io.netty.buffer.ByteBuf;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
+
+class SharedCacheSegmentBufferCopy implements AutoCloseable, SharedCacheSegment {
+
+    private final ByteBuf cacheBuffer;
+    private final AtomicInteger currentOffset = new AtomicInteger();
+    private final ConcurrentLongLongPairHashMap index;
+    private final int segmentSize;
+
+    private static final int ALIGN_64_MASK = ~(64 - 1);
+
+    SharedCacheSegmentBufferCopy(int segmentSize) {
+        this.segmentSize = segmentSize;
+        this.cacheBuffer = PulsarByteBufAllocator.DEFAULT.buffer(segmentSize, segmentSize);
+        this.cacheBuffer.writerIndex(segmentSize - 1);
+        this.index = ConcurrentLongLongPairHashMap.newBuilder()
+                // We are going to often clear() the map, with the expectation that it's going to get filled again
+                // immediately after. In these conditions it does not make sense to shrink it each time.
+                .autoShrink(false)
+                .concurrencyLevel(Runtime.getRuntime().availableProcessors() * 8)
+                .build();
+    }
+
+    @Override
+    public boolean insert(long ledgerId, long entryId, ByteBuf entry) {
+        int entrySize = entry.readableBytes();
+        int alignedSize = align64(entrySize);
+        int offset = currentOffset.getAndAdd(alignedSize);
+
+        if (offset + entrySize > segmentSize) {
+            // The segment is full

Review Comment:
   Reset `currentOffset` back to `offset`?
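
The concern appears to be that a failed `insert` has already advanced `currentOffset`, so the counter keeps growing past `segmentSize`. Below is a minimal sketch of one alternative, assuming a compare-and-set loop that advances the offset only when the entry fits, so there is nothing to reset afterwards. `BumpOffsetAllocator`, `reserve` and `used` are hypothetical names, and `align64` is an assumption about how sizes are rounded up to 64 bytes.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical bump allocator over a fixed-size segment.
class BumpOffsetAllocator {
    private static final int ALIGN_64_MASK = ~(64 - 1);

    private final AtomicInteger currentOffset = new AtomicInteger();
    private final int segmentSize;

    BumpOffsetAllocator(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    // Round up to the next multiple of 64 (assumed alignment rule).
    static int align64(int size) {
        return (size + 64 - 1) & ALIGN_64_MASK;
    }

    // Returns the start offset reserved for the entry, or -1 if the segment is full.
    // The offset is advanced only on success, so a failed call leaves it untouched.
    int reserve(int entrySize) {
        int alignedSize = align64(entrySize);
        while (true) {
            int offset = currentOffset.get();
            if ((long) offset + entrySize > segmentSize) {
                return -1;
            }
            if (currentOffset.compareAndSet(offset, offset + alignedSize)) {
                return offset;
            }
            // Another thread moved the offset in the meantime; retry with the new value.
        }
    }

    int used() {
        return currentOffset.get();
    }

    public static void main(String[] args) {
        BumpOffsetAllocator allocator = new BumpOffsetAllocator(128);
        System.out.println(allocator.reserve(60));
        System.out.println(allocator.reserve(60));
        System.out.println(allocator.reserve(10));
    }
}
```

Compared with calling `getAndAdd` and then writing the old value back, the loop avoids a window in which another thread could observe or overwrite the inflated offset. The trade-off is a possible retry under contention.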



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedCacheSegmentBufferRefCount.java:
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.IllegalReferenceCountException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pulsar.common.util.collections.ConcurrentLongPairObjectHashMap;
+
+class SharedCacheSegmentBufferRefCount implements SharedCacheSegment {
+
+    private final AtomicInteger currentSize = new AtomicInteger();
+    private final ConcurrentLongPairObjectHashMap<ByteBuf> index;
+    private final int segmentSize;
+
+    SharedCacheSegmentBufferRefCount(int segmentSize) {
+        this.segmentSize = segmentSize;
+        this.index = ConcurrentLongPairObjectHashMap.<ByteBuf>newBuilder()
+                // We are going to often clear() the map, with the expectation that it's going to get filled again
+                // immediately after. In these conditions it does not make sense to shrink it each time.
+                .autoShrink(false)
+                .concurrencyLevel(Runtime.getRuntime().availableProcessors() * 2)
+                .build();
+    }
+
+    @Override
+    public boolean insert(long ledgerId, long entryId, ByteBuf entry) {
+        int newSize = currentSize.addAndGet(entry.readableBytes());
+
+        if (newSize > segmentSize) {
+            // The segment is full
+            return false;
+        } else {
+            // Insert entry into read cache segment
+            ByteBuf oldValue = index.putIfAbsent(ledgerId, entryId, entry.retain());
+            if (oldValue != null) {
+                entry.release();
+                return false;
+            } else {
+                return true;
+            }
+        }
+    }
+
+    @Override
+    public ByteBuf get(long ledgerId, long entryId) {
+        ByteBuf entry = index.get(ledgerId, entryId);
+        if (entry != null) {
+            try {
+                return entry.retain();
+            } catch (IllegalReferenceCountException e) {
+                // Entry was removed between the get() and the retain() calls
+                return null;
+            }
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public int getSize() {
+        return currentSize.get();
+    }
+
+    @Override
+    public void close() {
+        clear();
+    }
+
+    @Override
+    public void clear() {
+        index.forEach((ledgerId, entryId, e) -> e.release());
+        index.clear();

Review Comment:
   Reset `currentSize` to 0.
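
A sketch of the suggested fix, using plain byte arrays instead of Netty `ByteBuf`s so that it stays self-contained: `clear()` empties the index and also resets the size counter, since otherwise a recycled segment would keep reporting itself as full. `SizeTrackingSegment` is a hypothetical name. Giving the reserved bytes back on a failed insert is an extra assumption of this sketch, not something stated in the comment, and the sketch assumes `clear()` does not run concurrently with `insert()`.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of a size-tracking cache segment.
class SizeTrackingSegment {
    private final AtomicInteger currentSize = new AtomicInteger();
    // Key is the (ledgerId, entryId) pair; List<Long> gives value-based equals/hashCode.
    private final ConcurrentHashMap<List<Long>, byte[]> index = new ConcurrentHashMap<>();
    private final int segmentSize;

    SizeTrackingSegment(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    boolean insert(long ledgerId, long entryId, byte[] entry) {
        int newSize = currentSize.addAndGet(entry.length);
        if (newSize > segmentSize
                || index.putIfAbsent(List.of(ledgerId, entryId), entry) != null) {
            // Segment full or entry already cached: give the reserved bytes back.
            currentSize.addAndGet(-entry.length);
            return false;
        }
        return true;
    }

    int getSize() {
        return currentSize.get();
    }

    void clear() {
        index.clear();
        // Without this reset the segment would still look full after being recycled.
        currentSize.set(0);
    }

    public static void main(String[] args) {
        SizeTrackingSegment segment = new SizeTrackingSegment(10);
        segment.insert(1, 1, new byte[6]);
        segment.clear();
        System.out.println(segment.getSize());
    }
}
```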



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedEntryCacheManagerImpl.java:
##########
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.StampedLock;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryMBeanImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+
+@Slf4j
+public class SharedEntryCacheManagerImpl implements EntryCacheManager {
+
+    private final ManagedLedgerFactoryConfig config;
+    private final ManagedLedgerFactoryMBeanImpl factoryMBean;
+    private final List<SharedCacheSegment> segments = new ArrayList<>();
+    private int currentSegmentIdx = 0;
+    private final int segmentSize;
+    private final int segmentsCount;
+
+    private final StampedLock lock = new StampedLock();
+
+    private static final int DEFAULT_MAX_SEGMENT_SIZE = 1 * 1024 * 1024 * 1024;
+
+    public SharedEntryCacheManagerImpl(ManagedLedgerFactoryImpl factory) {
+        this.config = factory.getConfig();
+        this.factoryMBean = factory.getMbean();
+        long maxCacheSize = config.getMaxCacheSize();
+        if (maxCacheSize > 0) {
+            this.segmentsCount = Math.max(2, (int) (maxCacheSize / DEFAULT_MAX_SEGMENT_SIZE));
+            this.segmentSize = (int) (maxCacheSize / segmentsCount);
+
+            for (int i = 0; i < segmentsCount; i++) {
+                if (config.isCopyEntriesInCache()) {
+                    segments.add(new SharedCacheSegmentBufferCopy(segmentSize));
+                } else {
+                    segments.add(new SharedCacheSegmentBufferRefCount(segmentSize));
+                }
+            }
+        } else {
+            this.segmentsCount = 0;
+            this.segmentSize = 0;
+        }
+    }
+
+    ManagedLedgerFactoryMBeanImpl getFactoryMBean() {
+        return factoryMBean;
+    }
+
+    @Override
+    public EntryCache getEntryCache(ManagedLedgerImpl ml) {
+        if (getMaxSize() > 0) {
+            return new SharedEntryCacheImpl(ml, this);
+        } else {
+            return new EntryCacheDisabled(ml);
+        }
+    }
+
+    @Override
+    public void removeEntryCache(String name) {
+        // no-op
+    }
+
+    @Override
+    public long getSize() {
+        long totalSize = 0;
+        for (int i = 0; i < segmentsCount; i++) {
+            totalSize += segments.get(i).getSize();
+        }
+        return totalSize;
+    }
+
+    @Override
+    public long getMaxSize() {
+        return config.getMaxCacheSize();
+    }
+
+    @Override
+    public void clear() {
+        segments.forEach(SharedCacheSegment::clear);
+    }
+
+    @Override
+    public void close() {
+        segments.forEach(SharedCacheSegment::close);
+    }
+
+    @Override
+    public void updateCacheSizeAndThreshold(long maxSize) {
+
+    }
+
+    @Override
+    public void updateCacheEvictionWatermark(double cacheEvictionWatermark) {
+        // No-Op. We don't use the cache eviction watermark in this implementation
+    }
+
+    @Override
+    public double getCacheEvictionWatermark() {
+        return config.getCacheEvictionWatermark();
+    }
+
+    boolean insert(EntryImpl entry) {
+        int entrySize = entry.getLength();
+
+        if (entrySize > segmentSize) {
+            log.debug("entrySize {} > segmentSize {}, skip update read cache!", entrySize, segmentSize);
+            return false;
+        }
+
+        long stamp = lock.readLock();
+        try {
+            SharedCacheSegment s = segments.get(currentSegmentIdx);
+
+            if (s.insert(entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer())) {
+                return true;
+            }
+        } finally {
+            lock.unlockRead(stamp);
+        }
+
+        // We could not insert in the segment; we need to get the write lock and roll over to
+        // next segment
+        stamp = lock.writeLock();
+
+        try {
+            SharedCacheSegment segment = segments.get(currentSegmentIdx);
+
+            if (segment.insert(entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer())) {
+                return true;
+            }
+
+            // Roll to next segment
+            currentSegmentIdx = (currentSegmentIdx + 1) % segmentsCount;
+            segment = segments.get(currentSegmentIdx);
+            segment.clear();
+            return segment.insert(entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer());
+        } finally {
+            lock.unlockWrite(stamp);
+        }
+    }
+
+    EntryImpl get(long ledgerId, long entryId) {
+        long stamp = lock.readLock();
+
+        try {
+            // We need to check all the segments, starting from the current one and looking
+            // backward to minimize the checks for recently inserted entries
+            for (int i = 0; i < segmentsCount; i++) {
+                int segmentIdx = (currentSegmentIdx + (segmentsCount - i)) % segmentsCount;
+
+                ByteBuf res = segments.get(segmentIdx).get(ledgerId, entryId);
+                if (res != null) {
+                    return EntryImpl.create(ledgerId, entryId, res);
+                }
+            }
+        } finally {
+            lock.unlockRead(stamp);
+        }
+
+        return null;
+    }
+
+    long getRange(long ledgerId, long firstEntryId, long lastEntryId, List<Entry> results) {
+        long totalSize = 0;
+        long stamp = lock.readLock();
+
+        try {
+            // We need to check all the segments, starting from the current one and looking
+            // backward to minimize the checks for recently inserted entries
+            long entryId = firstEntryId;
+            for (int i = 0; i < segmentsCount; i++) {
+                int segmentIdx = (currentSegmentIdx + (segmentsCount - i)) % segmentsCount;
+                SharedCacheSegment s = segments.get(segmentIdx);
+
+                for (; entryId <= lastEntryId; entryId++) {
+                    ByteBuf res = s.get(ledgerId, entryId);
+                    if (res != null) {
+                        results.add(EntryImpl.create(ledgerId, entryId, res));
+                        totalSize += res.readableBytes();
+                    } else {
+                        break;
+                    }

Review Comment:
   > Is it possible that subsequent entries are still cached in this segment? After we get null from this segment, we move on to the next segment.
   
   +1
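
The scenario raised here can be reproduced with a small model. The sketch below is a deliberate simplification: segments are plain maps keyed by entry id, `lookupBySegment` mimics the loop in the diff, and `lookupByEntry` is one possible alternative that searches every segment for each entry id. All names are hypothetical and none of this is taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class RangeLookupSketch {

    // Mimics the reviewed loop: walk one segment until the first miss, then move on
    // to the next segment and never come back.
    static List<String> lookupBySegment(List<Map<Long, String>> segments, long first, long last) {
        List<String> results = new ArrayList<>();
        long entryId = first;
        for (Map<Long, String> segment : segments) {
            for (; entryId <= last; entryId++) {
                String value = segment.get(entryId);
                if (value == null) {
                    break;
                }
                results.add(value);
            }
        }
        return results;
    }

    // Alternative: for each entry id, search all segments; stop at the first id
    // that no segment holds.
    static List<String> lookupByEntry(List<Map<Long, String>> segments, long first, long last) {
        List<String> results = new ArrayList<>();
        for (long entryId = first; entryId <= last; entryId++) {
            String value = null;
            for (Map<Long, String> segment : segments) {
                value = segment.get(entryId);
                if (value != null) {
                    break;
                }
            }
            if (value == null) {
                break;
            }
            results.add(value);
        }
        return results;
    }

    public static void main(String[] args) {
        // Entries 1 and 3 sit in the first segment, entry 2 in the second.
        List<Map<Long, String>> segments = List.of(
                Map.of(1L, "e1", 3L, "e3"),
                Map.of(2L, "e2"));
        System.out.println(lookupBySegment(segments, 1, 3));
        System.out.println(lookupByEntry(segments, 1, 3));
    }
}
```

With entries interleaved across segments, the per-segment walk stops short of entry 3 even though it is cached, while the per-entry search finds all three. The per-entry version costs up to one lookup per segment for every entry, which may or may not be acceptable on the read path.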





[GitHub] [pulsar] github-actions[bot] commented on pull request #15955: PIP-174: New managed ledger entry cache implementation

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15955:
URL: https://github.com/apache/pulsar/pull/15955#issuecomment-1250176780

   The pr had no activity for 30 days, mark with Stale label.




[GitHub] [pulsar] github-actions[bot] commented on pull request #15955: PIP-174: New managed ledger entry cache implementation

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15955:
URL: https://github.com/apache/pulsar/pull/15955#issuecomment-1193039388

   The pr had no activity for 30 days, mark with Stale label.




[GitHub] [pulsar] mattisonchao commented on a diff in pull request #15955: PIP-174: New managed ledger entry cache implementation

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #15955:
URL: https://github.com/apache/pulsar/pull/15955#discussion_r937446797


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedCacheSegmentBufferRefCount.java:
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.IllegalReferenceCountException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pulsar.common.util.collections.ConcurrentLongPairObjectHashMap;
+
+class SharedCacheSegmentBufferRefCount implements SharedCacheSegment {
+
+    private final AtomicInteger currentSize = new AtomicInteger();
+    private final ConcurrentLongPairObjectHashMap<ByteBuf> index;
+    private final int segmentSize;
+
+    SharedCacheSegmentBufferRefCount(int segmentSize) {
+        this.segmentSize = segmentSize;
+        this.index = ConcurrentLongPairObjectHashMap.<ByteBuf>newBuilder()
+                // We are going to often clear() the map, with the expectation that it's going to get filled again
+                // immediately after. In these conditions it does not make sense to shrink it each time.
+                .autoShrink(false)
+                .concurrencyLevel(Runtime.getRuntime().availableProcessors() * 2)
+                .build();
+    }
+
+    @Override
+    public boolean insert(long ledgerId, long entryId, ByteBuf entry) {
+        int newSize = currentSize.addAndGet(entry.readableBytes());
+
+        if (newSize > segmentSize) {
+            // The segment is full
+            return false;
+        } else {
+            // Insert entry into read cache segment
+            ByteBuf oldValue = index.putIfAbsent(ledgerId, entryId, entry.retain());

Review Comment:
   Fixed.





[GitHub] [pulsar] tisonkun commented on pull request #15955: PIP-174: New managed ledger entry cache implementation

Posted by GitBox <gi...@apache.org>.
tisonkun commented on PR #15955:
URL: https://github.com/apache/pulsar/pull/15955#issuecomment-1344394512

   @merlimat it seems the review is already done, but a few files have conflicts here.
   
   Could you rebase the patch onto master so that we can proceed with the PR?




[GitHub] [pulsar] github-actions[bot] commented on pull request #15955: PIP-174: New managed ledger entry cache implementation

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15955:
URL: https://github.com/apache/pulsar/pull/15955#issuecomment-1376622941

   The pr had no activity for 30 days, mark with Stale label.




[GitHub] [pulsar] Technoboy- commented on pull request #15955: PIP-174: New managed ledger entry cache implementation

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on PR #15955:
URL: https://github.com/apache/pulsar/pull/15955#issuecomment-1203835236

   Checkstyle failed @merlimat 




[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15955: PIP-174: New managed ledger entry cache implementation

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15955:
URL: https://github.com/apache/pulsar/pull/15955#discussion_r946798736


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedCacheSegmentBufferRefCount.java:
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.IllegalReferenceCountException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pulsar.common.util.collections.ConcurrentLongPairObjectHashMap;
+
+class SharedCacheSegmentBufferRefCount implements SharedCacheSegment {
+
+    private final AtomicInteger currentSize = new AtomicInteger();
+    private final ConcurrentLongPairObjectHashMap<ByteBuf> index;
+    private final int segmentSize;
+
+    SharedCacheSegmentBufferRefCount(int segmentSize) {
+        this.segmentSize = segmentSize;
+        this.index = ConcurrentLongPairObjectHashMap.<ByteBuf>newBuilder()
+                // We are going to often clear() the map, with the expectation that it's going to get filled again
+                // immediately after. In these conditions it does not make sense to shrink it each time.
+                .autoShrink(false)
+                .concurrencyLevel(Runtime.getRuntime().availableProcessors() * 2)
+                .build();
+    }
+
+    @Override
+    public boolean insert(long ledgerId, long entryId, ByteBuf entry) {
+        int newSize = currentSize.addAndGet(entry.readableBytes());
+
+        if (newSize > segmentSize) {
+            // The segment is full
+            return false;
+        } else {
+            // Insert entry into read cache segment
+            ByteBuf oldValue = index.putIfAbsent(ledgerId, entryId, entry.retain());
+            if (oldValue != null) {
+                entry.release();
+                return false;
+            } else {
+                return true;
+            }
+        }
+    }
+
+    @Override
+    public ByteBuf get(long ledgerId, long entryId) {
+        ByteBuf entry = index.get(ledgerId, entryId);
+        if (entry != null) {
+            try {
+                return entry.retain();
+            } catch (IllegalReferenceCountException e) {
+                // Entry was removed between the get() and the retain() calls
+                return null;
+            }
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public int getSize() {
+        return currentSize.get();
+    }
+
+    @Override
+    public void close() {
+        clear();
+    }
+
+    @Override
+    public void clear() {
+        index.forEach((ledgerId, entryId, e) -> e.release());
+        index.clear();

Review Comment:
   fixed



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedCacheSegmentBufferRefCount.java:
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.IllegalReferenceCountException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pulsar.common.util.collections.ConcurrentLongPairObjectHashMap;
+
+class SharedCacheSegmentBufferRefCount implements SharedCacheSegment {
+
+    private final AtomicInteger currentSize = new AtomicInteger();
+    private final ConcurrentLongPairObjectHashMap<ByteBuf> index;
+    private final int segmentSize;
+
+    SharedCacheSegmentBufferRefCount(int segmentSize) {
+        this.segmentSize = segmentSize;
+        this.index = ConcurrentLongPairObjectHashMap.<ByteBuf>newBuilder()
+                // We are going to often clear() the map, with the expectation that it's going to get filled again
+                // immediately after. In these conditions it does not make sense to shrink it each time.
+                .autoShrink(false)
+                .concurrencyLevel(Runtime.getRuntime().availableProcessors() * 2)
+                .build();
+    }
+
+    @Override
+    public boolean insert(long ledgerId, long entryId, ByteBuf entry) {
+        int newSize = currentSize.addAndGet(entry.readableBytes());
+
+        if (newSize > segmentSize) {
+            // The segment is full

Review Comment:
   fixed





[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15955: PIP-174: New managed ledger entry cache implementation

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15955:
URL: https://github.com/apache/pulsar/pull/15955#discussion_r946799085


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedCacheSegmentBufferCopy.java:
##########
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import io.netty.buffer.ByteBuf;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
+
+class SharedCacheSegmentBufferCopy implements AutoCloseable, SharedCacheSegment {
+
+    private final ByteBuf cacheBuffer;
+    private final AtomicInteger currentOffset = new AtomicInteger();
+    private final ConcurrentLongLongPairHashMap index;
+    private final int segmentSize;
+
+    private static final int ALIGN_64_MASK = ~(64 - 1);
+
+    SharedCacheSegmentBufferCopy(int segmentSize) {
+        this.segmentSize = segmentSize;
+        this.cacheBuffer = PulsarByteBufAllocator.DEFAULT.buffer(segmentSize, segmentSize);
+        this.cacheBuffer.writerIndex(segmentSize - 1);
+        this.index = ConcurrentLongLongPairHashMap.newBuilder()
+                // We are going to often clear() the map, with the expectation that it's going to get filled again
+                // immediately after. In these conditions it does not make sense to shrink it each time.
+                .autoShrink(false)
+                .concurrencyLevel(Runtime.getRuntime().availableProcessors() * 8)
+                .build();
+    }
+
+    @Override
+    public boolean insert(long ledgerId, long entryId, ByteBuf entry) {
+        int entrySize = entry.readableBytes();
+        int alignedSize = align64(entrySize);
+        int offset = currentOffset.getAndAdd(alignedSize);
+
+        if (offset + entrySize > segmentSize) {
+            // The segment is full

Review Comment:
   fixed





[GitHub] [pulsar] asafm commented on a diff in pull request #15955: PIP-174: New managed ledger entry cache implementation

Posted by GitBox <gi...@apache.org>.
asafm commented on code in PR #15955:
URL: https://github.com/apache/pulsar/pull/15955#discussion_r903996177


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedCacheSegment.java:
##########
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import io.netty.buffer.ByteBuf;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
+
+class SharedCacheSegment implements AutoCloseable {
+
+    private final ByteBuf cacheBuffer;
+    private final AtomicInteger currentOffset = new AtomicInteger();
+    private final ConcurrentLongLongPairHashMap index;
+    private final int segmentSize;
+
+    private static final int ALIGN_64_MASK = ~(64 - 1);
+
+    SharedCacheSegment(int segmentSize) {
+        this.segmentSize = segmentSize;
+        this.cacheBuffer = PulsarByteBufAllocator.DEFAULT.buffer(segmentSize, segmentSize);
+        this.index = ConcurrentLongLongPairHashMap.newBuilder()
+                // We are going to often clear() the map, with the expectation that it's going to get filled again
+                // immediately after. In these conditions it does not make sense to shrink it each time.
+                .autoShrink(false)
+                .concurrencyLevel(Runtime.getRuntime().availableProcessors() * 2)
+                .build();
+    }
+
+    boolean insert(long ledgerId, long entryId, ByteBuf entry) {
+        int entrySize = entry.readableBytes();
+        int alignedSize = align64(entrySize);
+        int offset = currentOffset.getAndAdd(alignedSize);
+
+        if (offset + entrySize > segmentSize) {
+            // The segment is full
+            return false;
+        } else {
+            // Copy entry into read cache segment
+            cacheBuffer.setBytes(offset, entry, entry.readerIndex(), entry.readableBytes());
+            long value = offset << 32 | entrySize;
+            index.put(ledgerId, entryId, value, 0);
+            return true;
+        }
+    }
+
+    public ByteBuf get(long ledgerId, long entryId) {
+        long value = index.getFirstValue(ledgerId, entryId);
+        if (value >= 0) {
+            int offset = (int) (value >> 32);
+            int entryLen = (int) value;
+
+            ByteBuf entry = PulsarByteBufAllocator.DEFAULT.buffer(entryLen, entryLen);

Review Comment:
   > Since we already have a flag to control the copy/not-copy of the cache, another approach I was thinking of was to keep maps of the original ByteBuf (so that we also eliminate the copy on insertion in the cache).
   > We still do the rotation based on rotating the segments, where each segment has its own hash map.
   
   I didn't understand that part. What do you mean by maps of the original ByteBuf?
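
A side note on the quoted `insert()` and `get()`: they store an entry's offset and length in a single `long` index value. As quoted, `offset << 32 | entrySize` has `int` operands, and Java evaluates an `int` shift using only the low 5 bits of the distance, so the shift would be by 0 bits. The sketch below is a minimal stand-alone illustration of packing with an explicit widening cast. The class and method names are hypothetical and are not part of the PR.

```java
final class OffsetLengthPacking {
    // Store the offset in the high 32 bits and the length in the low 32 bits.
    static long pack(int offset, int length) {
        // Widen to long before shifting. For an int operand Java uses only the
        // low 5 bits of the shift distance, so (int) << 32 shifts by 0 bits.
        return ((long) offset << 32) | (length & 0xFFFFFFFFL);
    }

    static int offsetOf(long packed) {
        return (int) (packed >>> 32);
    }

    static int lengthOf(long packed) {
        return (int) packed;
    }

    public static void main(String[] args) {
        long packed = pack(4096, 100);
        System.out.println(offsetOf(packed) + " " + lengthOf(packed)); // prints "4096 100"

        // Same expression shape as the quoted diff, evaluated in int arithmetic:
        int offset = 4096;
        int length = 100;
        long mixed = offset << 32 | length;
        System.out.println(mixed >> 32); // prints "0": the offset is lost
    }
}
```

With a non-negative offset the packed value stays non-negative, which is what the `value >= 0` check in the quoted `get()` appears to rely on.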





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #15955: PIP-174: New managed ledger entry cache implementation

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #15955:
URL: https://github.com/apache/pulsar/pull/15955#discussion_r931762130


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedEntryCacheImpl.java:
##########
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.BKException;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
+import org.apache.commons.lang3.tuple.Pair;
+
+@Slf4j
+class SharedEntryCacheImpl implements EntryCache {
+
+    private final SharedEntryCacheManagerImpl entryCacheManager;
+    private final ManagedLedgerImpl ml;
+    private final ManagedLedgerInterceptor interceptor;
+
+    SharedEntryCacheImpl(ManagedLedgerImpl ml, SharedEntryCacheManagerImpl entryCacheManager) {
+        this.ml = ml;
+        this.entryCacheManager = entryCacheManager;
+        this.interceptor = ml.getManagedLedgerInterceptor();
+    }
+
+    @Override
+    public String getName() {
+        return ml.getName();
+    }
+
+    @Override
+    public boolean insert(EntryImpl entry) {
+        return entryCacheManager.insert(entry);
+    }
+
+    @Override
+    public void invalidateEntries(PositionImpl lastPosition) {
+        // No-Op. The cache invalidation is based only on rotating the segment buffers
+    }
+
+    @Override
+    public void invalidateEntriesBeforeTimestamp(long timestamp) {
+        // No-Op. The cache invalidation is based only on rotating the segment buffers
+    }
+
+    @Override
+    public void invalidateAllEntries(long ledgerId) {
+        // No-Op. The cache invalidation is based only on rotating the segment buffers
+    }
+
+    @Override
+    public void clear() {
+        // No-Op. The cache invalidation is based only on rotating the segment buffers
+    }
+
+    private static final Pair<Integer, Long> NO_EVICTION = Pair.of(0, 0L);
+
+    @Override
+    public Pair<Integer, Long> evictEntries(long sizeToFree) {
+        return NO_EVICTION;
+    }
+
+    @Override
+    public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
+                               AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
+        final long ledgerId = lh.getId();
+        final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Reading entries range ledger {}: {} to {}", ml.getName(), ledgerId, firstEntry, lastEntry);
+        }
+
+        List<Entry> cachedEntries = new ArrayList<>(entriesToRead);
+        long totalCachedSize = entryCacheManager.getRange(ledgerId, firstEntry, lastEntry, cachedEntries);
+
+        if (cachedEntries.size() == entriesToRead) {
+            // All entries found in cache
+            entryCacheManager.getFactoryMBean().recordCacheHits(entriesToRead, totalCachedSize);
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Ledger {} -- Found in cache entries: {}-{}", ml.getName(), ledgerId, firstEntry,
+                        lastEntry);
+            }
+
+            callback.readEntriesComplete(cachedEntries, ctx);
+
+        } else {
+            if (!cachedEntries.isEmpty()) {
+                cachedEntries.forEach(entry -> entry.release());
+            }

Review Comment:
   It looks like it would be safe to return the part of the data that was found in the cache, unless I missed something. Discarding a partial cache hit and re-reading the whole range wastes some resources. The old implementation behaves the same way, so this could be improved in a separate PR if possible.
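
To make the suggestion concrete, here is a minimal sketch of serving the cached prefix of a range and reading only the missing tail. It uses plain Java collections rather than the Pulsar types. `PartialHitSketch`, `RangeReader`, and `readRange` are hypothetical names for illustration, and entries are modelled as strings. It is not what the PR implements.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PartialHitSketch {
    /** Hypothetical stand-in for a read of entries [first, last] from the backing store. */
    interface RangeReader {
        List<String> read(long first, long last);
    }

    static List<String> readRange(Map<Long, String> cache, RangeReader reader, long first, long last) {
        List<String> result = new ArrayList<>();
        long next = first;
        // Take the contiguous run of cached entries at the start of the range.
        while (next <= last && cache.containsKey(next)) {
            result.add(cache.get(next));
            next++;
        }
        // Read only the remaining tail from the backing store, if any.
        if (next <= last) {
            result.addAll(reader.read(next, last));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, String> cache = new HashMap<>();
        cache.put(0L, "cached-0");
        cache.put(1L, "cached-1");

        RangeReader store = (first, last) -> {
            List<String> out = new ArrayList<>();
            for (long id = first; id <= last; id++) {
                out.add("store-" + id);
            }
            return out;
        };

        // prints "[cached-0, cached-1, store-2, store-3]"
        System.out.println(readRange(cache, store, 0, 3));
    }
}
```

In the real code the tail read would be asynchronous and the cached buffers would need to be released on failure, which this sketch leaves out.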



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedEntryCacheManagerImpl.java:
##########
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.StampedLock;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryMBeanImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+
+@Slf4j
+public class SharedEntryCacheManagerImpl implements EntryCacheManager {
+
+    private final ManagedLedgerFactoryConfig config;
+    private final ManagedLedgerFactoryMBeanImpl factoryMBean;
+    private final List<SharedCacheSegment> segments = new ArrayList<>();
+    private int currentSegmentIdx = 0;
+    private final int segmentSize;
+    private final int segmentsCount;
+
+    private final StampedLock lock = new StampedLock();
+
+    private static final int DEFAULT_MAX_SEGMENT_SIZE = 1 * 1024 * 1024 * 1024;
+
+    public SharedEntryCacheManagerImpl(ManagedLedgerFactoryImpl factory) {
+        this.config = factory.getConfig();
+        this.factoryMBean = factory.getMbean();
+        long maxCacheSize = config.getMaxCacheSize();
+        if (maxCacheSize > 0) {
+            this.segmentsCount = Math.max(2, (int) (maxCacheSize / DEFAULT_MAX_SEGMENT_SIZE));
+            this.segmentSize = (int) (maxCacheSize / segmentsCount);
+
+            for (int i = 0; i < segmentsCount; i++) {
+                if (config.isCopyEntriesInCache()) {
+                    segments.add(new SharedCacheSegmentBufferCopy(segmentSize));
+                } else {
+                    segments.add(new SharedCacheSegmentBufferRefCount(segmentSize));
+                }
+            }
+        } else {
+            this.segmentsCount = 0;
+            this.segmentSize = 0;
+        }
+    }
+
+    ManagedLedgerFactoryMBeanImpl getFactoryMBean() {
+        return factoryMBean;
+    }
+
+    @Override
+    public EntryCache getEntryCache(ManagedLedgerImpl ml) {
+        if (getMaxSize() > 0) {
+            return new SharedEntryCacheImpl(ml, this);
+        } else {
+            return new EntryCacheDisabled(ml);
+        }
+    }
+
+    @Override
+    public void removeEntryCache(String name) {
+        // no-op
+    }
+
+    @Override
+    public long getSize() {
+        long totalSize = 0;
+        for (int i = 0; i < segmentsCount; i++) {
+            totalSize += segments.get(i).getSize();
+        }
+        return totalSize;
+    }
+
+    @Override
+    public long getMaxSize() {
+        return config.getMaxCacheSize();
+    }
+
+    @Override
+    public void clear() {
+        segments.forEach(SharedCacheSegment::clear);
+    }
+
+    @Override
+    public void close() {
+        segments.forEach(SharedCacheSegment::close);
+    }
+
+    @Override
+    public void updateCacheSizeAndThreshold(long maxSize) {
+
+    }
+
+    @Override
+    public void updateCacheEvictionWatermark(double cacheEvictionWatermark) {
+        // No-Op. We don't use the cache eviction watermark in this implementation
+    }
+
+    @Override
+    public double getCacheEvictionWatermark() {
+        return config.getCacheEvictionWatermark();
+    }
+
+    boolean insert(EntryImpl entry) {
+        int entrySize = entry.getLength();
+
+        if (entrySize > segmentSize) {
+            log.debug("entrySize {} > segmentSize {}, skip update read cache!", entrySize, segmentSize);
+            return false;
+        }
+
+        long stamp = lock.readLock();
+        try {
+            SharedCacheSegment s = segments.get(currentSegmentIdx);
+
+            if (s.insert(entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer())) {
+                return true;
+            }
+        } finally {
+            lock.unlockRead(stamp);
+        }
+
+        // We could not insert in the segment; we need to get the write lock and roll over to
+        // the next segment
+        stamp = lock.writeLock();
+
+        try {
+            SharedCacheSegment segment = segments.get(currentSegmentIdx);
+
+            if (segment.insert(entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer())) {
+                return true;
+            }
+
+            // Roll to next segment
+            currentSegmentIdx = (currentSegmentIdx + 1) % segmentsCount;
+            segment = segments.get(currentSegmentIdx);
+            segment.clear();
+            return segment.insert(entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer());
+        } finally {
+            lock.unlockWrite(stamp);
+        }
+    }
+
+    EntryImpl get(long ledgerId, long entryId) {
+        long stamp = lock.readLock();
+
+        try {
+            // We need to check all the segments, starting from the current one and looking
+            // backward to minimize the checks for recently inserted entries
+            for (int i = 0; i < segmentsCount; i++) {
+                int segmentIdx = (currentSegmentIdx + (segmentsCount - i)) % segmentsCount;
+
+                ByteBuf res = segments.get(segmentIdx).get(ledgerId, entryId);
+                if (res != null) {
+                    return EntryImpl.create(ledgerId, entryId, res);
+                }
+            }
+        } finally {
+            lock.unlockRead(stamp);
+        }
+
+        return null;
+    }
+
+    long getRange(long ledgerId, long firstEntryId, long lastEntryId, List<Entry> results) {
+        long totalSize = 0;
+        long stamp = lock.readLock();
+
+        try {
+            // We need to check all the segments, starting from the current one and looking
+            // backward to minimize the checks for recently inserted entries
+            long entryId = firstEntryId;
+            for (int i = 0; i < segmentsCount; i++) {
+                int segmentIdx = (currentSegmentIdx + (segmentsCount - i)) % segmentsCount;
+                SharedCacheSegment s = segments.get(segmentIdx);
+
+                for (; entryId <= lastEntryId; entryId++) {
+                    ByteBuf res = s.get(ledgerId, entryId);
+                    if (res != null) {
+                        results.add(EntryImpl.create(ledgerId, entryId, res));
+                        totalSize += res.readableBytes();
+                    } else {
+                        break;
+                    }

Review Comment:
   Is it possible that the subsequent entries are still cached in this segment? After we get null from this segment, we move on to the next segment, so those entries would not be found.
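
One way to picture the concern: if entries 1-2 sit in the older segment and entries 3-4 in the current one, a scan that visits each segment once and carries the entry cursor forward gives up on the current segment at entry 1 and never returns to it for entries 3-4. The sketch below shows an alternative that looks up every entry across all segments, newest first. It models segments as plain maps. `SegmentLookupSketch` and its methods are hypothetical names, not the PR's code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SegmentLookupSketch {
    // Look up one entry, scanning from the current segment backward.
    static String get(List<Map<Long, String>> segments, int currentIdx, long entryId) {
        int n = segments.size();
        for (int i = 0; i < n; i++) {
            int idx = (currentIdx + (n - i)) % n;
            String value = segments.get(idx).get(entryId);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    // Collect the contiguous cached run starting at first, checking every segment per entry.
    static List<String> getRange(List<Map<Long, String>> segments, int currentIdx, long first, long last) {
        List<String> results = new ArrayList<>();
        for (long id = first; id <= last; id++) {
            String value = get(segments, currentIdx, id);
            if (value == null) {
                break; // first gap: the caller decides how to handle a partial result
            }
            results.add(value);
        }
        return results;
    }

    public static void main(String[] args) {
        Map<Long, String> older = new HashMap<>();
        older.put(1L, "a");
        older.put(2L, "b");
        Map<Long, String> current = new HashMap<>();
        current.put(3L, "c");
        current.put(4L, "d");

        List<Map<Long, String>> segments = new ArrayList<>();
        segments.add(older);   // index 0
        segments.add(current); // index 1, the current segment

        System.out.println(getRange(segments, 1, 1, 4)); // prints "[a, b, c, d]"
    }
}
```

The trade-off is up to one hash lookup per segment for each entry, instead of one pass over the segments for the whole range.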
   



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedEntryCacheImpl.java:
##########
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.BKException;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
+import org.apache.commons.lang3.tuple.Pair;
+
+@Slf4j
+class SharedEntryCacheImpl implements EntryCache {
+
+    private final SharedEntryCacheManagerImpl entryCacheManager;
+    private final ManagedLedgerImpl ml;
+    private final ManagedLedgerInterceptor interceptor;
+
+    SharedEntryCacheImpl(ManagedLedgerImpl ml, SharedEntryCacheManagerImpl entryCacheManager) {
+        this.ml = ml;
+        this.entryCacheManager = entryCacheManager;
+        this.interceptor = ml.getManagedLedgerInterceptor();
+    }
+
+    @Override
+    public String getName() {
+        return ml.getName();
+    }
+
+    @Override
+    public boolean insert(EntryImpl entry) {
+        return entryCacheManager.insert(entry);
+    }
+
+    @Override
+    public void invalidateEntries(PositionImpl lastPosition) {
+        // No-Op. The cache invalidation is based only on rotating the segment buffers
+    }
+
+    @Override
+    public void invalidateEntriesBeforeTimestamp(long timestamp) {
+        // No-Op. The cache invalidation is based only on rotating the segment buffers
+    }
+
+    @Override
+    public void invalidateAllEntries(long ledgerId) {
+        // No-Op. The cache invalidation is based only on rotating the segment buffers
+    }
+
+    @Override
+    public void clear() {
+        // No-Op. The cache invalidation is based only on rotating the segment buffers
+    }
+
+    private static final Pair<Integer, Long> NO_EVICTION = Pair.of(0, 0L);
+
+    @Override
+    public Pair<Integer, Long> evictEntries(long sizeToFree) {
+        return NO_EVICTION;
+    }
+
+    @Override
+    public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
+                               AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
+        final long ledgerId = lh.getId();
+        final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Reading entries range ledger {}: {} to {}", ml.getName(), ledgerId, firstEntry, lastEntry);
+        }
+
+        List<Entry> cachedEntries = new ArrayList<>(entriesToRead);
+        long totalCachedSize = entryCacheManager.getRange(ledgerId, firstEntry, lastEntry, cachedEntries);
+
+        if (cachedEntries.size() == entriesToRead) {
+            // All entries found in cache
+            entryCacheManager.getFactoryMBean().recordCacheHits(entriesToRead, totalCachedSize);
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Ledger {} -- Found in cache entries: {}-{}", ml.getName(), ledgerId, firstEntry,
+                        lastEntry);
+            }
+
+            callback.readEntriesComplete(cachedEntries, ctx);
+
+        } else {
+            if (!cachedEntries.isEmpty()) {
+                cachedEntries.forEach(entry -> entry.release());
+            }
+
+            // Read all the entries from bookkeeper
+            lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
+                    ledgerEntries -> {
+                        checkNotNull(ml.getName());
+                        checkNotNull(ml.getExecutor());
+
+                        try {
+                            // We got the entries, we need to transform them to a List<> type
+                            long totalSize = 0;
+                            final List<Entry> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead);
+                            for (LedgerEntry e : ledgerEntries) {
+                                EntryImpl entry = EntryCacheManager.create(e, interceptor);
+
+                                entriesToReturn.add(entry);
+                                totalSize += entry.getLength();
+                            }
+
+                            entryCacheManager.getFactoryMBean().recordCacheMiss(entriesToReturn.size(), totalSize);
+                            ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize);
+
+                            callback.readEntriesComplete(entriesToReturn, ctx);
+                        } finally {
+                            ledgerEntries.close();
+                        }
+                    }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
+                if (exception instanceof BKException
+                        && ((BKException) exception).getCode() == BKException.Code.TooManyRequestsException) {
+                    callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
+                } else {
+                    ml.invalidateLedgerHandle(lh);
+                    ManagedLedgerException mlException = createManagedLedgerException(exception);
+                    callback.readEntriesFailed(mlException, ctx);
+                }
+                return null;
+            });
+        }
+    }
+
+    @Override
+    public void asyncReadEntry(ReadHandle lh, PositionImpl position, AsyncCallbacks.ReadEntryCallback callback,
+                               Object ctx) {
+        try {
+            asyncReadEntry0(lh, position, callback, ctx);
+        } catch (Throwable t) {
+            log.warn("[{}] Failed to read entries for {}-{}", getName(), lh.getId(), position, t);
+            callback.readEntryFailed(createManagedLedgerException(t), ctx);
+        }
+    }
+
+    private void asyncReadEntry0(ReadHandle lh, PositionImpl position, AsyncCallbacks.ReadEntryCallback callback,
+                                 Object ctx) {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Reading entry ledger {}: {}", ml.getName(), lh.getId(), position.getEntryId());
+        }
+
+        EntryImpl cachedEntry = entryCacheManager.get(position.getLedgerId(), position.getEntryId());
+
+        if (cachedEntry != null) {
+            entryCacheManager.getFactoryMBean().recordCacheHit(cachedEntry.getLength());
+            callback.readEntryComplete(cachedEntry, ctx);
+        } else {
+            lh.readAsync(position.getEntryId(), position.getEntryId()).thenAcceptAsync(
+                    ledgerEntries -> {
+                        try {
+                            Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
+                            if (iterator.hasNext()) {
+                                LedgerEntry ledgerEntry = iterator.next();
+                                EntryImpl returnEntry = EntryCacheManager.create(ledgerEntry, interceptor);
+
+                                entryCacheManager.getFactoryMBean().recordCacheMiss(1, returnEntry.getLength());
+                                ml.getMbean().addReadEntriesSample(1, returnEntry.getLength());
+                                callback.readEntryComplete(returnEntry, ctx);
+                            } else {
+                                // got an empty sequence
+                                callback.readEntryFailed(new ManagedLedgerException("Could not read given position"),
+                                        ctx);
+                            }
+                        } finally {
+                            ledgerEntries.close();
+                        }
+                    }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
+                ml.invalidateLedgerHandle(lh);
+                callback.readEntryFailed(createManagedLedgerException(exception), ctx);
+                return null;
+            });
+        }
+    }
+
+    @Override
+    public long getSize() {
+        return 0;

Review Comment:
   It would be better to add a comment here: returning 0 avoids cache eviction, and topic-level cache size metrics are not exposed because this implementation shares the cache across all topics.
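
As an illustration of the kind of comment being asked for, here is a minimal stand-alone sketch. `PerTopicCacheViewSketch`, `sharedBytes`, and `getSharedSize` are hypothetical names. Only the always-zero `getSize()` mirrors the quoted diff, and the comment wording is an assumption based on the reasoning in this review.

```java
import java.util.concurrent.atomic.AtomicLong;

final class PerTopicCacheViewSketch {
    // Hypothetical counter for the bytes held by the cache shared across all topics.
    private final AtomicLong sharedBytes;

    PerTopicCacheViewSketch(AtomicLong sharedBytes) {
        this.sharedBytes = sharedBytes;
    }

    long getSize() {
        // Always 0 for the per-topic view. The cache is shared across all topics,
        // so there is no topic-level size to report, and reporting 0 keeps
        // size-based eviction from acting on this view.
        return 0;
    }

    long getSharedSize() {
        // The total size is only meaningful at the level of the shared cache.
        return sharedBytes.get();
    }

    public static void main(String[] args) {
        PerTopicCacheViewSketch view = new PerTopicCacheViewSketch(new AtomicLong(1024));
        System.out.println(view.getSize() + " " + view.getSharedSize()); // prints "0 1024"
    }
}
```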





[GitHub] [pulsar] eolivelli commented on a diff in pull request #15955: PIP-174: New managed ledger entry cache implementation

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #15955:
URL: https://github.com/apache/pulsar/pull/15955#discussion_r949025813


##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java:
##########
@@ -43,15 +43,26 @@
 import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
 import org.apache.bookkeeper.mledger.impl.cache.EntryCacheDisabled;
 import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
+import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl;
+import org.apache.bookkeeper.mledger.impl.cache.SharedEntryCacheManagerImpl;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.testng.Assert;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
 
     ManagedLedgerImpl ml1;
     ManagedLedgerImpl ml2;
 
+    @DataProvider(name = "EntryCacheManagerClass")
+    public static Object[][] primeNumbers() {

Review Comment:
   nit: primeNumbers ?





[GitHub] [pulsar] aloyszhang commented on a diff in pull request #15955: PIP-174: New managed ledger entry cache implementation

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on code in PR #15955:
URL: https://github.com/apache/pulsar/pull/15955#discussion_r940005038


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedEntryCacheManagerImpl.java:
##########
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.StampedLock;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryMBeanImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+
+@Slf4j
+public class SharedEntryCacheManagerImpl implements EntryCacheManager {
+
+    private final ManagedLedgerFactoryConfig config;
+    private final ManagedLedgerFactoryMBeanImpl factoryMBean;
+    private final List<SharedCacheSegment> segments = new ArrayList<>();
+    private int currentSegmentIdx = 0;
+    private final int segmentSize;
+    private final int segmentsCount;
+
+    private final StampedLock lock = new StampedLock();
+
+    private static final int DEFAULT_MAX_SEGMENT_SIZE = 1 * 1024 * 1024 * 1024;
+
+    public SharedEntryCacheManagerImpl(ManagedLedgerFactoryImpl factory) {
+        this.config = factory.getConfig();
+        this.factoryMBean = factory.getMbean();
+        long maxCacheSize = config.getMaxCacheSize();
+        if (maxCacheSize > 0) {
+            this.segmentsCount = Math.max(2, (int) (maxCacheSize / DEFAULT_MAX_SEGMENT_SIZE));
+            this.segmentSize = (int) (maxCacheSize / segmentsCount);
+
+            for (int i = 0; i < segmentsCount; i++) {
+                if (config.isCopyEntriesInCache()) {
+                    segments.add(new SharedCacheSegmentBufferCopy(segmentSize));
+                } else {
+                    segments.add(new SharedCacheSegmentBufferRefCount(segmentSize));
+                }
+            }
+        } else {
+            this.segmentsCount = 0;
+            this.segmentSize = 0;
+        }
+    }
+
+    ManagedLedgerFactoryMBeanImpl getFactoryMBean() {
+        return factoryMBean;
+    }
+
+    @Override
+    public EntryCache getEntryCache(ManagedLedgerImpl ml) {
+        if (getMaxSize() > 0) {
+            return new SharedEntryCacheImpl(ml, this);
+        } else {
+            return new EntryCacheDisabled(ml);
+        }
+    }
+
+    @Override
+    public void removeEntryCache(String name) {
+        // no-op
+    }
+
+    @Override
+    public long getSize() {
+        long totalSize = 0;
+        for (int i = 0; i < segmentsCount; i++) {
+            totalSize += segments.get(i).getSize();
+        }
+        return totalSize;
+    }
+
+    @Override
+    public long getMaxSize() {
+        return config.getMaxCacheSize();
+    }
+
+    @Override
+    public void clear() {
+        segments.forEach(SharedCacheSegment::clear);
+    }
+
+    @Override
+    public void close() {
+        segments.forEach(SharedCacheSegment::close);
+    }
+
+    @Override
+    public void updateCacheSizeAndThreshold(long maxSize) {
+
+    }
+
+    @Override
+    public void updateCacheEvictionWatermark(double cacheEvictionWatermark) {
+        // No-Op. We don't use the cache eviction watermark in this implementation
+    }
+
+    @Override
+    public double getCacheEvictionWatermark() {
+        return config.getCacheEvictionWatermark();
+    }
+
+    boolean insert(EntryImpl entry) {
+        int entrySize = entry.getLength();
+
+        if (entrySize > segmentSize) {
+            log.debug("entrySize {} > segmentSize {}, skip update read cache!", entrySize, segmentSize);
+            return false;
+        }
+
+        long stamp = lock.readLock();
+        try {
+            SharedCacheSegment s = segments.get(currentSegmentIdx);
+
+            if (s.insert(entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer())) {
+                return true;
+            }
+        } finally {
+            lock.unlockRead(stamp);
+        }
+
+        // We could not insert in the current segment, so we need to get the write lock
+        // and roll over to the next segment
+        stamp = lock.writeLock();
+
+        try {
+            SharedCacheSegment segment = segments.get(currentSegmentIdx);
+
+            if (segment.insert(entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer())) {
+                return true;
+            }
+
+            // Roll to next segment
+            currentSegmentIdx = (currentSegmentIdx + 1) % segmentsCount;
+            segment = segments.get(currentSegmentIdx);
+            segment.clear();
+            return segment.insert(entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer());
+        } finally {
+            lock.unlockWrite(stamp);
+        }
+    }
+
+    EntryImpl get(long ledgerId, long entryId) {
+        long stamp = lock.readLock();
+
+        try {
+            // We need to check all the segments, starting from the current one and looking
+            // backward to minimize the checks for recently inserted entries
+            for (int i = 0; i < segmentsCount; i++) {

Review Comment:
   How about maintaining a mapping from each position <LedgerId, EntryId> to the SharedCacheSegment that holds it?
   With this mapping, we can look up the target segment directly instead of traversing the whole segment list.
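
   To make the suggestion concrete, here is a minimal sketch of such an index. Everything in it is hypothetical (`SegmentIndex`, `Position`, `recordInsert`, `lookup`, `onSegmentCleared`, `NOT_CACHED`); none of these names exist in the PR. It stores the segment index rather than the segment object, and it assumes the caller already holds the manager's lock where needed, so it makes no claim about how it would interact with the `StampedLock` in `insert` and `get`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical index from (ledgerId, entryId) to the index of the segment holding the entry. */
final class SegmentIndex {

    /** Hypothetical key type for a cached entry position. */
    static final class Position {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Position)) {
                return false;
            }
            Position other = (Position) o;
            return ledgerId == other.ledgerId && entryId == other.entryId;
        }

        @Override
        public int hashCode() {
            return Objects.hash(ledgerId, entryId);
        }
    }

    /** Returned by lookup() when the position is not tracked. */
    static final int NOT_CACHED = -1;

    // position -> segment index, used for direct lookups
    private final Map<Position, Integer> segmentByPosition = new ConcurrentHashMap<>();
    // segment index -> positions stored in it, used to drop mappings when a segment is cleared
    private final List<Set<Position>> positionsBySegment = new ArrayList<>();

    SegmentIndex(int segmentsCount) {
        for (int i = 0; i < segmentsCount; i++) {
            positionsBySegment.add(ConcurrentHashMap.<Position>newKeySet());
        }
    }

    /** Record that the entry was inserted into the given segment. */
    void recordInsert(long ledgerId, long entryId, int segmentIdx) {
        Position p = new Position(ledgerId, entryId);
        Integer previous = segmentByPosition.put(p, segmentIdx);
        if (previous != null && previous != segmentIdx) {
            // The same position was re-inserted into a newer segment
            positionsBySegment.get(previous).remove(p);
        }
        positionsBySegment.get(segmentIdx).add(p);
    }

    /** Return the segment index for the position, or NOT_CACHED. */
    int lookup(long ledgerId, long entryId) {
        return segmentByPosition.getOrDefault(new Position(ledgerId, entryId), NOT_CACHED);
    }

    /** Must be called when a segment is cleared on roll-over, or the index goes stale. */
    void onSegmentCleared(int segmentIdx) {
        Set<Position> positions = positionsBySegment.get(segmentIdx);
        for (Position p : positions) {
            // Only remove the mapping if it still points at the cleared segment
            segmentByPosition.remove(p, segmentIdx);
        }
        positions.clear();
    }
}
```

   The trade-off is extra memory per cached entry plus bookkeeping on every roll-over, in exchange for a single map lookup in `get` instead of a scan over up to `segmentsCount` segments. Whether that is a net win likely depends on how many segments are configured.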


