You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by yo...@apache.org on 2022/08/01 13:44:49 UTC

[bookkeeper] 03/17: issue #2879 : let bookie quit if journal thread exit (#2887)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.15
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit aaf496d006fc70a527dfe47b4c7048de4d327580
Author: AloysZhang <lo...@gmail.com>
AuthorDate: Sun Jul 31 11:30:45 2022 +0800

    issue #2879 : let bookie quit if journal thread exit (#2887)
    
    Descriptions of the changes in this PR:
    fix #2879
    This pull request makes the bookie quit when any of its journal threads exits.
    
    ### Motivation
    
    As described in #2879, a bookie with multiple journal directories runs one journal thread per directory. If one of those journal threads exits, the bookie becomes unhealthy: all bookie-io threads block, so the bookie stops serving requests while its process is still alive.
    This pull request tries to fix this problem.
    
    ### Changes
    
    Register a JournalAliveListener with each journal. When a journal thread exits its main loop, the listener is notified and triggers a bookie shutdown with ExitCode.BOOKIE_EXCEPTION.
    
    (cherry picked from commit 67208fb74181faa640e793cd5757712fd9b5d9d5)
---
 .../org/apache/bookkeeper/bookie/BookieImpl.java   | 14 +++++---
 .../java/org/apache/bookkeeper/bookie/Journal.java | 12 +++++++
 .../bookkeeper/bookie/JournalAliveListener.java    | 28 +++++++++++++++
 .../bookie/BookieMultipleJournalsTest.java         | 41 ++++++++++++++++++++++
 .../bookkeeper/bookie/datainteg/WriteSetsTest.java |  1 +
 5 files changed, 91 insertions(+), 5 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
index 9bc084ee92..dd5eeb49b7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
@@ -50,6 +50,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.bookkeeper.bookie.BookieException.DiskPartitionDuplicationException;
@@ -425,11 +426,13 @@ public class BookieImpl extends BookieCriticalThread implements Bookie {
             }
         }
 
+        JournalAliveListener journalAliveListener =
+                () -> BookieImpl.this.triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
         // instantiate the journals
         journals = Lists.newArrayList();
         for (int i = 0; i < journalDirectories.size(); i++) {
             journals.add(new Journal(i, journalDirectories.get(i),
-                    conf, ledgerDirsManager, statsLogger.scope(JOURNAL_SCOPE), allocator));
+                    conf, ledgerDirsManager, statsLogger.scope(JOURNAL_SCOPE), allocator, journalAliveListener));
         }
 
         this.entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
@@ -828,12 +831,13 @@ public class BookieImpl extends BookieCriticalThread implements Bookie {
     public int shutdown() {
         return shutdown(ExitCode.OK);
     }
-
     // internal shutdown method to let shutdown bookie gracefully
     // when encountering exception
-    synchronized int shutdown(int exitCode) {
+    ReentrantLock lock = new ReentrantLock(true);
+    int shutdown(int exitCode) {
+        lock.lock();
         try {
-            if (isRunning()) { // avoid shutdown twice
+            if (isRunning()) {
                 // the exitCode only set when first shutdown usually due to exception found
                 LOG.info("Shutting down Bookie-{} with exitCode {}",
                          conf.getBookiePort(), exitCode);
@@ -854,7 +858,6 @@ public class BookieImpl extends BookieCriticalThread implements Bookie {
                 for (Journal journal : journals) {
                     journal.shutdown();
                 }
-                this.join();
 
                 // Shutdown the EntryLogger which has the GarbageCollector Thread running
                 ledgerStorage.shutdown();
@@ -871,6 +874,7 @@ public class BookieImpl extends BookieCriticalThread implements Bookie {
             LOG.error("Got Exception while trying to shutdown Bookie", e);
             throw e;
         } finally {
+            lock.unlock();
             // setting running to false here, so watch thread
             // in bookie server know it only after bookie shut down
             stateManager.close();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index a4c91e9483..193b557312 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -687,6 +687,8 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
     // Expose Stats
     private final JournalStats journalStats;
 
+    private JournalAliveListener journalAliveListener;
+
     public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf,
             LedgerDirsManager ledgerDirsManager) {
         this(journalIndex, journalDirectory, conf, ledgerDirsManager, NullStatsLogger.INSTANCE,
@@ -767,6 +769,13 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                 () -> memoryLimitController.currentUsage());
     }
 
+    public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf,
+                   LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger,
+                   ByteBufAllocator allocator, JournalAliveListener journalAliveListener) {
+        this(journalIndex, journalDirectory, conf, ledgerDirsManager, statsLogger, allocator);
+        this.journalAliveListener = journalAliveListener;
+    }
+
     JournalStats getJournalStats() {
         return this.journalStats;
     }
@@ -1227,6 +1236,9 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
             // close will flush the file system cache making any previous
             // cached writes durable so this is fine as well.
             IOUtils.close(LOG, bc);
+            if (journalAliveListener != null) {
+                journalAliveListener.onJournalExit();
+            }
         }
         LOG.info("Journal exited loop!");
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalAliveListener.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalAliveListener.java
new file mode 100644
index 0000000000..ef73edc0ea
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalAliveListener.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+/**
+ * Listener for journal alive.
+ * */
+public interface JournalAliveListener {
+    void onJournalExit();
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java
index bc30246637..a6a9a67e70 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java
@@ -21,8 +21,10 @@
 package org.apache.bookkeeper.bookie;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 import java.io.File;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
@@ -31,7 +33,9 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.awaitility.Awaitility;
 import org.junit.Test;
 
 /**
@@ -57,6 +61,43 @@ public class BookieMultipleJournalsTest extends BookKeeperClusterTestCase {
         return conf;
     }
 
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testJournalExit() throws Exception {
+
+        LedgerHandle ledgerHandle = bkc.createLedger(1, 1, DigestType.CRC32, new byte[0]);
+        for (int i = 0; i < 10; i++) {
+            ledgerHandle.addEntry(("entry-" + i).getBytes());
+        }
+
+        BookieServer bookieServer = serverByIndex(0);
+        BookieImpl bookie = (BookieImpl) bookieServer.getBookie();
+        Field journalList = bookie.getClass().getDeclaredField("journals");
+        journalList.setAccessible(true);
+        List<Journal> journals = (List<Journal>) journalList.get(bookie);
+        journals.get(0).interrupt();
+        Awaitility.await().untilAsserted(() -> assertFalse(bookie.isRunning()));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testJournalExitAndShutdown() throws Exception {
+
+        LedgerHandle ledgerHandle = bkc.createLedger(1, 1, DigestType.CRC32, new byte[0]);
+        for (int i = 0; i < 10; i++) {
+            ledgerHandle.addEntry(("entry-" + i).getBytes());
+        }
+
+        BookieServer bookieServer = serverByIndex(0);
+        BookieImpl bookie = (BookieImpl) bookieServer.getBookie();
+        Field journalList = bookie.getClass().getDeclaredField("journals");
+        journalList.setAccessible(true);
+        List<Journal> journals = (List<Journal>) journalList.get(bookie);
+        journals.get(0).interrupt();
+        bookie.shutdown(ExitCode.OK);
+        Awaitility.await().untilAsserted(() -> assertFalse(bookie.isRunning()));
+    }
+
     @Test
     public void testMultipleWritesAndBookieRestart() throws Exception {
         // Creates few ledgers so that writes are spread across all journals
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/datainteg/WriteSetsTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/datainteg/WriteSetsTest.java
index 1a82b0bde0..139351b950 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/datainteg/WriteSetsTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/datainteg/WriteSetsTest.java
@@ -158,6 +158,7 @@ public class WriteSetsTest {
         }
     }
 
+    @SuppressWarnings("deprecation")
     private static void assertContentsMatch(ImmutableList<Integer> writeSet,
                                             DistributionSchedule.WriteSet distWriteSet)
             throws Exception {