You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/06/06 23:39:23 UTC
[pulsar] branch master updated: Extracted interface for EntryCacheManager (#15933)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c7faf6248b1 Extracted interface for EntryCacheManager (#15933)
c7faf6248b1 is described below
commit c7faf6248b1413fc71465fc12571886db323193c
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Jun 6 16:39:14 2022 -0700
Extracted interface for EntryCacheManager (#15933)
* Extracted interface for EntryCacheManager
* Fixed references
* added more methods to the interface
* Fixed mocked test
* Removed unused import
* Fixed wrong casting in reflection access
---
.../bookkeeper/mledger/ManagedLedgerFactory.java | 2 +-
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +-
.../mledger/impl/ManagedLedgerFactoryImpl.java | 6 +-
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 25 ++--
.../mledger/impl/{ => cache}/EntryCache.java | 4 +-
.../EntryCacheDefaultEvictionPolicy.java | 4 +-
.../mledger/impl/cache/EntryCacheDisabled.java | 147 +++++++++++++++++++++
.../impl/{ => cache}/EntryCacheEvictionPolicy.java | 2 +-
.../EntryCacheManager.java} | 34 ++---
.../RangeEntryCacheImpl.java} | 21 +--
.../RangeEntryCacheManagerImpl.java} | 145 +++-----------------
.../package-info.java} | 20 +--
.../mledger/impl/EntryCacheManagerTest.java | 126 +++++++++---------
.../bookkeeper/mledger/impl/EntryCacheTest.java | 4 +-
.../mledger/impl/ManagedLedgerBkTest.java | 11 +-
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 18 ++-
.../service/nonpersistent/NonPersistentTopic.java | 2 +-
.../broker/stats/AllocatorStatsGenerator.java | 4 +-
.../stats/metrics/ManagedLedgerCacheMetrics.java | 4 +-
.../broker/service/PersistentTopicE2ETest.java | 4 +-
.../client/api/SimpleProducerConsumerTest.java | 4 +-
.../client/api/v1/V1_ProducerConsumerTest.java | 4 +-
22 files changed, 314 insertions(+), 279 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
index ea78987eb3e..e42c2581ba1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
@@ -26,7 +26,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenReadOnlyCursorCallback;
-import org.apache.bookkeeper.mledger.impl.EntryCacheManager;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
/**
* A factory to open/create managed ledgers and delete them.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 3d558a231db..10d8d19eb33 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2832,7 +2832,7 @@ public class ManagedCursorImpl implements ManagedCursor {
boolean shouldCloseLedger(LedgerHandle lh) {
long now = clock.millis();
- if (ledger.factory.isMetadataServiceAvailable()
+ if (ledger.getFactory().isMetadataServiceAvailable()
&& (lh.getLastAddConfirmed() >= config.getMetadataMaxEntriesPerLedger()
|| lastLedgerSwitchTimestamp < (now - config.getLedgerRolloverTimeout() * 1000))
&& (STATE_UPDATER.get(this) != State.Closed && STATE_UPDATER.get(this) != State.Closing)) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index b6b1bcf3e8d..48e85423e84 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -68,6 +68,8 @@ import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ManagedLedgerInitializeLedgerCallback;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
+import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
@@ -90,10 +92,12 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private final BookkeeperFactoryForCustomEnsemblePlacementPolicy bookkeeperFactory;
private final boolean isBookkeeperManaged;
private final ManagedLedgerFactoryConfig config;
+ @Getter
protected final OrderedScheduler scheduledExecutor;
private final ExecutorService cacheEvictionExecutor;
+ @Getter
protected final ManagedLedgerFactoryMBeanImpl mbean;
protected final ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers = new ConcurrentHashMap<>();
@@ -189,7 +193,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
config.getManagedCursorInfoCompressionType());
this.config = config;
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
- this.entryCacheManager = new EntryCacheManager(this);
+ this.entryCacheManager = new RangeEntryCacheManagerImpl(this);
this.statsTask = scheduledExecutor.scheduleWithFixedDelay(catchingAndLoggingThrowables(this::refreshStats),
0, config.getStatsPeriodSeconds(), TimeUnit.SECONDS);
this.flushCursorsTask = scheduledExecutor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::flushCursors),
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 56399f74fd6..6b7eb90e0d8 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -69,6 +69,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import lombok.Getter;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
@@ -116,6 +117,7 @@ import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.WaitingEntryCallBack;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.offload.OffloadUtils;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
@@ -262,9 +264,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
AtomicReferenceFieldUpdater.newUpdater(ManagedLedgerImpl.class, State.class, "state");
protected volatile State state = null;
+ @Getter
private final OrderedScheduler scheduledExecutor;
+
+ @Getter
private final OrderedExecutor executor;
- final ManagedLedgerFactoryImpl factory;
+
+ @Getter
+ private final ManagedLedgerFactoryImpl factory;
+
+ @Getter
protected final ManagedLedgerMBeanImpl mbean;
protected final Clock clock;
@@ -1865,7 +1874,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
}
- void invalidateLedgerHandle(ReadHandle ledgerHandle) {
+ public void invalidateLedgerHandle(ReadHandle ledgerHandle) {
long ledgerId = ledgerHandle.getId();
LedgerHandle currentLedger = this.currentLedger;
@@ -3564,14 +3573,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
return ledgers;
}
- OrderedScheduler getScheduledExecutor() {
- return scheduledExecutor;
- }
-
- OrderedExecutor getExecutor() {
- return executor;
- }
-
private ManagedLedgerInfo getManagedLedgerInfo() {
return buildManagedLedgerInfo(ledgers);
}
@@ -3679,10 +3680,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
return STATE_UPDATER.get(this);
}
- public ManagedLedgerMBeanImpl getMBean() {
- return mbean;
- }
-
public long getCacheSize() {
return entryCache.getSize();
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCache.java
similarity index 96%
rename from managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java
rename to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCache.java
index 0e20a0facff..8f5b3e9b19e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCache.java
@@ -16,11 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.bookkeeper.mledger.impl;
+package org.apache.bookkeeper.mledger.impl.cache;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
/**
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheDefaultEvictionPolicy.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDefaultEvictionPolicy.java
similarity index 96%
rename from managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheDefaultEvictionPolicy.java
rename to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDefaultEvictionPolicy.java
index 4c26a93514b..f2a3cd4e51e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheDefaultEvictionPolicy.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDefaultEvictionPolicy.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.bookkeeper.mledger.impl;
+package org.apache.bookkeeper.mledger.impl.cache;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Collections.reverseOrder;
@@ -89,7 +89,7 @@ public class EntryCacheDefaultEvictionPolicy implements EntryCacheEvictionPolicy
}
log.info("Completed cache eviction. Removed {} entries from {} caches. ({} Mb)", evictedEntries,
- cachesToEvict.size(), evictedSize / EntryCacheManager.MB);
+ cachesToEvict.size(), evictedSize / RangeEntryCacheManagerImpl.MB);
}
private static final Logger log = LoggerFactory.getLogger(EntryCacheDefaultEvictionPolicy.class);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
new file mode 100644
index 00000000000..a09b8ba27fc
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
+import org.apache.commons.lang3.tuple.Pair;
+
+/**
+ * Implementation of cache that always reads from BookKeeper.
+ */
+public class EntryCacheDisabled implements EntryCache {
+ private final ManagedLedgerImpl ml;
+ private final ManagedLedgerInterceptor interceptor;
+
+ public EntryCacheDisabled(ManagedLedgerImpl ml) {
+ this.ml = ml;
+ this.interceptor = ml.getManagedLedgerInterceptor();
+ }
+
+ @Override
+ public String getName() {
+ return ml.getName();
+ }
+
+ @Override
+ public boolean insert(EntryImpl entry) {
+ return false;
+ }
+
+ @Override
+ public void invalidateEntries(PositionImpl lastPosition) {
+ }
+
+ @Override
+ public void invalidateAllEntries(long ledgerId) {
+ }
+
+ @Override
+ public void clear() {
+ }
+
+ @Override
+ public Pair<Integer, Long> evictEntries(long sizeToFree) {
+ return Pair.of(0, (long) 0);
+ }
+
+ @Override
+ public void invalidateEntriesBeforeTimestamp(long timestamp) {
+ }
+
+ @Override
+ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
+ final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
+ lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
+ ledgerEntries -> {
+ List<Entry> entries = Lists.newArrayList();
+ long totalSize = 0;
+ try {
+ for (LedgerEntry e : ledgerEntries) {
+ // Insert the entries at the end of the list (they will be unsorted for now)
+ EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor);
+ entries.add(entry);
+ totalSize += entry.getLength();
+ }
+ } finally {
+ ledgerEntries.close();
+ }
+ ml.getFactory().getMbean().recordCacheMiss(entries.size(), totalSize);
+ ml.getMbean().addReadEntriesSample(entries.size(), totalSize);
+
+ callback.readEntriesComplete(entries, ctx);
+ }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
+ callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
+ return null;
+ });
+ }
+
+ @Override
+ public void asyncReadEntry(ReadHandle lh, PositionImpl position, AsyncCallbacks.ReadEntryCallback callback,
+ Object ctx) {
+ lh.readAsync(position.getEntryId(), position.getEntryId()).whenCompleteAsync(
+ (ledgerEntries, exception) -> {
+ if (exception != null) {
+ ml.invalidateLedgerHandle(lh);
+ callback.readEntryFailed(createManagedLedgerException(exception), ctx);
+ return;
+ }
+
+ try {
+ Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
+ if (iterator.hasNext()) {
+ LedgerEntry ledgerEntry = iterator.next();
+ EntryImpl returnEntry = RangeEntryCacheManagerImpl.create(ledgerEntry, interceptor);
+
+ ml.getFactory().getMbean().recordCacheMiss(1, returnEntry.getLength());
+ ml.getMbean().addReadEntriesSample(1, returnEntry.getLength());
+ callback.readEntryComplete(returnEntry, ctx);
+ } else {
+ callback.readEntryFailed(new ManagedLedgerException("Could not read given position"),
+ ctx);
+ }
+ } finally {
+ ledgerEntries.close();
+ }
+ }, ml.getExecutor().chooseThread(ml.getName()));
+ }
+
+ @Override
+ public long getSize() {
+ return 0;
+ }
+
+ @Override
+ public int compareTo(EntryCache other) {
+ return Longs.compare(getSize(), other.getSize());
+ }
+
+}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheEvictionPolicy.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheEvictionPolicy.java
similarity index 96%
copy from managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheEvictionPolicy.java
copy to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheEvictionPolicy.java
index 341c5c328ad..8c55ce7cf7f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheEvictionPolicy.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheEvictionPolicy.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.bookkeeper.mledger.impl;
+package org.apache.bookkeeper.mledger.impl.cache;
import java.util.List;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheEvictionPolicy.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheManager.java
similarity index 61%
copy from managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheEvictionPolicy.java
copy to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheManager.java
index 341c5c328ad..12cbb023f86 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheEvictionPolicy.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheManager.java
@@ -16,22 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.bookkeeper.mledger.impl;
+package org.apache.bookkeeper.mledger.impl.cache;
-import java.util.List;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-/**
- * Cache eviction policy abstraction interface.
- *
- */
-public interface EntryCacheEvictionPolicy {
- /**
- * Perform the cache eviction of at least sizeToFree bytes on the supplied list of caches.
- *
- * @param caches
- * the list of caches to consider
- * @param sizeToFree
- * the minimum size in bytes to be freed
- */
- void doEviction(List<EntryCache> caches, long sizeToFree);
+public interface EntryCacheManager {
+ EntryCache getEntryCache(ManagedLedgerImpl ml);
+
+ void removeEntryCache(String name);
+
+ long getSize();
+
+ long getMaxSize();
+
+ void clear();
+
+ void updateCacheSizeAndThreshold(long maxSize);
+
+ void updateCacheEvictionWatermark(double cacheEvictionWatermark);
+
+ double getCacheEvictionWatermark();
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
similarity index 94%
rename from managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
rename to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index 1a6986a7405..d37676b8d65 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.bookkeeper.mledger.impl;
+package org.apache.bookkeeper.mledger.impl.cache;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -34,6 +34,9 @@ import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.util.RangeCache;
import org.apache.commons.lang3.tuple.Pair;
@@ -43,9 +46,9 @@ import org.slf4j.LoggerFactory;
/**
* Cache data payload for entries of all ledgers.
*/
-public class EntryCacheImpl implements EntryCache {
+public class RangeEntryCacheImpl implements EntryCache {
- private final EntryCacheManager manager;
+ private final RangeEntryCacheManagerImpl manager;
private final ManagedLedgerImpl ml;
private ManagedLedgerInterceptor interceptor;
private final RangeCache<PositionImpl, EntryImpl> entries;
@@ -53,7 +56,7 @@ public class EntryCacheImpl implements EntryCache {
private static final double MB = 1024 * 1024;
- public EntryCacheImpl(EntryCacheManager manager, ManagedLedgerImpl ml, boolean copyEntries) {
+ public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl ml, boolean copyEntries) {
this.manager = manager;
this.ml = ml;
this.interceptor = ml.getManagedLedgerInterceptor();
@@ -214,10 +217,10 @@ public class EntryCacheImpl implements EntryCache {
Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
if (iterator.hasNext()) {
LedgerEntry ledgerEntry = iterator.next();
- EntryImpl returnEntry = EntryCacheManager.create(ledgerEntry, interceptor);
+ EntryImpl returnEntry = RangeEntryCacheManagerImpl.create(ledgerEntry, interceptor);
manager.mlFactoryMBean.recordCacheMiss(1, returnEntry.getLength());
- ml.mbean.addReadEntriesSample(1, returnEntry.getLength());
+ ml.getMbean().addReadEntriesSample(1, returnEntry.getLength());
callback.readEntryComplete(returnEntry, ctx);
} else {
// got an empty sequence
@@ -299,14 +302,14 @@ public class EntryCacheImpl implements EntryCache {
long totalSize = 0;
final List<EntryImpl> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead);
for (LedgerEntry e : ledgerEntries) {
- EntryImpl entry = EntryCacheManager.create(e, interceptor);
+ EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor);
entriesToReturn.add(entry);
totalSize += entry.getLength();
}
manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
- ml.getMBean().addReadEntriesSample(entriesToReturn.size(), totalSize);
+ ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize);
callback.readEntriesComplete((List) entriesToReturn, ctx);
} finally {
@@ -364,5 +367,5 @@ public class EntryCacheImpl implements EntryCache {
manager.entriesRemoved(evictedSize);
}
- private static final Logger log = LoggerFactory.getLogger(EntryCacheImpl.class);
+ private static final Logger log = LoggerFactory.getLogger(RangeEntryCacheImpl.class);
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
similarity index 56%
rename from managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
rename to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
index 132feca6de8..4c27781b1f0 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
@@ -16,34 +16,29 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.bookkeeper.mledger.impl;
+package org.apache.bookkeeper.mledger.impl.cache;
-import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.primitives.Longs;
import io.netty.buffer.ByteBuf;
-import java.util.Iterator;
-import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.client.api.LedgerEntry;
-import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
-import org.apache.bookkeeper.mledger.AsyncCallbacks;
-import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryMBeanImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
-import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings("checkstyle:javadoctype")
-public class EntryCacheManager {
+public class RangeEntryCacheManagerImpl implements EntryCacheManager {
private volatile long maxSize;
private volatile long evictionTriggerThreshold;
@@ -62,13 +57,13 @@ public class EntryCacheManager {
private static final double evictionTriggerThresholdPercent = 0.98;
- public EntryCacheManager(ManagedLedgerFactoryImpl factory) {
+ public RangeEntryCacheManagerImpl(ManagedLedgerFactoryImpl factory) {
this.maxSize = factory.getConfig().getMaxCacheSize();
this.evictionTriggerThreshold = (long) (maxSize * evictionTriggerThresholdPercent);
this.cacheEvictionWatermark = factory.getConfig().getCacheEvictionWatermark();
this.evictionPolicy = new EntryCacheDefaultEvictionPolicy();
this.mlFactory = factory;
- this.mlFactoryMBean = factory.mbean;
+ this.mlFactoryMBean = factory.getMbean();
log.info("Initialized managed-ledger entry cache of {} Mb", maxSize / MB);
}
@@ -79,7 +74,7 @@ public class EntryCacheManager {
return new EntryCacheDisabled(ml);
}
- EntryCache newEntryCache = new EntryCacheImpl(this, ml, mlFactory.getConfig().isCopyEntriesInCache());
+ EntryCache newEntryCache = new RangeEntryCacheImpl(this, ml, mlFactory.getConfig().isCopyEntriesInCache());
EntryCache currentEntryCache = caches.putIfAbsent(ml.getName(), newEntryCache);
if (currentEntryCache != null) {
return currentEntryCache;
@@ -88,16 +83,19 @@ public class EntryCacheManager {
}
}
+ @Override
public void updateCacheSizeAndThreshold(long maxSize) {
this.maxSize = maxSize;
this.evictionTriggerThreshold = (long) (maxSize * evictionTriggerThresholdPercent);
}
+ @Override
public void updateCacheEvictionWatermark(double cacheEvictionWatermark) {
this.cacheEvictionWatermark = cacheEvictionWatermark;
}
- void removeEntryCache(String name) {
+ @Override
+ public void removeEntryCache(String name) {
EntryCache entryCache = caches.remove(name);
if (entryCache == null) {
return;
@@ -116,7 +114,7 @@ public class EntryCacheManager {
// Trigger a single eviction in background. While the eviction is running we stop inserting entries in the cache
if (currentSize > evictionTriggerThreshold && evictionInProgress.compareAndSet(false, true)) {
- mlFactory.scheduledExecutor.execute(safeRun(() -> {
+ mlFactory.getScheduledExecutor().execute(safeRun(() -> {
// Trigger a new cache eviction cycle to bring the used memory below the cacheEvictionWatermark
// percentage limit
long sizeToEvict = currentSize - (long) (maxSize * cacheEvictionWatermark);
@@ -150,131 +148,26 @@ public class EntryCacheManager {
currentSize.addAndGet(-size);
}
+ @Override
public long getSize() {
return currentSize.get();
}
+ @Override
public long getMaxSize() {
return maxSize;
}
+ @Override
public double getCacheEvictionWatermark() {
return cacheEvictionWatermark;
}
+ @Override
public void clear() {
caches.values().forEach(EntryCache::clear);
}
- protected class EntryCacheDisabled implements EntryCache {
- private final ManagedLedgerImpl ml;
- private final ManagedLedgerInterceptor interceptor;
-
- public EntryCacheDisabled(ManagedLedgerImpl ml) {
- this.ml = ml;
- this.interceptor = ml.getManagedLedgerInterceptor();
- }
-
- @Override
- public String getName() {
- return ml.getName();
- }
-
- @Override
- public boolean insert(EntryImpl entry) {
- return false;
- }
-
- @Override
- public void invalidateEntries(PositionImpl lastPosition) {
- }
-
- @Override
- public void invalidateAllEntries(long ledgerId) {
- }
-
- @Override
- public void clear() {
- }
-
- @Override
- public Pair<Integer, Long> evictEntries(long sizeToFree) {
- return Pair.of(0, (long) 0);
- }
-
- @Override
- public void invalidateEntriesBeforeTimestamp(long timestamp) {
- }
-
- @Override
- public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
- final ReadEntriesCallback callback, Object ctx) {
- lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
- ledgerEntries -> {
- List<Entry> entries = Lists.newArrayList();
- long totalSize = 0;
- try {
- for (LedgerEntry e : ledgerEntries) {
- // Insert the entries at the end of the list (they will be unsorted for now)
- EntryImpl entry = create(e, interceptor);
- entries.add(entry);
- totalSize += entry.getLength();
- }
- } finally {
- ledgerEntries.close();
- }
- mlFactoryMBean.recordCacheMiss(entries.size(), totalSize);
- ml.mbean.addReadEntriesSample(entries.size(), totalSize);
-
- callback.readEntriesComplete(entries, ctx);
- }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
- callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
- return null;
- });
- }
-
- @Override
- public void asyncReadEntry(ReadHandle lh, PositionImpl position, AsyncCallbacks.ReadEntryCallback callback,
- Object ctx) {
- lh.readAsync(position.getEntryId(), position.getEntryId()).whenCompleteAsync(
- (ledgerEntries, exception) -> {
- if (exception != null) {
- ml.invalidateLedgerHandle(lh);
- callback.readEntryFailed(createManagedLedgerException(exception), ctx);
- return;
- }
-
- try {
- Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
- if (iterator.hasNext()) {
- LedgerEntry ledgerEntry = iterator.next();
- EntryImpl returnEntry = create(ledgerEntry, interceptor);
-
- mlFactoryMBean.recordCacheMiss(1, returnEntry.getLength());
- ml.getMBean().addReadEntriesSample(1, returnEntry.getLength());
- callback.readEntryComplete(returnEntry, ctx);
- } else {
- callback.readEntryFailed(new ManagedLedgerException("Could not read given position"),
- ctx);
- }
- } finally {
- ledgerEntries.close();
- }
- }, ml.getExecutor().chooseThread(ml.getName()));
- }
-
- @Override
- public long getSize() {
- return 0;
- }
-
- @Override
- public int compareTo(EntryCache other) {
- return Longs.compare(getSize(), other.getSize());
- }
-
- }
-
public static Entry create(long ledgerId, long entryId, ByteBuf data) {
return EntryImpl.create(ledgerId, entryId, data);
}
@@ -300,5 +193,5 @@ public class EntryCacheManager {
return returnEntry;
}
- private static final Logger log = LoggerFactory.getLogger(EntryCacheManager.class);
+ private static final Logger log = LoggerFactory.getLogger(RangeEntryCacheManagerImpl.class);
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheEvictionPolicy.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/package-info.java
similarity index 61%
rename from managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheEvictionPolicy.java
rename to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/package-info.java
index 341c5c328ad..898d250aa7f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheEvictionPolicy.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/package-info.java
@@ -16,22 +16,4 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.bookkeeper.mledger.impl;
-
-import java.util.List;
-
-/**
- * Cache eviction policy abstraction interface.
- *
- */
-public interface EntryCacheEvictionPolicy {
- /**
- * Perform the cache eviction of at least sizeToFree bytes on the supplied list of caches.
- *
- * @param caches
- * the list of caches to consider
- * @param sizeToFree
- * the minimum size in bytes to be freed
- */
- void doEviction(List<EntryCache> caches, long sizeToFree);
-}
+package org.apache.bookkeeper.mledger.impl.cache;
\ No newline at end of file
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
index 0e53b1a9254..55f58ecd11c 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
@@ -40,6 +40,9 @@ import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCacheDisabled;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -56,8 +59,9 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
ml1 = mock(ManagedLedgerImpl.class);
when(ml1.getScheduledExecutor()).thenReturn(executor);
when(ml1.getName()).thenReturn("cache1");
- when(ml1.getMBean()).thenReturn(new ManagedLedgerMBeanImpl(ml1));
+ when(ml1.getMbean()).thenReturn(new ManagedLedgerMBeanImpl(ml1));
when(ml1.getExecutor()).thenReturn(super.executor);
+ when(ml1.getFactory()).thenReturn(factory);
ml2 = mock(ManagedLedgerImpl.class);
when(ml2.getScheduledExecutor()).thenReturn(executor);
@@ -83,13 +87,13 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
assertEquals(cache1.getSize(), 7);
assertEquals(cacheManager.getSize(), 7);
- cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS);
- assertEquals(cacheManager.mlFactoryMBean.getCacheMaxSize(), 10);
- assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 7);
- assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 0.0);
- assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0);
- assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 0.0);
- assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0);
+ factory2.getMbean().refreshStats(1, TimeUnit.SECONDS);
+ assertEquals(factory2.getMbean().getCacheMaxSize(), 10);
+ assertEquals(factory2.getMbean().getCacheUsedSize(), 7);
+ assertEquals(factory2.getMbean().getCacheHitsRate(), 0.0);
+ assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
+ assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
+ assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0);
cache2.insert(EntryImpl.create(2, 0, new byte[1]));
cache2.insert(EntryImpl.create(2, 1, new byte[1]));
@@ -117,14 +121,14 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
assertEquals(cacheManager.getSize(), 2);
assertEquals(cache2.getSize(), 2);
- cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS);
+ factory2.getMbean().refreshStats(1, TimeUnit.SECONDS);
- assertEquals(cacheManager.mlFactoryMBean.getCacheMaxSize(), 10);
- assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 2);
- assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 0.0);
- assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0);
- assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 0.0);
- assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 1);
+ assertEquals(factory2.getMbean().getCacheMaxSize(), 10);
+ assertEquals(factory2.getMbean().getCacheUsedSize(), 2);
+ assertEquals(factory2.getMbean().getCacheHitsRate(), 0.0);
+ assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
+ assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
+ assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 1);
}
@Test
@@ -197,8 +201,8 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
EntryCache cache1 = cacheManager.getEntryCache(ml1);
EntryCache cache2 = cacheManager.getEntryCache(ml2);
- assertTrue(cache1 instanceof EntryCacheManager.EntryCacheDisabled);
- assertTrue(cache2 instanceof EntryCacheManager.EntryCacheDisabled);
+ assertTrue(cache1 instanceof EntryCacheDisabled);
+ assertTrue(cache2 instanceof EntryCacheDisabled);
cache1.insert(EntryImpl.create(1, 1, new byte[4]));
cache1.insert(EntryImpl.create(1, 0, new byte[3]));
@@ -206,13 +210,13 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
assertEquals(cache1.getSize(), 0);
assertEquals(cacheManager.getSize(), 0);
- cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS);
- assertEquals(cacheManager.mlFactoryMBean.getCacheMaxSize(), 0);
- assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 0);
- assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 0.0);
- assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0);
- assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 0.0);
- assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0);
+ factory2.getMbean().refreshStats(1, TimeUnit.SECONDS);
+ assertEquals(factory2.getMbean().getCacheMaxSize(), 0);
+ assertEquals(factory2.getMbean().getCacheUsedSize(), 0);
+ assertEquals(factory2.getMbean().getCacheHitsRate(), 0.0);
+ assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
+ assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
+ assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0);
cache2.insert(EntryImpl.create(2, 0, new byte[1]));
cache2.insert(EntryImpl.create(2, 1, new byte[1]));
@@ -242,13 +246,13 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
assertEquals(cache1.getSize(), 0);
assertEquals(cacheManager.getSize(), 0);
- cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS);
- assertEquals(cacheManager.mlFactoryMBean.getCacheMaxSize(), 7 * 10);
- assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 0);
- assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 0.0);
- assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0);
- assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 0.0);
- assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0);
+ factory2.getMbean().refreshStats(1, TimeUnit.SECONDS);
+ assertEquals(factory2.getMbean().getCacheMaxSize(), 7 * 10);
+ assertEquals(factory2.getMbean().getCacheUsedSize(), 0);
+ assertEquals(factory2.getMbean().getCacheHitsRate(), 0.0);
+ assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
+ assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
+ assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0);
}
@Test
@@ -271,54 +275,54 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
ledger.addEntry(("entry-" + i).getBytes());
}
- cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS);
- assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 70);
- assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 0.0);
- assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0);
- assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 0.0);
- assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0);
+ factory2.getMbean().refreshStats(1, TimeUnit.SECONDS);
+ assertEquals(factory2.getMbean().getCacheUsedSize(), 70);
+ assertEquals(factory2.getMbean().getCacheHitsRate(), 0.0);
+ assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
+ assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
+ assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0);
List<Entry> entries = c1.readEntries(10);
assertEquals(entries.size(), 10);
entries.forEach(Entry::release);
- cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS);
- assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 70);
- assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 10.0);
- assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0);
- assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 70.0);
- assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0);
+ factory2.getMbean().refreshStats(1, TimeUnit.SECONDS);
+ assertEquals(factory2.getMbean().getCacheUsedSize(), 70);
+ assertEquals(factory2.getMbean().getCacheHitsRate(), 10.0);
+ assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
+ assertEquals(factory2.getMbean().getCacheHitsThroughput(), 70.0);
+ assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0);
ledger.deactivateCursor(c1);
- cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS);
- assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 70);
- assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 0.0);
- assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0);
- assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 0.0);
- assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0);
+ factory2.getMbean().refreshStats(1, TimeUnit.SECONDS);
+ assertEquals(factory2.getMbean().getCacheUsedSize(), 70);
+ assertEquals(factory2.getMbean().getCacheHitsRate(), 0.0);
+ assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
+ assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
+ assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0);
entries = c2.readEntries(10);
assertEquals(entries.size(), 10);
- cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS);
- assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 70);
- assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 10.0);
- assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0);
- assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 70.0);
- assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0);
+ factory2.getMbean().refreshStats(1, TimeUnit.SECONDS);
+ assertEquals(factory2.getMbean().getCacheUsedSize(), 70);
+ assertEquals(factory2.getMbean().getCacheHitsRate(), 10.0);
+ assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
+ assertEquals(factory2.getMbean().getCacheHitsThroughput(), 70.0);
+ assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0);
PositionImpl pos = (PositionImpl) entries.get(entries.size() - 1).getPosition();
c2.setReadPosition(pos);
ledger.discardEntriesFromCache(c2, pos);
entries.forEach(Entry::release);
- cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS);
- assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 7);
- assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 0.0);
- assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0);
- assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 0.0);
- assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0);
+ factory2.getMbean().refreshStats(1, TimeUnit.SECONDS);
+ assertEquals(factory2.getMbean().getCacheUsedSize(), 7);
+ assertEquals(factory2.getMbean().getCacheHitsRate(), 0.0);
+ assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
+ assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
+ assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0);
}
@Test
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
index 6f177c9ac92..fc7f0bbb058 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
@@ -41,6 +41,8 @@ import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.mockito.Mockito;
import org.testng.Assert;
@@ -55,7 +57,7 @@ public class EntryCacheTest extends MockedBookKeeperTestCase {
ml = mock(ManagedLedgerImpl.class);
when(ml.getName()).thenReturn("name");
when(ml.getExecutor()).thenReturn(executor);
- when(ml.getMBean()).thenReturn(new ManagedLedgerMBeanImpl(ml));
+ when(ml.getMbean()).thenReturn(new ManagedLedgerMBeanImpl(ml));
}
@Test(timeOut = 5000)
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
index b5358987d6e..511a5f9149a 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
@@ -48,6 +48,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlready
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.testng.annotations.Test;
@@ -216,12 +217,12 @@ public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {
future.get();
}
- cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS);
+ factory.getMbean().refreshStats(1, TimeUnit.SECONDS);
- assertTrue(cacheManager.mlFactoryMBean.getCacheHitsRate() > 0.0);
- assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0);
- assertTrue(cacheManager.mlFactoryMBean.getCacheHitsThroughput() > 0.0);
- assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0);
+ assertTrue(factory.getMbean().getCacheHitsRate() > 0.0);
+ assertEquals(factory.getMbean().getCacheMissesRate(), 0.0);
+ assertTrue(factory.getMbean().getCacheHitsThroughput() > 0.0);
+ assertEquals(factory.getMbean().getNumberOfCacheEvictions(), 0);
}
@Test
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index ded0f4990e9..32265cd49ac 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -108,23 +108,26 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
-import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
-import org.apache.pulsar.metadata.api.extended.SessionEvent;
-import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
+import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
+import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -2585,9 +2588,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
Set<ManagedCursor> activeCursors = Sets.newHashSet();
activeCursors.add(cursor1);
activeCursors.add(cursor2);
- Field cacheField = ManagedLedgerImpl.class.getDeclaredField("entryCache");
- cacheField.setAccessible(true);
- EntryCacheImpl entryCache = (EntryCacheImpl) cacheField.get(ledger);
+ EntryCache entryCache = Whitebox.getInternalState(ledger, "entryCache");
Iterator<ManagedCursor> activeCursor = ledger.getActiveCursors().iterator();
@@ -2659,10 +2660,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
@Test
public void testActiveDeactiveCursor() throws Exception {
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("cache_eviction_ledger");
-
- Field cacheField = ManagedLedgerImpl.class.getDeclaredField("entryCache");
- cacheField.setAccessible(true);
- EntryCacheImpl entryCache = (EntryCacheImpl) cacheField.get(ledger);
+ EntryCache entryCache = Whitebox.getInternalState(ledger, "entryCache");
final int totalInsertedEntries = 20;
for (int i = 0; i < totalInsertedEntries; i++) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index b8f41fa4c40..2dee32f12c8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -19,7 +19,7 @@
package org.apache.pulsar.broker.service.nonpersistent;
import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.bookkeeper.mledger.impl.EntryCacheManager.create;
+import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl.create;
import static org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import com.carrotsearch.hppc.ObjectObjectHashMap;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/AllocatorStatsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/AllocatorStatsGenerator.java
index 43dbd642f80..5dd4c0b0402 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/AllocatorStatsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/AllocatorStatsGenerator.java
@@ -25,7 +25,7 @@ import io.netty.buffer.PoolChunkMetric;
import io.netty.buffer.PoolSubpageMetric;
import io.netty.buffer.PooledByteBufAllocator;
import java.util.stream.Collectors;
-import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
+import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
import org.apache.pulsar.common.stats.AllocatorStats;
import org.apache.pulsar.common.stats.AllocatorStats.PoolArenaStats;
import org.apache.pulsar.common.stats.AllocatorStats.PoolChunkListStats;
@@ -38,7 +38,7 @@ public class AllocatorStatsGenerator {
if ("default".equals(allocatorName)) {
allocator = PooledByteBufAllocator.DEFAULT;
} else if ("ml-cache".equals(allocatorName)) {
- allocator = EntryCacheImpl.ALLOCATOR;
+ allocator = RangeEntryCacheImpl.ALLOCATOR;
} else {
throw new IllegalArgumentException("Invalid allocator name : " + allocatorName);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java
index 2474ce2b8de..8e46b4cf254 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java
@@ -25,7 +25,7 @@ import io.netty.buffer.PoolChunkMetric;
import io.netty.buffer.PooledByteBufAllocator;
import java.util.List;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean;
-import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
+import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.stats.Metrics;
@@ -54,7 +54,7 @@ public class ManagedLedgerCacheMetrics extends AbstractMetrics {
m.put("brk_ml_cache_hits_throughput", mlCacheStats.getCacheHitsThroughput());
m.put("brk_ml_cache_misses_throughput", mlCacheStats.getCacheMissesThroughput());
- PooledByteBufAllocator allocator = EntryCacheImpl.ALLOCATOR;
+ PooledByteBufAllocator allocator = RangeEntryCacheImpl.ALLOCATOR;
long activeAllocations = 0;
long activeAllocationsSmall = 0;
long activeAllocationsNormal = 0;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 7a90b10fa7e..c3f6a634a11 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -48,9 +48,9 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.bookkeeper.mledger.ManagedCursor;
-import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController;
@@ -339,7 +339,7 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger();
Field cacheField = ManagedLedgerImpl.class.getDeclaredField("entryCache");
cacheField.setAccessible(true);
- EntryCacheImpl entryCache = (EntryCacheImpl) cacheField.get(ledger);
+ EntryCache entryCache = (EntryCache) cacheField.get(ledger);
/************* Validation on non-empty active-cursor **************/
// (4) Get ActiveCursor : which is list of active subscription
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index f4845c08c55..8205dbe935b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -75,8 +75,8 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import org.apache.avro.Schema.Parser;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -1058,7 +1058,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger();
- EntryCacheImpl entryCache = spy((EntryCacheImpl) Whitebox.getInternalState(ledger, "entryCache"));
+ EntryCache entryCache = spy((EntryCache) Whitebox.getInternalState(ledger, "entryCache"));
Whitebox.setInternalState(ledger, "entryCache", entryCache);
Message<byte[]> msg;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index a36dd359e59..9679620c357 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -47,8 +47,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.Cleanup;
-import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -648,7 +648,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger();
- EntryCacheImpl entryCache = spy((EntryCacheImpl) Whitebox.getInternalState(ledger, "entryCache"));
+ EntryCache entryCache = spy((EntryCache) Whitebox.getInternalState(ledger, "entryCache"));
Whitebox.setInternalState(ledger, "entryCache", entryCache);
Message<byte[]>msg = null;