Posted to commits@bookkeeper.apache.org by yo...@apache.org on 2023/06/19 07:42:39 UTC

[bookkeeper] 15/31: Fix ledger replicated failed blocks bookie decommission process (#3917)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.16
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit db41a5c33f11b4f9e1c3aae7a81badf72ef1ab56
Author: Hang Chen <ch...@apache.org>
AuthorDate: Mon Jun 19 15:07:05 2023 +0800

    Fix ledger replicated failed blocks bookie decommission process (#3917)
    
    ### Motivation
    When I decommissioned one bookie (bk3), replication of one ledger failed and blocked the decommission process.
    
    This is the auto-recovery log:
    ```
    2023-03-29T06:29:22,642+0000 [ReplicationWorker] ERROR org.apache.bookkeeper.client.LedgerHandle - ReadEntries exception on ledgerId:904368 firstEntry:14 lastEntry:14 lastAddConfirmed:13
    2023-03-29T06:29:22,642+0000 [ReplicationWorker] ERROR org.apache.bookkeeper.replication.ReplicationWorker - Received error: -1 while trying to read entry: 14 of ledger: 904368 in ReplicationWorker
    2023-03-29T06:29:22,642+0000 [ReplicationWorker] ERROR org.apache.bookkeeper.replication.ReplicationWorker - Failed to read faulty entries, so giving up replicating ledgerFragment Fragment(LedgerID: 904368, FirstEntryID: 0[0], LastKnownEntryID: 14[14], Host: [betausc1-bk-10.betausc1-bk-headless.o-vaxkx.svc.cluster.local:3181], Closed: true)
    2023-03-29T06:29:22,644+0000 [ReplicationWorker] ERROR org.apache.bookkeeper.replication.ReplicationWorker - ReplicationWorker failed to replicate Ledger : 904368 for 6 number of times, so deferring the ledger lock release by 300000 msecs
    ```
    The ledger's metadata:
    ```
    ledgerID: 904368
    2023-03-29T06:47:56,511+0000 [main] INFO  org.apache.bookkeeper.tools.cli.commands.client.LedgerMetaDataCommand - LedgerMetadata{formatVersion=3, ensembleSize=3, writeQuorumSize=3,
    ackQuorumSize=2, state=OPEN, digestType=CRC32C, password=base64:,
    ensembles={0=[bk1:3181, bk2:3181, bk3:3181], 15=[bk1:3181, bk2:3181, bk4:3181]},...}
    ```
    
    The ledger (904368) has two ensembles, `ensembles={0=[bk1:3181, bk2:3181, bk3:3181], 15=[bk1:3181, bk2:3181, bk4:3181]}`. However, the replication worker read the ledger's LAC as 13, while the fragment it had to replicate covered the entry range [0, 14]. Reading entry 14 therefore failed.
    
    ### One question
    **Why was a new ensemble created starting at entryId = 15 when the ledger's lastAddConfirm is 13?**
    
    This question has two parts: how the new ensemble was created, and how the lastAddConfirm was generated.
    
    #### 1. How the new ensemble was created
    The ensemble change is controlled on the bookie client side.
    
    When an entry is ready to be sent to the bookie server, the bookie client checks whether an ensemble change is needed.
    https://github.com/apache/bookkeeper/blob/912896deb2e748389e15e74c37539b2ff36302c7/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java#L254
    
    In the case above, one bookie was lost while entry 15 was being written, which triggered an ensemble change and generated the new ensemble: 15=[bk1:3181, bk2:3181, bk4:3181]. However, the write of entry 15 then failed, for example because of a timeout or because the bookie server rejected the write.
    
    At this point, entry 14 has been written successfully.
    
    #### 2. How the lastAddConfirm was generated
    Because the ledger is in the `OPEN` state, the ledger handle sends a readLAC request to the bookies of the last ensemble to get the ledger's lastAddConfirm.
    
    In the case above, the readLAC request is sent to bk1, bk2, and bk4.
    
    With the `V2` protocol (Pulsar uses the V2 protocol to interact with the BookKeeper cluster), the bookie client puts the lastAddConfirm EntryId in the next Entry's metadata.
    https://github.com/apache/bookkeeper/blob/df4492012cc03682534cbc8dd68dd81163b0c947/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java#L134
    
    When we use the `V2` protocol to open an `OPEN` state ledger for reading, the client sends a readLastAddConfirm request to the bookie server, and the bookie server fetches the last entry of this ledger and returns it to the client.
    https://github.com/apache/bookkeeper/blob/df4492012cc03682534cbc8dd68dd81163b0c947/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java#L108
    
    However, the bookie client parses the response entry and takes the lastAddConfirm from the entry's metadata. Because the entry records only the previous EntryId as the lastAddConfirm, the lastAddConfirm that the LedgerHandle obtains is the penultimate EntryId of the ledger.
    
    In the case above, bk1 and bk2 each hold entry 14 as their max entry and bk4 returns NoSuchEntryException, so the LedgerHandle computes the lastAddConfirm as `14 - 1 = 13`, not 14.
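    
    The LAC derivation above can be sketched as follows. This is a minimal illustrative model, not BookKeeper code: the class `LacFromPiggybackSketch` and its methods are hypothetical, and it assumes, as in this case, that every entry records the previous entry id as its piggybacked lastAddConfirm.
    
    ```java
    import java.util.Arrays;
    import java.util.List;
    import java.util.OptionalLong;
    
    class LacFromPiggybackSketch {
        // Assumption: entry N carries N - 1 as its piggybacked lastAddConfirm.
        static long piggybackedLac(long entryId) {
            return entryId - 1;
        }
    
        // Each element is the highest entry id one bookie holds for the ledger.
        // An empty value models a NoSuchEntryException response.
        static long readLac(List<OptionalLong> lastEntryPerBookie) {
            long lac = -1L; // -1 means nothing is confirmed yet
            for (OptionalLong last : lastEntryPerBookie) {
                if (last.isPresent()) {
                    lac = Math.max(lac, piggybackedLac(last.getAsLong()));
                }
            }
            return lac;
        }
    
        public static void main(String[] args) {
            // bk1 and bk2 hold entry 14, bk4 holds nothing for this ledger.
            List<OptionalLong> responses = Arrays.asList(
                    OptionalLong.of(14), OptionalLong.of(14), OptionalLong.empty());
            System.out.println(readLac(responses)); // prints 13
        }
    }
    ```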
    
    When the replicator tries to recover the first ensemble 0=[bk1:3181, bk2:3181, bk3:3181] with entry range [0, 14], reading entry 14 throws a ReadEntryException because the lastAddConfirm is 13.
    https://github.com/apache/bookkeeper/blob/df4492012cc03682534cbc8dd68dd81163b0c947/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java#L685-L690
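    
    A minimal sketch of why that read is rejected, assuming the client refuses any read range that extends past the lastAddConfirm it knows. `ReadRangeCheckSketch` and `isReadable` are hypothetical names used for illustration, not the LedgerHandle implementation.
    
    ```java
    class ReadRangeCheckSketch {
        // A range is readable only if it is well formed and ends at or before the known LAC.
        static boolean isReadable(long firstEntry, long lastEntry, long lastAddConfirmed) {
            return firstEntry >= 0 && firstEntry <= lastEntry && lastEntry <= lastAddConfirmed;
        }
    
        public static void main(String[] args) {
            long lac = 13;
            System.out.println(isReadable(0, 13, lac));  // prints true
            System.out.println(isReadable(14, 14, lac)); // prints false: entry 14 is beyond the LAC
        }
    }
    ```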
    
    ### Solution
    When we encounter a case where
    - The ledger is `OPEN`
    - The ledger has multiple ensembles
    - The ledger's last ensemble doesn't have any entries, which means `lastAddConfirm < last ensemble key - 1`
    
    We should treat the penultimate segment/ensemble of the ledger as `OPEN` instead of closed.
    https://github.com/apache/bookkeeper/blob/df4492012cc03682534cbc8dd68dd81163b0c947/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java#L56-L57
    
    Once the segment/ensemble is treated as `OPEN`, the replicator closes the ledger first and then replicates it.
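    
    The decision can be sketched as below. This is a simplified model of the check, with bookies represented as plain strings and hypothetical names (`FragmentClosedSketch`, `isFragmentClosed`); it is not the LedgerFragment class itself.
    
    ```java
    import java.util.Arrays;
    import java.util.List;
    import java.util.SortedMap;
    import java.util.TreeMap;
    
    class FragmentClosedSketch {
        // A fragment counts as closed if the ledger is closed, or if it is not the last
        // ensemble and every entry before the last ensemble's first entry id is confirmed.
        static boolean isFragmentClosed(boolean ledgerClosed,
                                        SortedMap<Long, List<String>> ensembles,
                                        List<String> ensemble,
                                        long lastAddConfirmed) {
            boolean isLastEnsemble = ensemble.equals(ensembles.get(ensembles.lastKey()));
            return ledgerClosed
                    || (!isLastEnsemble && lastAddConfirmed >= ensembles.lastKey() - 1);
        }
    
        public static void main(String[] args) {
            SortedMap<Long, List<String>> ensembles = new TreeMap<>();
            ensembles.put(0L, Arrays.asList("bk1:3181", "bk2:3181", "bk3:3181"));
            ensembles.put(15L, Arrays.asList("bk1:3181", "bk2:3181", "bk4:3181"));
            List<String> first = ensembles.get(0L);
    
            // LAC 13 < 15 - 1, so the first fragment is treated as open.
            System.out.println(isFragmentClosed(false, ensembles, first, 13)); // prints false
            // LAC 14 covers the whole first fragment, so it is treated as closed.
            System.out.println(isFragmentClosed(false, ensembles, first, 14)); // prints true
        }
    }
    ```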
    
    (cherry picked from commit eff38e45e317b90eb1cd456bd5d5629dedc1fd5f)
---
 .../apache/bookkeeper/client/LedgerFragment.java   |  7 +-
 .../bookkeeper/replication/ReplicationWorker.java  | 14 ++++
 .../bookkeeper/client/BookieWriteLedgerTest.java   | 75 ++++++++++++++++++++++
 .../replication/ReplicationTestUtil.java           |  2 +-
 .../replication/TestReplicationWorker.java         | 27 ++++++++
 5 files changed, 123 insertions(+), 2 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
index 6a09922f67..fba45aff11 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
@@ -53,8 +53,13 @@ public class LedgerFragment {
         this.schedule = lh.getDistributionSchedule();
         SortedMap<Long, ? extends List<BookieId>> ensembles = lh
                 .getLedgerMetadata().getAllEnsembles();
+        // A ledger fragment is considered closed in either of two cases:
+        // 1. The ledger is closed
+        // 2. This fragment is not the last fragment and this ledger's lastAddConfirm >= ensembles.lastKey() - 1.
+        //    This case happens when the ledger's last ensemble is empty
         this.isLedgerClosed = lh.getLedgerMetadata().isClosed()
-                || !ensemble.equals(ensembles.get(ensembles.lastKey()));
+                || (!ensemble.equals(ensembles.get(ensembles.lastKey()))
+            && lh.getLastAddConfirmed() >= ensembles.lastKey() - 1);
     }
 
     public LedgerFragment(LedgerFragment lf, Set<Integer> subset) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
index 942c09d4fa..f22231c567 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
@@ -34,6 +34,7 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -45,6 +46,7 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
@@ -561,6 +563,14 @@ public class ReplicationWorker implements Runnable {
      *
      * <p>Missing bookies in closed ledgers are fine, as we know the last confirmed add, so
      * we can tell which entries are supposed to exist and rereplicate them if necessary.
+     *
+     * <p>Another corner case is that there are multiple ensembles in the ledger and the last
+     * segment/ensemble is open, but nothing has been written to some quorums in the ensemble.
+     * For the v2 protocol, this ledger's lastAddConfirm entry is the last segment/ensemble's `key - 2`,
+     * not `key - 1`; for the explanation, please refer to: https://github.com/apache/bookkeeper/pull/3917.
+     * If we treat the penultimate segment/ensemble as closed, we can't replicate
+     * the last entry in the segment. So in this case, we should also check if the penultimate
+     * segment/ensemble has missing bookies.
      */
     private boolean isLastSegmentOpenAndMissingBookies(LedgerHandle lh) throws BKException {
         LedgerMetadata md = admin.getLedgerMetadata(lh);
@@ -570,6 +580,10 @@ public class ReplicationWorker implements Runnable {
 
         SortedMap<Long, ? extends List<BookieId>> ensembles = admin.getLedgerMetadata(lh).getAllEnsembles();
         List<BookieId> finalEnsemble = ensembles.get(ensembles.lastKey());
+        if (ensembles.size() > 1 && lh.getLastAddConfirmed() < ensembles.lastKey() - 1) {
+            finalEnsemble = new ArrayList<>(finalEnsemble);
+            finalEnsemble.addAll((new TreeMap<>(ensembles)).floorEntry(ensembles.lastKey() - 1).getValue());
+        }
         Collection<BookieId> available = admin.getAvailableBookies();
         for (BookieId b : finalEnsemble) {
             if (!available.contains(b)) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
index 2a57f9952f..3cfd398985 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
@@ -26,6 +26,7 @@ import static org.apache.bookkeeper.client.BookKeeperClientStats.CLIENT_SCOPE;
 import static org.apache.bookkeeper.client.BookKeeperClientStats.READ_OP_DM;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -36,10 +37,12 @@ import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.buffer.UnpooledByteBufAllocator;
 import java.io.IOException;
+import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
@@ -60,12 +63,22 @@ import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.client.api.WriteAdvHandle;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerMetadataSerDe;
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory;
+import org.apache.bookkeeper.meta.MetadataBookieDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.replication.ReplicationTestUtil;
+import org.apache.bookkeeper.replication.ReplicationWorker;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.test.TestStatsProvider;
+import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.commons.lang3.tuple.Pair;
+import org.awaitility.Awaitility;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -1453,6 +1466,68 @@ public class BookieWriteLedgerTest extends
         bkc.deleteLedger(lh.ledgerId);
     }
 
+    @Test
+    public void testReadLacNotSameWithMetadataLedgerReplication() throws Exception {
+        lh = bkc.createLedger(3, 3, 2, digestType, ledgerPassword);
+        for (int i = 0; i < 10; ++i) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+            lh.addEntry(entry.array());
+        }
+
+        List<BookieId> ensemble = lh.getLedgerMetadata().getAllEnsembles().entrySet().iterator().next().getValue();
+        assertEquals(1, lh.getLedgerMetadata().getAllEnsembles().size());
+        killBookie(ensemble.get(1));
+
+        try {
+            lh.ensembleChangeLoop(ensemble, Collections.singletonMap(1, ensemble.get(1)));
+        } catch (Exception e) {
+            fail();
+        }
+
+        LedgerHandle lh1 = bkc.openLedgerNoRecovery(lh.ledgerId, digestType, ledgerPassword);
+        assertEquals(2, lh1.getLedgerMetadata().getAllEnsembles().size());
+        List<BookieId> firstEnsemble = lh1.getLedgerMetadata().getAllEnsembles().firstEntry().getValue();
+
+        long entryId = lh1.getLedgerMetadata().getAllEnsembles().lastEntry().getKey() - 1;
+        try {
+            lh1.readAsync(entryId, entryId).get();
+            fail();
+        } catch (Exception e) {
+            LOG.info("Failed to read entry: {} ", entryId, e);
+        }
+
+        MetadataBookieDriver driver = MetadataDrivers.getBookieDriver(
+            URI.create(baseConf.getMetadataServiceUri()));
+        driver.initialize(
+            baseConf,
+            NullStatsLogger.INSTANCE);
+        // initialize urReplicationManager
+        LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
+        LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
+        baseConf.setOpenLedgerRereplicationGracePeriod(String.valueOf(30));
+
+
+        ReplicationWorker replicationWorker = new ReplicationWorker(baseConf);
+        replicationWorker.start();
+        String basePath = ZKMetadataDriverBase.resolveZkLedgersRootPath(baseClientConf) + '/'
+            + BookKeeperConstants.UNDER_REPLICATION_NODE
+            + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH;
+
+        try {
+            underReplicationManager.markLedgerUnderreplicated(lh1.getId(), ensemble.get(1).toString());
+
+            Awaitility.waitAtMost(30, TimeUnit.SECONDS).untilAsserted(() ->
+                assertFalse(ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh1.getId(), basePath))
+            );
+
+            assertNotEquals(firstEnsemble, lh1.getLedgerMetadata().getAllEnsembles().firstEntry().getValue());
+        } finally {
+            replicationWorker.shutdown();
+        }
+    }
+
     @Test
     public void testLedgerMetadataTest() throws Exception {
         baseClientConf.setLedgerMetadataFormatVersion(LedgerMetadataSerDe.METADATA_FORMAT_VERSION_2);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/ReplicationTestUtil.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/ReplicationTestUtil.java
index 65a6f31c35..e36955a0c2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/ReplicationTestUtil.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/ReplicationTestUtil.java
@@ -32,7 +32,7 @@ public class ReplicationTestUtil {
     /**
      * Checks whether ledger is in under-replication.
      */
-    static boolean isLedgerInUnderReplication(ZooKeeper zkc, long id,
+    public static boolean isLedgerInUnderReplication(ZooKeeper zkc, long id,
             String basePath) throws KeeperException, InterruptedException {
         List<String> children;
         try {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index fab068ae60..8caad1178c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -1158,6 +1158,33 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
         }
     }
 
+    @Test
+    public void testReplicateEmptyOpenStateLedger() throws Exception {
+        LedgerHandle lh = bkc.createLedger(3, 3, 2, BookKeeper.DigestType.CRC32, TESTPASSWD);
+        assertFalse(lh.getLedgerMetadata().isClosed());
+
+        List<BookieId> firstEnsemble = lh.getLedgerMetadata().getAllEnsembles().firstEntry().getValue();
+        List<BookieId> ensemble = lh.getLedgerMetadata().getAllEnsembles().entrySet().iterator().next().getValue();
+        killBookie(ensemble.get(1));
+
+        startNewBookie();
+        baseConf.setOpenLedgerRereplicationGracePeriod(String.valueOf(30));
+        ReplicationWorker replicationWorker = new ReplicationWorker(baseConf);
+        replicationWorker.start();
+
+        try {
+            underReplicationManager.markLedgerUnderreplicated(lh.getId(), ensemble.get(1).toString());
+            Awaitility.waitAtMost(60, TimeUnit.SECONDS).untilAsserted(() ->
+                assertFalse(ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh.getId(), basePath))
+            );
+
+            LedgerHandle lh1 = bkc.openLedgerNoRecovery(lh.getId(), BookKeeper.DigestType.CRC32, TESTPASSWD);
+            assertTrue(lh1.getLedgerMetadata().isClosed());
+        } finally {
+            replicationWorker.shutdown();
+        }
+    }
+
     @Test
     public void testRepairedNotAdheringPlacementPolicyLedgerFragmentsOnRack() throws Exception {
         testRepairedNotAdheringPlacementPolicyLedgerFragments(RackawareEnsemblePlacementPolicy.class, null);