You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2022/07/26 09:32:44 UTC
[bookkeeper] branch master updated: Fix autoRecover memory leak. (#3361)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new da1b29a510 Fix autoRecover memory leak. (#3361)
da1b29a510 is described below
commit da1b29a510fa924f612d22f36b3c73d61d7953d2
Author: Yan Zhao <ho...@apache.org>
AuthorDate: Tue Jul 26 17:32:40 2022 +0800
Fix autoRecover memory leak. (#3361)
---
.../bookkeeper/meta/AbstractZkLedgerManager.java | 42 ++++++++++++++++++++++
.../meta/AbstractZkLedgerManagerTest.java | 7 ++++
.../zookeeper/MockZooKeeperTestCase.java | 32 +++++++++++++++++
3 files changed, 81 insertions(+)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index cda93704e0..0d4d6ee425 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -51,6 +51,7 @@ import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.AsyncCallback.StatCallback;
@@ -154,6 +155,30 @@ public abstract class AbstractZkLedgerManager implements LedgerManager, Watcher
}
}
+ /**
+ * Task that cancels this manager's ZooKeeper watch on a ledger's metadata once
+ * no {@link LedgerMetadataListener} remains registered for that ledger.
+ *
+ * <p>When run, it looks the ledger id up in the manager's listener map again. If a
+ * non-empty listener set is present, the task logs at debug level and does nothing;
+ * otherwise it calls {@code cancelMetadataWatch} with the enclosing manager as the
+ * watcher to remove.
+ */
+ protected class CancelWatchLedgerMetadataTask implements Runnable {
+
+ final long ledgerId; // id of the ledger whose metadata watch should be cancelled
+
+ CancelWatchLedgerMetadataTask(long ledgerId) {
+ this.ledgerId = ledgerId;
+ }
+
+ @Override
+ public void run() {
+ Set<LedgerMetadataListener> listeners = AbstractZkLedgerManager.this.listeners.get(ledgerId);
+ if (!CollectionUtils.isEmpty(listeners)) { // isEmpty is null-safe: a missing map entry counts as "no listeners"
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Still watch ledgerId: {}, ignore this unwatch task.", ledgerId);
+ }
+ return; // listeners are still registered for this ledger, so the watch is kept
+ }
+ cancelMetadataWatch(ledgerId, AbstractZkLedgerManager.this); // the enclosing manager is the Watcher being removed
+ }
+ }
+
/**
* ZooKeeper-based Ledger Manager Constructor.
*
@@ -420,11 +445,28 @@ public abstract class AbstractZkLedgerManager implements LedgerManager, Watcher
}
if (listenerSet.isEmpty()) {
listeners.remove(ledgerId, listenerSet);
+ new CancelWatchLedgerMetadataTask(ledgerId).run();
}
}
}
}
+ private void cancelMetadataWatch(long ledgerId, Watcher watcher) { // removes the given watcher's data watch on the ledger's path; the result is only logged
+ zk.removeWatches(getLedgerPath(ledgerId), watcher, WatcherType.Data, true, new VoidCallback() { // callback form of removeWatches; 'true' is ZooKeeper's "local" flag (allow local removal without a server connection)
+ @Override
+ public void processResult(int rc, String path, Object o) {
+ if (rc != KeeperException.Code.OK.intValue()) { // any non-OK result code is logged as an error and not retried here
+ LOG.error("Cancel watch ledger {} metadata failed.", ledgerId,
+ KeeperException.create(KeeperException.Code.get(rc), path));
+ return;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cancel watch ledger {} metadata succeed.", ledgerId);
+ }
+ }
+ }, null); // no callback context object is passed
+ }
+
@Override
public CompletableFuture<Versioned<LedgerMetadata>> readLedgerMetadata(long ledgerId) {
return readLedgerMetadata(ledgerId, null);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
index 5c6a514a7e..720ed3a594 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
@@ -824,9 +824,16 @@ public class AbstractZkLedgerManagerTest extends MockZooKeeperTestCase {
ledgerStr, true,
KeeperException.Code.OK.intValue(), serDe.serialize(metadata), stat);
+ mockZkRemoveWatcher();
+
// unregister the listener
ledgerManager.unregisterLedgerMetadataListener(ledgerId, listener);
assertFalse(ledgerManager.listeners.containsKey(ledgerId));
+ assertFalse(watchers.containsKey(ledgerStr));
+ verify(mockZk, times(1)).removeWatches(eq(ledgerManager.getLedgerPath(ledgerId)),
+ any(Watcher.class), any(Watcher.WatcherType.class), any(Boolean.class),
+ any(VoidCallback.class), any());
+
// notify the watcher event
notifyWatchedEvent(
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java
index 4190bf5c42..fdabc5e0d4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java
@@ -40,6 +40,7 @@ import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
@@ -83,6 +84,20 @@ public abstract class MockZooKeeperTestCase {
watcherSet.add(watcher);
}
+ private void removeWatcher(String path, Watcher watcher) { // test helper: drops one watcher from the per-path 'watchers' registry
+ if (watcher == null) {
+ return; // nothing to remove
+ }
+ Set<Watcher> watcherSet = watchers.get(path);
+ if (null == watcherSet) {
+ return; // no watcher was ever recorded for this path
+ }
+ watcherSet.remove(watcher);
+ if (watcherSet.isEmpty()) { // last watcher for the path is gone: remove the path entry too
+ watchers.remove(path);
+ }
+ }
+
protected void mockZkUtilsAsyncCreateFullPathOptimistic(
String expectedLedgerPath,
CreateMode expectedCreateMode,
@@ -187,7 +202,24 @@ public abstract class MockZooKeeperTestCase {
expectedWatcher ? any(Watcher.class) : eq(null),
any(DataCallback.class),
any());
+ }
+ protected void mockZkRemoveWatcher () throws Exception { // stubs mockZk.removeWatches(...) for any arguments so it updates the local watcher registry
+ doAnswer(invocationOnMock -> {
+ String path = invocationOnMock.getArgument(0); // argument 0: path whose watch is being removed
+ Watcher watcher = invocationOnMock.getArgument(1); // argument 1: the watcher to remove
+ VoidCallback callback = invocationOnMock.getArgument(4); // argument 4: the caller's completion callback
+ removeWatcher(path, watcher);
+
+ callback.processResult(KeeperException.Code.OK.intValue(), path, null); // always reports success (OK) to the caller's callback
+ return null;
+ }).when(mockZk).removeWatches(
+ any(String.class),
+ any(Watcher.class),
+ any(Watcher.WatcherType.class),
+ any(Boolean.class),
+ any(VoidCallback.class),
+ any());
}
protected void mockZkSetData(