You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2021/10/23 04:47:22 UTC

[bookkeeper] 01/02: Eliminate direct ZK access in ScanAndCompareGarbageCollector (#2833)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 577121fd5d02613d72f2e4e6ece2ef2b66490706
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Oct 18 08:46:07 2021 -0700

    Eliminate direct ZK access in ScanAndCompareGarbageCollector (#2833)
    
    * Eliminate direct ZK access in ScanAndCompareGarbageCollector
    
    * Removed unused imports
    
    * Fixed zk ACLs
    
    * Addressed comments
    
    * Fixed checkstyle
---
 .../bookie/ScanAndCompareGarbageCollector.java     | 82 +++++++++++++---------
 .../meta/LedgerUnderreplicationManager.java        |  6 ++
 .../meta/ZkLedgerUnderreplicationManager.java      | 27 ++++---
 3 files changed, 69 insertions(+), 46 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
index cff0250..82f8924 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
@@ -25,6 +25,7 @@ import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 
 import com.google.common.collect.Sets;
 import java.io.IOException;
+import java.net.URI;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.Set;
@@ -36,22 +37,22 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import lombok.Cleanup;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
-import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
-import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.MetadataBookieDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.meta.exceptions.MetadataException;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
+import org.apache.commons.configuration.ConfigurationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,26 +69,22 @@ import org.slf4j.LoggerFactory;
  * <b>globalActiveLedgers</b>, do garbage collection on them.
  * </ul>
  * </p>
- *
- * <p>TODO: eliminate the direct usage of zookeeper here {@link https://github.com/apache/bookkeeper/issues/1331}
  */
 public class ScanAndCompareGarbageCollector implements GarbageCollector {
 
     static final Logger LOG = LoggerFactory.getLogger(ScanAndCompareGarbageCollector.class);
-    static final int MAX_CONCURRENT_ZK_REQUESTS = 1000;
+    static final int MAX_CONCURRENT_METADATA_REQUESTS = 1000;
 
     private final LedgerManager ledgerManager;
     private final CompactableLedgerStorage ledgerStorage;
     private final ServerConfiguration conf;
     private final BookieId selfBookieAddress;
-    private ZooKeeper zk = null;
     private boolean enableGcOverReplicatedLedger;
     private final long gcOverReplicatedLedgerIntervalMillis;
     private long lastOverReplicatedLedgerGcTimeMillis;
-    private final String zkServers;
-    private final String zkLedgersRootPath;
     private final boolean verifyMetadataOnGc;
     private int activeLedgerCounter;
+    private StatsLogger statsLogger;
 
     public ScanAndCompareGarbageCollector(LedgerManager ledgerManager, CompactableLedgerStorage ledgerStorage,
             ServerConfiguration conf, StatsLogger statsLogger) throws IOException {
@@ -95,13 +92,13 @@ public class ScanAndCompareGarbageCollector implements GarbageCollector {
         this.ledgerStorage = ledgerStorage;
         this.conf = conf;
         this.selfBookieAddress = Bookie.getBookieId(conf);
+        this.statsLogger = statsLogger;
+
         this.gcOverReplicatedLedgerIntervalMillis = conf.getGcOverreplicatedLedgerWaitTimeMillis();
         this.lastOverReplicatedLedgerGcTimeMillis = System.currentTimeMillis();
         if (gcOverReplicatedLedgerIntervalMillis > 0) {
             this.enableGcOverReplicatedLedger = true;
         }
-        this.zkServers = ZKMetadataDriverBase.resolveZkServers(conf);
-        this.zkLedgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf);
         LOG.info("Over Replicated Ledger Deletion : enabled=" + enableGcOverReplicatedLedger + ", interval="
                 + gcOverReplicatedLedgerIntervalMillis);
 
@@ -132,8 +129,6 @@ public class ScanAndCompareGarbageCollector implements GarbageCollector {
             boolean checkOverreplicatedLedgers = (enableGcOverReplicatedLedger && curTime
                     - lastOverReplicatedLedgerGcTimeMillis > gcOverReplicatedLedgerIntervalMillis);
             if (checkOverreplicatedLedgers) {
-                zk = ZooKeeperClient.newBuilder().connectString(zkServers)
-                        .sessionTimeoutMs(conf.getZkTimeout()).build();
                 // remove all the overreplicated ledgers from the local bookie
                 Set<Long> overReplicatedLedgers = removeOverReplicatedledgers(bkActiveLedgers, garbageCleaner);
                 if (overReplicatedLedgers.isEmpty()) {
@@ -215,37 +210,36 @@ public class ScanAndCompareGarbageCollector implements GarbageCollector {
         } catch (Throwable t) {
             // ignore exception, collecting garbage next time
             LOG.warn("Exception when iterating over the metadata", t);
-        } finally {
-            if (zk != null) {
-                try {
-                    zk.close();
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    LOG.error("Error closing zk session", e);
-                }
-                zk = null;
-            }
         }
     }
 
     private Set<Long> removeOverReplicatedledgers(Set<Long> bkActiveledgers, final GarbageCleaner garbageCleaner)
-            throws InterruptedException, KeeperException {
-        final List<ACL> zkAcls = ZkUtils.getACLs(conf);
+            throws Exception {
         final Set<Long> overReplicatedLedgers = Sets.newHashSet();
-        final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_ZK_REQUESTS);
+        final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_METADATA_REQUESTS);
         final CountDownLatch latch = new CountDownLatch(bkActiveledgers.size());
+        // instantiate the metadata driver to obtain the ledger underreplication manager
+
+        @Cleanup
+        MetadataBookieDriver metadataDriver = instantiateMetadataDriver(conf, statsLogger);
+
+        @Cleanup
+        LedgerManagerFactory lmf = metadataDriver.getLedgerManagerFactory();
+
+        @Cleanup
+        LedgerUnderreplicationManager lum = lmf.newLedgerUnderreplicationManager();
+
         for (final Long ledgerId : bkActiveledgers) {
             try {
                 // check if the ledger is being replicated already by the replication worker
-                if (ZkLedgerUnderreplicationManager.isLedgerBeingReplicated(zk, zkLedgersRootPath, ledgerId)) {
+                if (lum.isLedgerBeingReplicated(ledgerId)) {
                     latch.countDown();
                     continue;
                 }
                 // we try to acquire the underreplicated ledger lock to not let the bookie replicate the ledger that is
                 // already being checked for deletion, since that might change the ledger ensemble to include the
                 // current bookie again and, in that case, we cannot remove the ledger from local storage
-                ZkLedgerUnderreplicationManager.acquireUnderreplicatedLedgerLock(zk, zkLedgersRootPath, ledgerId,
-                        zkAcls);
+                lum.acquireUnderreplicatedLedger(ledgerId);
                 semaphore.acquire();
                 ledgerManager.readLedgerMetadata(ledgerId)
                     .whenComplete((metadata, exception) -> {
@@ -273,8 +267,7 @@ public class ScanAndCompareGarbageCollector implements GarbageCollector {
                                 semaphore.release();
                                 latch.countDown();
                                 try {
-                                    ZkLedgerUnderreplicationManager.releaseUnderreplicatedLedgerLock(
-                                            zk, zkLedgersRootPath, ledgerId);
+                                    lum.releaseUnderreplicatedLedger(ledgerId);
                                 } catch (Throwable t) {
                                     LOG.error("Exception when removing underreplicated lock for ledger {}",
                                               ledgerId, t);
@@ -290,4 +283,23 @@ public class ScanAndCompareGarbageCollector implements GarbageCollector {
         bkActiveledgers.removeAll(overReplicatedLedgers);
         return overReplicatedLedgers;
     }
+
+    private static MetadataBookieDriver instantiateMetadataDriver(ServerConfiguration conf, StatsLogger statsLogger)
+            throws BookieException {
+        try {
+            String metadataServiceUriStr = conf.getMetadataServiceUri();
+            MetadataBookieDriver driver = MetadataDrivers.getBookieDriver(URI.create(metadataServiceUriStr));
+            driver.initialize(
+                    conf,
+                    () -> {
+                    },
+                    statsLogger);
+            return driver;
+        } catch (MetadataException me) {
+            throw new BookieException.MetadataStoreException("Failed to initialize metadata bookie driver", me);
+        } catch (ConfigurationException e) {
+            throw new BookieException.BookieIllegalOpException(e);
+        }
+    }
+
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
index 0718fb5..ac468d3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
@@ -45,6 +45,11 @@ public interface LedgerUnderreplicationManager extends AutoCloseable {
     }
 
     /**
+     * Check whether the ledger is being replicated by any bookie.
+     */
+    boolean isLedgerBeingReplicated(long ledgerId) throws ReplicationException;
+
+    /**
      * Mark a ledger as underreplicated with missing bookies. The replication should then
      * check which fragements are underreplicated and rereplicate them.
      *
@@ -105,6 +110,7 @@ public interface LedgerUnderreplicationManager extends AutoCloseable {
     long pollLedgerToRereplicate()
             throws ReplicationException.UnavailableException;
 
+    void acquireUnderreplicatedLedger(long ledgerId) throws ReplicationException;
 
     /**
      * Release a previously acquired ledger. This allows others to acquire the ledger.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
index 266570f..680264a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
@@ -770,10 +770,13 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
     /**
      * Check whether the ledger is being replicated by any bookie.
      */
-    public static boolean isLedgerBeingReplicated(ZooKeeper zkc, String zkLedgersRootPath, long ledgerId)
-            throws KeeperException,
-            InterruptedException {
-        return zkc.exists(getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId), false) != null;
+    @Override
+    public boolean isLedgerBeingReplicated(long ledgerId) throws ReplicationException {
+        try {
+            return zkc.exists(getUrLedgerLockZnode(urLockPath, ledgerId), false) != null;
+        } catch (Exception e) {
+            throw new ReplicationException.UnavailableException("Failed to check ledger lock", e);
+        }
     }
 
     /**
@@ -786,13 +789,15 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
                 LOCK_DATA, zkAcls, CreateMode.EPHEMERAL);
     }
 
-    /**
-     * Release the underreplicated ledger lock if it exists.
-     */
-    public static void releaseUnderreplicatedLedgerLock(ZooKeeper zkc, String zkLedgersRootPath, long ledgerId)
-            throws InterruptedException, KeeperException {
-        if (isLedgerBeingReplicated(zkc, zkLedgersRootPath, ledgerId)) {
-            zkc.delete(getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId), -1);
+    @Override
+    public void acquireUnderreplicatedLedger(long ledgerId) throws ReplicationException {
+        try {
+            // Lock under urLockPath, the path isLedgerBeingReplicated() checks (helper expects the root path).
+            ZkUtils.createFullPathOptimistic(zkc, getUrLedgerLockZnode(urLockPath, ledgerId),
+                    LOCK_DATA, ZkUtils.getACLs(conf), CreateMode.EPHEMERAL);
+        } catch (Exception e) {
+            throw new ReplicationException.UnavailableException(
+                    "Failed to acquire underreplicated ledger lock for " + ledgerId, e);
         }
     }