You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by yo...@apache.org on 2022/08/01 13:44:49 UTC
[bookkeeper] 03/17: issue #2879 : let bookie quit if journal thread exit (#2887)
This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch branch-4.15
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit aaf496d006fc70a527dfe47b4c7048de4d327580
Author: AloysZhang <lo...@gmail.com>
AuthorDate: Sun Jul 31 11:30:45 2022 +0800
issue #2879 : let bookie quit if journal thread exit (#2887)
Descriptions of the changes in this PR:
fix #2879
This pull request makes the bookie quit when any of its journal threads exits.
### Motivation
As described in #2879, a bookie configured with multiple journal directories runs multiple journal threads. Once one of those journal threads exits, all bookie-io threads become blocked, so the bookie stops working even though its process is still alive.
This pull request tries to fix this problem.
### Changes
Add a `JournalAliveListener` callback that a journal thread invokes when it exits its loop; the bookie registers a listener that triggers a bookie shutdown as soon as any journal thread exits.
(cherry picked from commit 67208fb74181faa640e793cd5757712fd9b5d9d5)
---
.../org/apache/bookkeeper/bookie/BookieImpl.java | 14 +++++---
.../java/org/apache/bookkeeper/bookie/Journal.java | 12 +++++++
.../bookkeeper/bookie/JournalAliveListener.java | 28 +++++++++++++++
.../bookie/BookieMultipleJournalsTest.java | 41 ++++++++++++++++++++++
.../bookkeeper/bookie/datainteg/WriteSetsTest.java | 1 +
5 files changed, 91 insertions(+), 5 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
index 9bc084ee92..dd5eeb49b7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
@@ -50,6 +50,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.bookkeeper.bookie.BookieException.DiskPartitionDuplicationException;
@@ -425,11 +426,13 @@ public class BookieImpl extends BookieCriticalThread implements Bookie {
}
}
+ JournalAliveListener journalAliveListener =
+ () -> BookieImpl.this.triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
// instantiate the journals
journals = Lists.newArrayList();
for (int i = 0; i < journalDirectories.size(); i++) {
journals.add(new Journal(i, journalDirectories.get(i),
- conf, ledgerDirsManager, statsLogger.scope(JOURNAL_SCOPE), allocator));
+ conf, ledgerDirsManager, statsLogger.scope(JOURNAL_SCOPE), allocator, journalAliveListener));
}
this.entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
@@ -828,12 +831,13 @@ public class BookieImpl extends BookieCriticalThread implements Bookie {
public int shutdown() {
return shutdown(ExitCode.OK);
}
-
// internal shutdown method to let shutdown bookie gracefully
// when encountering exception
- synchronized int shutdown(int exitCode) {
+ ReentrantLock lock = new ReentrantLock(true);
+ int shutdown(int exitCode) {
+ lock.lock();
try {
- if (isRunning()) { // avoid shutdown twice
+ if (isRunning()) {
// the exitCode only set when first shutdown usually due to exception found
LOG.info("Shutting down Bookie-{} with exitCode {}",
conf.getBookiePort(), exitCode);
@@ -854,7 +858,6 @@ public class BookieImpl extends BookieCriticalThread implements Bookie {
for (Journal journal : journals) {
journal.shutdown();
}
- this.join();
// Shutdown the EntryLogger which has the GarbageCollector Thread running
ledgerStorage.shutdown();
@@ -871,6 +874,7 @@ public class BookieImpl extends BookieCriticalThread implements Bookie {
LOG.error("Got Exception while trying to shutdown Bookie", e);
throw e;
} finally {
+ lock.unlock();
// setting running to false here, so watch thread
// in bookie server know it only after bookie shut down
stateManager.close();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index a4c91e9483..193b557312 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -687,6 +687,8 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
// Expose Stats
private final JournalStats journalStats;
+ private JournalAliveListener journalAliveListener;
+
public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf,
LedgerDirsManager ledgerDirsManager) {
this(journalIndex, journalDirectory, conf, ledgerDirsManager, NullStatsLogger.INSTANCE,
@@ -767,6 +769,13 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
() -> memoryLimitController.currentUsage());
}
+ public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf,
+ LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger,
+ ByteBufAllocator allocator, JournalAliveListener journalAliveListener) {
+ this(journalIndex, journalDirectory, conf, ledgerDirsManager, statsLogger, allocator);
+ this.journalAliveListener = journalAliveListener;
+ }
+
JournalStats getJournalStats() {
return this.journalStats;
}
@@ -1227,6 +1236,9 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
// close will flush the file system cache making any previous
// cached writes durable so this is fine as well.
IOUtils.close(LOG, bc);
+ if (journalAliveListener != null) {
+ journalAliveListener.onJournalExit();
+ }
}
LOG.info("Journal exited loop!");
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalAliveListener.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalAliveListener.java
new file mode 100644
index 0000000000..ef73edc0ea
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalAliveListener.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+/**
+ * Listener for journal alive.
+ * */
+public interface JournalAliveListener {
+ void onJournalExit();
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java
index bc30246637..a6a9a67e70 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java
@@ -21,8 +21,10 @@
package org.apache.bookkeeper.bookie;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import java.io.File;
+import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
@@ -31,7 +33,9 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.awaitility.Awaitility;
import org.junit.Test;
/**
@@ -57,6 +61,43 @@ public class BookieMultipleJournalsTest extends BookKeeperClusterTestCase {
return conf;
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testJournalExit() throws Exception {
+
+ LedgerHandle ledgerHandle = bkc.createLedger(1, 1, DigestType.CRC32, new byte[0]);
+ for (int i = 0; i < 10; i++) {
+ ledgerHandle.addEntry(("entry-" + i).getBytes());
+ }
+
+ BookieServer bookieServer = serverByIndex(0);
+ BookieImpl bookie = (BookieImpl) bookieServer.getBookie();
+ Field journalList = bookie.getClass().getDeclaredField("journals");
+ journalList.setAccessible(true);
+ List<Journal> journals = (List<Journal>) journalList.get(bookie);
+ journals.get(0).interrupt();
+ Awaitility.await().untilAsserted(() -> assertFalse(bookie.isRunning()));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testJournalExitAndShutdown() throws Exception {
+
+ LedgerHandle ledgerHandle = bkc.createLedger(1, 1, DigestType.CRC32, new byte[0]);
+ for (int i = 0; i < 10; i++) {
+ ledgerHandle.addEntry(("entry-" + i).getBytes());
+ }
+
+ BookieServer bookieServer = serverByIndex(0);
+ BookieImpl bookie = (BookieImpl) bookieServer.getBookie();
+ Field journalList = bookie.getClass().getDeclaredField("journals");
+ journalList.setAccessible(true);
+ List<Journal> journals = (List<Journal>) journalList.get(bookie);
+ journals.get(0).interrupt();
+ bookie.shutdown(ExitCode.OK);
+ Awaitility.await().untilAsserted(() -> assertFalse(bookie.isRunning()));
+ }
+
@Test
public void testMultipleWritesAndBookieRestart() throws Exception {
// Creates few ledgers so that writes are spread across all journals
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/datainteg/WriteSetsTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/datainteg/WriteSetsTest.java
index 1a82b0bde0..139351b950 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/datainteg/WriteSetsTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/datainteg/WriteSetsTest.java
@@ -158,6 +158,7 @@ public class WriteSetsTest {
}
}
+ @SuppressWarnings("deprecation")
private static void assertContentsMatch(ImmutableList<Integer> writeSet,
DistributionSchedule.WriteSet distWriteSet)
throws Exception {