You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2022/07/26 09:32:44 UTC

[bookkeeper] branch master updated: Fix autoRecover memory leak. (#3361)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new da1b29a510 Fix autoRecover memory leak. (#3361)
da1b29a510 is described below

commit da1b29a510fa924f612d22f36b3c73d61d7953d2
Author: Yan Zhao <ho...@apache.org>
AuthorDate: Tue Jul 26 17:32:40 2022 +0800

    Fix autoRecover memory leak. (#3361)
---
 .../bookkeeper/meta/AbstractZkLedgerManager.java   | 42 ++++++++++++++++++++++
 .../meta/AbstractZkLedgerManagerTest.java          |  7 ++++
 .../zookeeper/MockZooKeeperTestCase.java           | 32 +++++++++++++++++
 3 files changed, 81 insertions(+)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index cda93704e0..0d4d6ee425 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -51,6 +51,7 @@ import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.AsyncCallback.DataCallback;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
@@ -154,6 +155,30 @@ public abstract class AbstractZkLedgerManager implements LedgerManager, Watcher
         }
     }
 
+    /**
+     * Task that cancels the ZooKeeper metadata watch of a ledger once no listener is registered for it.
+     */
+    protected class CancelWatchLedgerMetadataTask implements Runnable {
+
+        final long ledgerId;
+
+        CancelWatchLedgerMetadataTask(long ledgerId) {
+            this.ledgerId = ledgerId;
+        }
+
+        @Override
+        public void run() {
+            Set<LedgerMetadataListener> listeners = AbstractZkLedgerManager.this.listeners.get(ledgerId);
+            if (!CollectionUtils.isEmpty(listeners)) { // listeners are still registered: keep the watch
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Still watch ledgerId: {}, ignore this unwatch task.", ledgerId);
+                }
+                return;
+            }
+            cancelMetadataWatch(ledgerId, AbstractZkLedgerManager.this); // the manager itself is the watcher
+        }
+    }
+
     /**
      * ZooKeeper-based Ledger Manager Constructor.
      *
@@ -420,11 +445,28 @@ public abstract class AbstractZkLedgerManager implements LedgerManager, Watcher
                 }
                 if (listenerSet.isEmpty()) {
                     listeners.remove(ledgerId, listenerSet);
+                    new CancelWatchLedgerMetadataTask(ledgerId).run();
                 }
             }
         }
     }
 
+    private void cancelMetadataWatch(long ledgerId, Watcher watcher) { // removes the data watch on the ledger path
+        zk.removeWatches(getLedgerPath(ledgerId), watcher, WatcherType.Data, true, new VoidCallback() {
+            @Override
+            public void processResult(int rc, String path, Object o) {
+                if (rc != KeeperException.Code.OK.intValue()) { // a failed removal is only logged here
+                    LOG.error("Cancel watch ledger {} metadata failed.", ledgerId,
+                            KeeperException.create(KeeperException.Code.get(rc), path));
+                    return;
+                }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Cancel watch ledger {} metadata succeed.", ledgerId);
+                }
+            }
+        }, null); // NOTE(review): 'true' above is presumably ZooKeeper's "local" removal flag - confirm
+    }
+
     @Override
     public CompletableFuture<Versioned<LedgerMetadata>> readLedgerMetadata(long ledgerId) {
         return readLedgerMetadata(ledgerId, null);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
index 5c6a514a7e..720ed3a594 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
@@ -824,9 +824,16 @@ public class AbstractZkLedgerManagerTest extends MockZooKeeperTestCase {
             ledgerStr, true,
             KeeperException.Code.OK.intValue(), serDe.serialize(metadata), stat);
 
+        mockZkRemoveWatcher();
+
         // unregister the listener
         ledgerManager.unregisterLedgerMetadataListener(ledgerId, listener);
         assertFalse(ledgerManager.listeners.containsKey(ledgerId));
+        assertFalse(watchers.containsKey(ledgerStr));
+        verify(mockZk, times(1)).removeWatches(eq(ledgerManager.getLedgerPath(ledgerId)),
+                any(Watcher.class), any(Watcher.WatcherType.class), any(Boolean.class),
+                any(VoidCallback.class), any());
+
 
         // notify the watcher event
         notifyWatchedEvent(
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java
index 4190bf5c42..fdabc5e0d4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java
@@ -40,6 +40,7 @@ import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.AsyncCallback.StringCallback;
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
@@ -83,6 +84,20 @@ public abstract class MockZooKeeperTestCase {
         watcherSet.add(watcher);
     }
 
+    private void removeWatcher(String path, Watcher watcher) { // drops watcher from the set kept for path
+        if (watcher == null) {
+            return;
+        }
+        Set<Watcher> watcherSet = watchers.get(path);
+        if (null == watcherSet) { // nothing is recorded for this path
+            return;
+        }
+        watcherSet.remove(watcher);
+        if (watcherSet.isEmpty()) { // drop the map entry so watchers.containsKey(path) becomes false
+            watchers.remove(path);
+        }
+    }
+
     protected void mockZkUtilsAsyncCreateFullPathOptimistic(
         String expectedLedgerPath,
         CreateMode expectedCreateMode,
@@ -187,7 +202,24 @@ public abstract class MockZooKeeperTestCase {
             expectedWatcher ? any(Watcher.class) : eq(null),
             any(DataCallback.class),
             any());
+    }
 
+    protected void mockZkRemoveWatcher () throws Exception { // stubs mockZk.removeWatches(...) for tests
+        doAnswer(invocationOnMock -> {
+            String path = invocationOnMock.getArgument(0);
+            Watcher watcher = invocationOnMock.getArgument(1);
+            VoidCallback callback = invocationOnMock.getArgument(4);
+            removeWatcher(path, watcher); // mirror the removal in this test case's watcher bookkeeping
+
+            callback.processResult(KeeperException.Code.OK.intValue(), path, null); // report OK inline
+            return null;
+        }).when(mockZk).removeWatches(
+                any(String.class),
+                any(Watcher.class),
+                any(Watcher.WatcherType.class),
+                any(Boolean.class),
+                any(VoidCallback.class),
+                any());
     }
 
     protected void mockZkSetData(