Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2021/12/13 10:24:11 UTC

[GitHub] [bookkeeper] Vanlightly opened a new pull request #2936: BP-46: Data integrity check for running without journal

Vanlightly opened a new pull request #2936:
URL: https://github.com/apache/bookkeeper/pull/2936


   ### Motivation
   
   To be able to run without the journal without compromising the replication protocol.
   
   See #2705
   
   ### Changes
   
   Introduction of a data integrity check that prevents data loss due to
   running without the journal and auto-repairs the data.
   
   * Data integrity check
   
   The integrity check comprises two parts: a preboot check and a
   full check. The preboot check is triggered by either an unclean
   shutdown or an invalid cookie. It marks any open ledgers as both
   fenced and in limbo to prevent ledgers affected by potential data
   loss from being written to. The limbo status prevents NoSuchEntry
   and NoSuchLedger responses from being sent, which avoids ledger
   truncation from any ledger recovery operations. Finally, it sets
   a storage flag indicating that a full check is required.
   
   A new data integrity check service has been added to run the full
   integrity check once the bookie is running. If the service
   sees that the full check storage flag is set then it runs
   a full check. This involves scanning the index and comparing
   it against metadata to discover missing entries. Any
   missing entries are sourced from peer bookies by the
   EntryCopier and written to ledger storage.
   
   The data integrity check also has a different cookie validation
   implementation.
   
   The following configurations have been added to the conf
   file:
   - dataIntegrityCheckingEnabled=true/false. False by default.
     This config enables or disables data integrity checking.
     When set to false the legacy cookie validation is used.
   - dataIntegrityStampMissingCookies=true/false. False by
     default. This config allows the data integrity process
     to stamp new cookies if a cookie is missing from a
     directory. The full check will repair any lost data if
     the directory data was lost.
   
   * Cookie verification for data integrity checking
   
   The algorithm differs from that of LegacyCookieValidation in the
   following ways:
   
   - An empty directory isn't considered a fatal condition. It just means
     that the preboot phase of the data integrity checker must run. Once
     the preboot phase runs, it should be safe to stamp the cookies
     again.
   - Bookies are not allowed to change their identity. If they do, manual
     operator intervention is required (which is ok as it is expected
     that an operator would have to intervene to change the identity in
     the first place).
   - A missing cookie in ZooKeeper is only valid if there are no cookies
     in any of the directories, as this is considered a new
     boot. Otherwise, manual operator intervention is required.
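The three rules above can be read as a small decision function. The following is a minimal sketch, not the PR's implementation: `CookieCheckSketch`, `Outcome`, and `decide` are hypothetical names, and a cookie is reduced to just the bookie identity it records (real cookies carry more fields).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch: a cookie is modelled only as the bookie id it records.
final class CookieCheckSketch {
    enum Outcome { NEW_BOOT, VALID, RUN_PREBOOT_THEN_STAMP, OPERATOR_INTERVENTION }

    static Outcome decide(String bookieId,
                          Optional<String> zkCookie,
                          List<Optional<String>> dirCookies) {
        boolean anyDirCookie = dirCookies.stream().anyMatch(Optional::isPresent);
        if (!zkCookie.isPresent()) {
            // No cookie in ZooKeeper is only acceptable on a brand-new boot.
            return anyDirCookie ? Outcome.OPERATOR_INTERVENTION : Outcome.NEW_BOOT;
        }
        if (!zkCookie.get().equals(bookieId)) {
            // The bookie may not change its identity.
            return Outcome.OPERATOR_INTERVENTION;
        }
        boolean missing = false;
        for (Optional<String> cookie : dirCookies) {
            if (!cookie.isPresent()) {
                missing = true; // empty directory: not fatal, preboot must run
            } else if (!cookie.get().equals(zkCookie.get())) {
                return Outcome.OPERATOR_INTERVENTION;
            }
        }
        return missing ? Outcome.RUN_PREBOOT_THEN_STAMP : Outcome.VALID;
    }

    public static void main(String[] args) {
        List<Optional<String>> dirs =
                Arrays.asList(Optional.of("bookie-1"), Optional.<String>empty());
        System.out.println(decide("bookie-1", Optional.of("bookie-1"), dirs));
    }
}
```

The sketch does not model the dataIntegrityStampMissingCookies setting; it only classifies the situation the validator finds on boot.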
   
   * Async iterator for ledger metadata
   
   Common code for iterating over ledger metadata. There is already
   asyncProcessLedgers in LedgerManager, but that only gives the ledgerId
   and the API is nasty (it even uses ZK specific callbacks).
   
   This change adds a more modern iterator, which takes a function which
   returns a CompletableFuture. The iterator has rudimentary rate
   limiting, by limiting the number of ledgers which can be processed at
   a time. We should add something more advanced later, which takes into
   account the response time from ZK.
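As an illustration of the rate-limiting idea described above (capping how many ledgers are processed at once), here is a minimal sketch. `LedgerIteratorSketch` and `forEachLedger` are hypothetical names, and unlike a fully asynchronous iterator this version blocks the calling thread while it waits for a free slot.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical sketch: bound the number of in-flight ledger operations.
final class LedgerIteratorSketch {
    static CompletableFuture<Void> forEachLedger(
            Iterator<Long> ledgerIds,
            int maxInFlight,
            Function<Long, CompletableFuture<Void>> processor) {
        Semaphore permits = new Semaphore(maxInFlight);
        List<CompletableFuture<Void>> all = new ArrayList<>();
        while (ledgerIds.hasNext()) {
            long ledgerId = ledgerIds.next();
            // Wait until fewer than maxInFlight ledgers are being processed.
            permits.acquireUninterruptibly();
            CompletableFuture<Void> f;
            try {
                f = processor.apply(ledgerId);
            } catch (RuntimeException e) {
                f = new CompletableFuture<>();
                f.completeExceptionally(e);
            }
            // Release the permit whether the ledger succeeded or failed.
            all.add(f.whenComplete((result, error) -> permits.release()));
        }
        return CompletableFuture.allOf(all.toArray(new CompletableFuture[0]));
    }

    public static void main(String[] args) {
        forEachLedger(Arrays.asList(1L, 2L, 3L).iterator(), 2, id -> {
            System.out.println("processing ledger " + id);
            return CompletableFuture.completedFuture(null);
        }).join();
    }
}
```

A limiter that adapts to ZK response times, as suggested above, would replace the fixed permit count with one that is adjusted at runtime.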
   
   * Add limbo state to bookie ledger representation
   
   Limbo state for a ledger means that we don't know whether we should
   have an entry for the ledger or not, which can happen when a bookie is
   started after having its disk wiped. We cannot respond with a
   NoSuchEntryException or NoSuchLedgerException, as this tells the client
   that we never had the requested entry, which may or may not be true.
   If we tell it to the client, the client will act as if it's true
   and possibly mark the end of the ledger at an incorrect point.
   
   This change also adds locking to LedgerMetadataIndex. Previously it
   relied on the good graces of the calling code to avoid modifying the
   same ledger concurrently. Now that we are also using the index to
   store limbo state, we can't be so blasé.
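To make the limbo behaviour concrete, the sketch below shows one way a read could be answered. It is an assumption-laden illustration: `LimboSketch` and `ReadCode` are hypothetical, storage is an in-memory map, and only a negative lookup on a limbo ledger is turned into an "unknown" answer. Coarse `synchronized` methods stand in for the locking mentioned above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: a negative result on a limbo ledger becomes UNKNOWN.
final class LimboSketch {
    enum ReadCode { OK, NO_SUCH_LEDGER, NO_SUCH_ENTRY, UNKNOWN }

    private final Map<Long, Set<Long>> entries = new HashMap<>();
    private final Set<Long> limbo = new HashSet<>();

    synchronized void addEntry(long ledgerId, long entryId) {
        entries.computeIfAbsent(ledgerId, k -> new HashSet<>()).add(entryId);
    }

    synchronized void setLimbo(long ledgerId) {
        limbo.add(ledgerId);
    }

    synchronized void clearLimbo(long ledgerId) {
        limbo.remove(ledgerId);
    }

    synchronized ReadCode read(long ledgerId, long entryId) {
        Set<Long> known = entries.get(ledgerId);
        if (known != null && known.contains(entryId)) {
            return ReadCode.OK; // we have the entry, limbo or not
        }
        if (limbo.contains(ledgerId)) {
            return ReadCode.UNKNOWN; // we cannot claim the entry never existed
        }
        return known == null ? ReadCode.NO_SUCH_LEDGER : ReadCode.NO_SUCH_ENTRY;
    }

    public static void main(String[] args) {
        LimboSketch storage = new LimboSketch();
        storage.addEntry(1L, 0L);
        storage.setLimbo(1L);
        System.out.println(storage.read(1L, 5L)); // UNKNOWN
        storage.clearLimbo(1L);
        System.out.println(storage.read(1L, 5L)); // NO_SUCH_ENTRY
    }
}
```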
   
   * Add entryExists call to ledgerStorage
   
   Currently the only way to check if an entry exists in the storage is
   to try to read the entry. However, this means pulling data out of the
   entrylog, when it should be sufficient to check that the entry exists
   in the index.
   
   This change adds the entryExists call to ledgerStorage. This has only
   been implemented for DbLedgerStorage. The implementation for the
   others should be trivial, but it needs to be tested.
   
   * Pregenerate the writeset from ledger metadata
   
   The bookkeeper client uses DistributionSchedule (of which
   RoundRobinDistributionSchedule is the only impl) to decide which
   members of the ensemble it writes an entry to. This writeset is
   generated for each entry. However, there are only |ensemble| possible
   writesets, so we should pregenerate them for the ledger and stop
   trashing memory.
   
   WriteSets represent a set of pregenerated writesets as would be
   otherwise generated from the distribution schedule. The constructor
   takes a list of indices (which should be generated based on the list
   of bookies in the ensemble), which specifies the preferred order that
   bookies should be tried for reads.
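A minimal sketch of the pregeneration idea follows, assuming the usual round-robin striping where entry `e` is written to ensemble positions `(e + i) % ensembleSize` for `i` in `[0, writeQuorumSize)`. `WriteSetsSketch` and its methods are hypothetical names, and the preferred read order taken by the real constructor is not modelled.

```java
import java.util.Arrays;

// Hypothetical sketch: one writeset per possible starting position.
final class WriteSetsSketch {
    static int[][] pregenerate(int ensembleSize, int writeQuorumSize) {
        int[][] sets = new int[ensembleSize][writeQuorumSize];
        for (int start = 0; start < ensembleSize; start++) {
            for (int i = 0; i < writeQuorumSize; i++) {
                sets[start][i] = (start + i) % ensembleSize;
            }
        }
        return sets;
    }

    // Look up the shared writeset instead of allocating one per entry.
    // Assumes entryId is non-negative.
    static int[] forEntry(int[][] sets, long entryId) {
        return sets[(int) (entryId % sets.length)];
    }

    public static void main(String[] args) {
        int[][] sets = pregenerate(3, 2);
        System.out.println(Arrays.toString(forEntry(sets, 5L))); // [2, 0]
    }
}
```

With an ensemble of 3 and a write quorum of 2 there are only three distinct writesets, so every entry reuses one of three arrays.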
   
   * Storage state flags for LedgerStorage
   
   If a bookie crashes in the middle of a full integrity check, it needs
   to know to start it again when it reboots. For this, we need to
   persist some flag to persistent storage.
   
   This change adds persistent flags to the ledger storage
   interface. Multiple flags can be added in future, for example to mark
   the storage as dirty on boot, so we can detect non-clean shutdown.
   
   Flags are only implemented for DbLedgerStorage. The flags are stored
   in the metadata index, with a negative ledger id as key. The key of
   the storage selected for ledger 0 is used. This does mean flags will
   be lost if there is a change in the storage disk configuration, but
   data integrity checks will run in this case regardless.
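The idea of keeping flags in the metadata index under a negative ledger id can be sketched as below. This is illustrative only: the key value, the `Flag` names, and the bitmask encoding are assumptions, and a `HashMap` stands in for the persistent index.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: flags stored as a bitmask under a reserved negative key.
final class StorageFlagsSketch {
    enum Flag { FULL_CHECK_REQUIRED, DIRTY_ON_BOOT } // illustrative names

    // Assumed reserved key; it cannot clash with real (non-negative) ledger ids.
    static final long FLAGS_KEY = -1L;

    private final Map<Long, Integer> index = new HashMap<>();

    void setFlag(Flag flag) {
        index.merge(FLAGS_KEY, 1 << flag.ordinal(), (a, b) -> a | b);
    }

    void clearFlag(Flag flag) {
        index.computeIfPresent(FLAGS_KEY, (k, v) -> v & ~(1 << flag.ordinal()));
    }

    EnumSet<Flag> getFlags() {
        int bits = index.getOrDefault(FLAGS_KEY, 0);
        EnumSet<Flag> flags = EnumSet.noneOf(Flag.class);
        for (Flag flag : Flag.values()) {
            if ((bits & (1 << flag.ordinal())) != 0) {
                flags.add(flag);
            }
        }
        return flags;
    }

    public static void main(String[] args) {
        StorageFlagsSketch storage = new StorageFlagsSketch();
        storage.setFlag(Flag.FULL_CHECK_REQUIRED);
        System.out.println(storage.getFlags()); // [FULL_CHECK_REQUIRED]
    }
}
```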
   
   * EUNKNOWN code
   
   A new response code has been added to communicate that the state
   of an entry is unknown due to the ledger being in limbo.
   
   * Added bookie unclean shutdown detection
   
   Adds unclean shutdown detection. When running with journal
   writes disabled and data integrity checking enabled, if
   the prior shutdown was unclean (not a graceful shutdown)
   then the data integrity checks are triggered. These checks
   avoid additional data loss scenarios and repair any lost
   data caused by the loss of unflushed data at the time
   of the unclean shutdown.
   
   The BookieServer registers start-up and shutdown with the
   UncleanShutdownDetection class. This class adds a dirty
   file to each ledger dir on registering start-up and clears
   all these files on registering shutdown. The presence of
   any of these files on boot-up indicates the prior shutdown
   was unclean.
   
   Master Issue: #2705
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@bookkeeper.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bookkeeper] Vanlightly commented on a change in pull request #2936: BP-46: Data integrity check for running without journal

Posted by GitBox <gi...@apache.org>.
Vanlightly commented on a change in pull request #2936:
URL: https://github.com/apache/bookkeeper/pull/2936#discussion_r772223348



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/UncleanShutdownDetectionImpl.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to determine if the prior shutdown was unclean or not. It does so
+ * by adding a file to each ledger directory after successful start-up
+ * and removing the file on graceful shutdown.
+ * Any abrupt termination will cause one or more of these files to not be cleared
+ * and so on the subsequent boot-up, the presence of any of these files will
+ * indicate an unclean shutdown.
+ */
+public class UncleanShutdownDetectionImpl implements UncleanShutdownDetection {
+    private static final Logger LOG = LoggerFactory.getLogger(UncleanShutdownDetectionImpl.class);
+    private final LedgerDirsManager ledgerDirsManager;
+    static final String DirtyFileName = "DIRTY";
+
+    public UncleanShutdownDetectionImpl(LedgerDirsManager ledgerDirsManager) {
+        this.ledgerDirsManager = ledgerDirsManager;
+    }
+
+    @Override
+    public void registerStartUp() {
+        for (File ledgerDir : ledgerDirsManager.getAllLedgerDirs()) {
+            try {
+                File dirtyFile = new File(ledgerDir, DirtyFileName);
+                dirtyFile.createNewFile();
+            } catch (Throwable t) {
+                LOG.error("Unable to register start-up (so an unclean shutdown cannot"
+                        + " be detected). Dirty file of ledger dir {} could not be created",
+                        ledgerDir.getAbsolutePath(), t);
+            }
+        }
+    }
+
+    @Override
+    public void registerCleanShutdown() {
+        for (File ledgerDir : ledgerDirsManager.getAllLedgerDirs()) {
+            try {
+                File dirtyFile = new File(ledgerDir, DirtyFileName);
+                dirtyFile.delete();
+            } catch (Throwable t) {
+                LOG.error("Unable to register a clean shutdown, dirty file of "
+                        + " ledger dir {} could not be deleted",
+                        ledgerDir.getAbsolutePath(), t);
+            }
+        }
+    }
+
+    @Override
+    public boolean lastShutdownWasUnclean() {
+        try {
+            for (File ledgerDir : ledgerDirsManager.getAllLedgerDirs()) {
+                File dirtyFile = new File(ledgerDir, DirtyFileName);
+                if (dirtyFile.exists()) {
+                    return true;

Review comment:
       I've updated it to do that.







[GitHub] [bookkeeper] Vanlightly commented on a change in pull request #2936: BP-46: Data integrity check for running without journal

Posted by GitBox <gi...@apache.org>.
Vanlightly commented on a change in pull request #2936:
URL: https://github.com/apache/bookkeeper/pull/2936#discussion_r772216400



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/UncleanShutdownDetectionImpl.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to determine if the prior shutdown was unclean or not. It does so
+ * by adding a file to each ledger directory after successful start-up
+ * and removing the file on graceful shutdown.
+ * Any abrupt termination will cause one or more of these files to not be cleared
+ * and so on the subsequent boot-up, the presence of any of these files will
+ * indicate an unclean shutdown.
+ */
+public class UncleanShutdownDetectionImpl implements UncleanShutdownDetection {
+    private static final Logger LOG = LoggerFactory.getLogger(UncleanShutdownDetectionImpl.class);
+    private final LedgerDirsManager ledgerDirsManager;
+    static final String DirtyFileName = "DIRTY";
+
+    public UncleanShutdownDetectionImpl(LedgerDirsManager ledgerDirsManager) {
+        this.ledgerDirsManager = ledgerDirsManager;
+    }
+
+    @Override
+    public void registerStartUp() {
+        for (File ledgerDir : ledgerDirsManager.getAllLedgerDirs()) {
+            try {
+                File dirtyFile = new File(ledgerDir, DirtyFileName);
+                dirtyFile.createNewFile();
+            } catch (Throwable t) {
+                LOG.error("Unable to register start-up (so an unclean shutdown cannot"
+                        + " be detected). Dirty file of ledger dir {} could not be created",
+                        ledgerDir.getAbsolutePath(), t);
+            }
+        }
+    }
+
+    @Override
+    public void registerCleanShutdown() {
+        for (File ledgerDir : ledgerDirsManager.getAllLedgerDirs()) {
+            try {
+                File dirtyFile = new File(ledgerDir, DirtyFileName);
+                dirtyFile.delete();
+            } catch (Throwable t) {
+                LOG.error("Unable to register a clean shutdown, dirty file of "
+                        + " ledger dir {} could not be deleted",
+                        ledgerDir.getAbsolutePath(), t);
+            }
+        }
+    }
+
+    @Override
+    public boolean lastShutdownWasUnclean() {
+        try {
+            for (File ledgerDir : ledgerDirsManager.getAllLedgerDirs()) {
+                File dirtyFile = new File(ledgerDir, DirtyFileName);
+                if (dirtyFile.exists()) {
+                    return true;

Review comment:
       Well, we log the detection of an unclean shutdown in the Main class, but I see that it could be useful to know which ledger directory did not get shut down cleanly. But if we want to go that route, we'd want to collect all the directories with a dirty file and log them, rather than returning on the first.
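
A minimal sketch of the "collect all dirty directories" variant discussed in this comment is shown below. It is not the code that was eventually committed: `DirtyDirsSketch` and `findDirtyDirs` are hypothetical names, and the directories are passed in as a plain list rather than obtained from `LedgerDirsManager`.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: report every ledger dir that still has a dirty file.
final class DirtyDirsSketch {
    static final String DIRTY_FILENAME = "DIRTY";

    static List<File> findDirtyDirs(List<File> ledgerDirs) {
        List<File> dirty = new ArrayList<>();
        for (File ledgerDir : ledgerDirs) {
            if (new File(ledgerDir, DIRTY_FILENAME).exists()) {
                dirty.add(ledgerDir);
            }
        }
        return dirty;
    }

    public static void main(String[] args) {
        List<File> dirs = new ArrayList<>();
        for (String arg : args) {
            dirs.add(new File(arg));
        }
        List<File> dirty = findDirtyDirs(dirs);
        // The last shutdown was unclean if any directory is still marked dirty.
        System.out.println("unclean=" + !dirty.isEmpty() + " dirs=" + dirty);
    }
}
```

The caller can then log the whole list once and treat a non-empty result as an unclean shutdown.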







[GitHub] [bookkeeper] Vanlightly commented on a change in pull request #2936: BP-46: Data integrity check for running without journal

Posted by GitBox <gi...@apache.org>.
Vanlightly commented on a change in pull request #2936:
URL: https://github.com/apache/bookkeeper/pull/2936#discussion_r772968452



##########
File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/MockLedgerStorage.java
##########
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import com.google.common.util.concurrent.RateLimiter;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.PrimitiveIterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.common.util.Watcher;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * A mock for running tests that require ledger storage.
+ */
+public class MockLedgerStorage implements LedgerStorage {

Review comment:
       I think a future project is to make more use of mocks like this in order to speed up the tests. Currently it takes too long to run everything.







[GitHub] [bookkeeper] Vanlightly commented on a change in pull request #2936: BP-46: Data integrity check for running without journal

Posted by GitBox <gi...@apache.org>.
Vanlightly commented on a change in pull request #2936:
URL: https://github.com/apache/bookkeeper/pull/2936#discussion_r772223450



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/datainteg/DataIntegrityCheck.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie.datainteg;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The interface for the data integrity check feature. This feature allows
+ * a bookie to handle data loss scenarios such as when running without
+ * the journal or after a disk failure has caused the loss of all data.
+ */
+public interface DataIntegrityCheck {
+    /**
+     * Run quick preboot check. This check should do enough to ensure that
+     * it is safe to complete the boot sequence without compromising correctness.
+     * To this end, if it finds that this bookie is part of the last ensemble of
+     * an unclosed ledger, it must prevent the bookie from being able store new
+     * entries for that ledger and must prevent the bookie from taking part in
+     * the discovery of the last entry of that ledger.
+     */
+    CompletableFuture<Void> runPreBoot(String reason);
+
+    /**
+     * Whether we need to run a full check.
+     * This condition can be set by the runPreBoot() call to run a full check
+     * in the background once the bookie is running. This can later be used
+     * to run the full check periodically, or to exponentially backoff and retry
+     * when some transient condition prevents a ledger being fixed during a
+     * full check.
+     */
+    boolean needsFull() throws IOException;

Review comment:
       Done







[GitHub] [bookkeeper] dlg99 commented on a change in pull request #2936: BP-46: Data integrity check for running without journal

Posted by GitBox <gi...@apache.org>.
dlg99 commented on a change in pull request #2936:
URL: https://github.com/apache/bookkeeper/pull/2936#discussion_r772839255



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
##########
@@ -1013,4 +1077,91 @@ public void diskJustWritable(File disk) {
             }
         };
     }
+
+    @Override
+    public void setLimboState(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("setLimboState. ledger: {}", ledgerId);
+        }
+        ledgerIndex.setLimbo(ledgerId);
+    }
+
+    @Override
+    public boolean hasLimboState(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("hasLimboState. ledger: {}", ledgerId);
+        }
+        return ledgerIndex.get(ledgerId).getLimbo();
+    }
+
+    @Override
+    public void clearLimboState(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("clearLimboState. ledger: {}", ledgerId);
+        }
+        ledgerIndex.clearLimbo(ledgerId);
+    }
+
+    private void throwIfLimbo(long ledgerId) throws IOException, BookieException {

Review comment:
       That's exactly what I wanted to avoid: some other subclassed exception being thrown but handled the same way because the IDE generated a catch for BookieException.







[GitHub] [bookkeeper] Vanlightly commented on a change in pull request #2936: BP-46: Data integrity check for running without journal

Posted by GitBox <gi...@apache.org>.
Vanlightly commented on a change in pull request #2936:
URL: https://github.com/apache/bookkeeper/pull/2936#discussion_r772212527



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/UncleanShutdownDetectionImpl.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to determine if the prior shutdown was unclean or not. It does so
+ * by adding a file to each ledger directory after successful start-up
+ * and removing the file on graceful shutdown.
+ * Any abrupt termination will cause one or more of these files to not be cleared
+ * and so on the subsequent boot-up, the presence of any of these files will
+ * indicate an unclean shutdown.
+ */
+public class UncleanShutdownDetectionImpl implements UncleanShutdownDetection {
+    private static final Logger LOG = LoggerFactory.getLogger(UncleanShutdownDetectionImpl.class);
+    private final LedgerDirsManager ledgerDirsManager;
+    static final String DirtyFileName = "DIRTY";
+
+    public UncleanShutdownDetectionImpl(LedgerDirsManager ledgerDirsManager) {
+        this.ledgerDirsManager = ledgerDirsManager;
+    }
+
+    @Override
+    public void registerStartUp() {
+        for (File ledgerDir : ledgerDirsManager.getAllLedgerDirs()) {
+            try {
+                File dirtyFile = new File(ledgerDir, DirtyFileName);
+                dirtyFile.createNewFile();
+            } catch (Throwable t) {
+                LOG.error("Unable to register start-up (so an unclean shutdown cannot"

Review comment:
       Agreed. I have changed this so it will crash, and logs the creation of the file.







[GitHub] [bookkeeper] Vanlightly commented on a change in pull request #2936: BP-46: Data integrity check for running without journal

Posted by GitBox <gi...@apache.org>.
Vanlightly commented on a change in pull request #2936:
URL: https://github.com/apache/bookkeeper/pull/2936#discussion_r772213631



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/UncleanShutdownDetectionImpl.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to determine if the prior shutdown was unclean or not. It does so
+ * by adding a file to each ledger directory after successful start-up
+ * and removing the file on graceful shutdown.
+ * Any abrupt termination will cause one or more of these files to not be cleared
+ * and so on the subsequent boot-up, the presence of any of these files will
+ * indicate an unclean shutdown.
+ */
+public class UncleanShutdownDetectionImpl implements UncleanShutdownDetection {
+    private static final Logger LOG = LoggerFactory.getLogger(UncleanShutdownDetectionImpl.class);
+    private final LedgerDirsManager ledgerDirsManager;
+    static final String DirtyFileName = "DIRTY";

Review comment:
       Updated to DIRTY_FILENAME. Added more checks to deletion of dirty files.







[GitHub] [bookkeeper] Vanlightly commented on a change in pull request #2936: BP-46: Data integrity check for running without journal

Posted by GitBox <gi...@apache.org>.
Vanlightly commented on a change in pull request #2936:
URL: https://github.com/apache/bookkeeper/pull/2936#discussion_r772939852



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
##########
@@ -1013,4 +1077,91 @@ public void diskJustWritable(File disk) {
             }
         };
     }
+
+    @Override
+    public void setLimboState(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("setLimboState. ledger: {}", ledgerId);
+        }
+        ledgerIndex.setLimbo(ledgerId);
+    }
+
+    @Override
+    public boolean hasLimboState(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("hasLimboState. ledger: {}", ledgerId);
+        }
+        return ledgerIndex.get(ledgerId).getLimbo();
+    }
+
+    @Override
+    public void clearLimboState(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("clearLimboState. ledger: {}", ledgerId);
+        }
+        ledgerIndex.clearLimbo(ledgerId);
+    }
+
+    private void throwIfLimbo(long ledgerId) throws IOException, BookieException {

Review comment:
       I used a subclass of BookieException as I try to follow the existing approaches to things. So I guess the question is why do we have a BookieException with subclasses? Is there a guideline for how to use it correctly or is the use of BookieException in general something we need to start refactoring out?







[GitHub] [bookkeeper] Vanlightly commented on a change in pull request #2936: BP-46: Data integrity check for running without journal

Posted by GitBox <gi...@apache.org>.
Vanlightly commented on a change in pull request #2936:
URL: https://github.com/apache/bookkeeper/pull/2936#discussion_r772184271



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
##########
@@ -1013,4 +1077,91 @@ public void diskJustWritable(File disk) {
             }
         };
     }
+
+    @Override
+    public void setLimboState(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("setLimboState. ledger: {}", ledgerId);
+        }
+        ledgerIndex.setLimbo(ledgerId);
+    }
+
+    @Override
+    public boolean hasLimboState(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("hasLimboState. ledger: {}", ledgerId);
+        }
+        return ledgerIndex.get(ledgerId).getLimbo();
+    }
+
+    @Override
+    public void clearLimboState(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("clearLimboState. ledger: {}", ledgerId);
+        }
+        ledgerIndex.clearLimbo(ledgerId);
+    }
+
+    private void throwIfLimbo(long ledgerId) throws IOException, BookieException {

Review comment:
       A DataUnknownException is thrown, which is a subclass of BookieException.







[GitHub] [bookkeeper] Vanlightly commented on a change in pull request #2936: BP-46: Data integrity check for running without journal

Posted by GitBox <gi...@apache.org>.
Vanlightly commented on a change in pull request #2936:
URL: https://github.com/apache/bookkeeper/pull/2936#discussion_r772237897



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/datainteg/DataIntegrityCheckImpl.java
##########
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie.datainteg;
+
+import com.google.common.collect.ImmutableSortedMap;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.Maybe;
+import io.reactivex.rxjava3.core.Scheduler;
+import io.reactivex.rxjava3.core.Single;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.LedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerStorage.StorageState;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.net.BookieId;
+
+/**
+ * An implementation of the DataIntegrityCheck interface.
+ */
+@Slf4j
+public class DataIntegrityCheckImpl implements DataIntegrityCheck {
+    private static final int MAX_INFLIGHT = 300;
+    private static final int MAX_ENTRIES_INFLIGHT = 3000;
+    private static final int ZK_TIMEOUT_S = 30;
+    private final BookieId bookieId;
+    private final LedgerManager ledgerManager;
+    private final LedgerStorage ledgerStorage;
+    private final EntryCopier entryCopier;
+    private final BookKeeperAdmin admin;
+    private final Scheduler scheduler;
+    private final AtomicReference<Map<Long, LedgerMetadata>> ledgersCacheRef =
+        new AtomicReference<>(null);
+    private CompletableFuture<Void> preBootFuture;
+
+    public DataIntegrityCheckImpl(BookieId bookieId,
+                                  LedgerManager ledgerManager,
+                                  LedgerStorage ledgerStorage,
+                                  EntryCopier entryCopier,
+                                  BookKeeperAdmin admin,
+                                  Scheduler scheduler) {
+        this.bookieId = bookieId;
+        this.ledgerManager = ledgerManager;
+        this.ledgerStorage = ledgerStorage;
+        this.entryCopier = entryCopier;
+        this.admin = admin;
+        this.scheduler = scheduler;
+    }
+
+    @Override
+    public CompletableFuture<Void> runPreBoot(String reason) {
+        // we only run this once, it could be kicked off by different checks
+        synchronized (this) {
+            if (preBootFuture == null) {
+                preBootFuture = runPreBootSequence(reason);
+            }
+        }
+        return preBootFuture;
+
+    }
+
+    private CompletableFuture<Void> runPreBootSequence(String reason) {
+        String runId = UUID.randomUUID().toString();
+        log.info("Event: {}, RunId: {}, Reason: {}", Events.PREBOOT_START, runId, reason);
+        try {
+            this.ledgerStorage.setStorageStateFlag(StorageState.NEEDS_INTEGRITY_CHECK);
+        } catch (IOException ioe) {
+            log.error("Event: {}, RunId: {}", Events.PREBOOT_ERROR, runId, ioe);
+            return FutureUtils.exception(ioe);
+        }
+
+        MetadataAsyncIterator iter = new MetadataAsyncIterator(scheduler,
+                ledgerManager, MAX_INFLIGHT, ZK_TIMEOUT_S, TimeUnit.SECONDS);
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        Map<Long, LedgerMetadata> ledgersCache =
+            new ConcurrentSkipListMap<>(Comparator.<Long>naturalOrder().reversed());
+        iter.forEach((ledgerId, metadata) -> {
+                if (ensemblesContainBookie(metadata, bookieId)) {
+                    ledgersCache.put(ledgerId, metadata);
+                    try {
+                        if (!ledgerStorage.ledgerExists(ledgerId)) {
+                            ledgerStorage.setMasterKey(ledgerId, new byte[0]);
+                        }
+                    } catch (IOException ioe) {
+                        return FutureUtils.exception(ioe);

Review comment:
       Agreed. I've logged the error here with some more context.







[GitHub] [bookkeeper] Vanlightly commented on a change in pull request #2936: BP-46: Data integrity check for running without journal

Posted by GitBox <gi...@apache.org>.
Vanlightly commented on a change in pull request #2936:
URL: https://github.com/apache/bookkeeper/pull/2936#discussion_r772967679



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
##########
@@ -340,14 +359,61 @@ public static LifecycleComponentStack buildBookieServer(BookieConfiguration conf
         LedgerDirsManager indexDirsManager = BookieResources.createIndexDirsManager(
                 conf.getServerConf(), diskChecker, bookieStats.scope(LD_INDEX_SCOPE), ledgerDirsManager);
 
-        CookieValidation cookieValidation = new LegacyCookieValidation(conf.getServerConf(), rm);
-        cookieValidation.checkCookies(storageDirectoriesFromConf(conf.getServerConf()));
-
         ByteBufAllocatorWithOomHandler allocator = BookieResources.createAllocator(conf.getServerConf());
 
+        UncleanShutdownDetection uncleanShutdownDetection = new UncleanShutdownDetectionImpl(ledgerDirsManager);
+        if (uncleanShutdownDetection.lastShutdownWasUnclean()) {
+            log.info("Unclean shutdown detected. The bookie did not register a graceful shutdown prior to this boot.");
+        }
+
         // bookie takes ownership of storage, so shuts it down
-        LedgerStorage storage = BookieResources.createLedgerStorage(
-                conf.getServerConf(), ledgerManager, ledgerDirsManager, indexDirsManager, bookieStats, allocator);
+        LedgerStorage storage = null;
+        DataIntegrityCheck integCheck = null;
+
+        if (conf.getServerConf().isDataIntegrityCheckingEnabled()) {
+            StatsLogger clientStats = bookieStats.scope(CLIENT_SCOPE);
+            ClientConfiguration clientConfiguration = new ClientConfiguration(conf.getServerConf());
+            clientConfiguration.setClientRole(ClientConfiguration.CLIENT_ROLE_SYSTEM);
+            BookKeeper bkc = BookKeeper.forConfig(clientConfiguration).statsLogger(clientStats).build();
+            serverBuilder.addComponent(new AutoCloseableLifecycleComponent("bkc", bkc));
+
+            BookieId bookieId = BookieImpl.getBookieId(conf.getServerConf());
+            ExecutorService rxExecutor = Executors.newFixedThreadPool(

Review comment:
       I have modified the RxSchedulerLifecycleComponent to take ownership of the rxExecutor and its shutdown.
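
The ownership idea in this reply can be sketched as follows. This is only an illustration: `OwnedExecutorComponent` and the 5-second grace period are hypothetical, and the actual RxSchedulerLifecycleComponent change is not shown in this thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical component (not the PR's class): whoever wraps the executor
// also becomes responsible for shutting it down.
final class OwnedExecutorComponent implements AutoCloseable {
    private final ExecutorService executor;

    OwnedExecutorComponent(ExecutorService executor) {
        this.executor = executor;
    }

    ExecutorService executor() {
        return executor;
    }

    @Override
    public void close() {
        // Stop accepting new tasks, then give running tasks a short grace period.
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            // Force the shutdown and preserve the interrupt for the caller.
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
```

With this shape, the code that builds the server only has to register the component in its lifecycle stack; it no longer needs to keep a reference to the thread pool.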




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@bookkeeper.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bookkeeper] dlg99 commented on a change in pull request #2936: BP-46: Data integrity check for running without journal

Posted by GitBox <gi...@apache.org>.
dlg99 commented on a change in pull request #2936:
URL: https://github.com/apache/bookkeeper/pull/2936#discussion_r771743310



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/UncleanShutdownDetectionImpl.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to determine if the prior shutdown was unclean or not. It does so
+ * by adding a file to each ledger directory after successful start-up
+ * and removing the file on graceful shutdown.
+ * Any abrupt termination will cause one or more of these files to not be cleared
+ * and so on the subsequent boot-up, the presence of any of these files will
+ * indicate an unclean shutdown.
+ */
+public class UncleanShutdownDetectionImpl implements UncleanShutdownDetection {
+    private static final Logger LOG = LoggerFactory.getLogger(UncleanShutdownDetectionImpl.class);
+    private final LedgerDirsManager ledgerDirsManager;
+    static final String DirtyFileName = "DIRTY";
+
+    public UncleanShutdownDetectionImpl(LedgerDirsManager ledgerDirsManager) {
+        this.ledgerDirsManager = ledgerDirsManager;
+    }
+
+    @Override
+    public void registerStartUp() {
+        for (File ledgerDir : ledgerDirsManager.getAllLedgerDirs()) {
+            try {
+                File dirtyFile = new File(ledgerDir, DirtyFileName);
+                dirtyFile.createNewFile();
+            } catch (Throwable t) {
+                LOG.error("Unable to register start-up (so an unclean shutdown cannot"

Review comment:
       Throw/crash in this case?
   Otherwise the bookie can start in an unclear state, with no data correctness guarantees later.
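
A minimal sketch of the fail-fast alternative suggested here, assuming the caller is prepared to abort start-up on an IOException. `FailFastStartUpRegistration` is a hypothetical stand-in that takes a plain list of directories instead of a LedgerDirsManager; it is not the code in the PR.

```java
import java.io.File;
import java.io.IOException;
import java.util.List;

// Hypothetical stand-in for UncleanShutdownDetectionImpl.registerStartUp().
final class FailFastStartUpRegistration {
    static final String DIRTY_FILE_NAME = "DIRTY";

    private final List<File> ledgerDirs;

    FailFastStartUpRegistration(List<File> ledgerDirs) {
        this.ledgerDirs = ledgerDirs;
    }

    // Creates the marker file in every ledger directory. The first failure is
    // rethrown with context instead of being logged and swallowed.
    void registerStartUp() throws IOException {
        for (File ledgerDir : ledgerDirs) {
            File dirtyFile = new File(ledgerDir, DIRTY_FILE_NAME);
            try {
                // Returns false if the marker already exists, which is acceptable here.
                dirtyFile.createNewFile();
            } catch (IOException e) {
                throw new IOException("Unable to register start-up: dirty file of ledger dir "
                        + ledgerDir.getAbsolutePath() + " could not be created", e);
            }
        }
    }
}
```

The trade-off is availability: a bookie that cannot write the marker refuses to start rather than running in a state where a later unclean shutdown would go undetected.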

##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/datainteg/DataIntegrityCheck.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie.datainteg;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The interface for the data integrity check feature. This feature allows
+ * a bookie to handle data loss scenarios such as when running without
+ * the journal or after a disk failure has caused the loss of all data.
+ */
+public interface DataIntegrityCheck {
+    /**
+     * Run quick preboot check. This check should do enough to ensure that
+     * it is safe to complete the boot sequence without compromising correctness.
+     * To this end, if it finds that this bookie is part of the last ensemble of
+     * an unclosed ledger, it must prevent the bookie from being able store new
+     * entries for that ledger and must prevent the bookie from taking part in
+     * the discovery of the last entry of that ledger.
+     */
+    CompletableFuture<Void> runPreBoot(String reason);
+
+    /**
+     * Whether we need to run a full check.
+     * This condition can be set by the runPreBoot() call to run a full check
+     * in the background once the bookie is running. This can later be used
+     * to run the full check periodically, or to exponentially backoff and retry
+     * when some transient condition prevents a ledger being fixed during a
+     * full check.
+     */
+    boolean needsFull() throws IOException;
+
+    /**
+     * Run full check of bookies local data. This check should ensure that
+     * if the metadata service states that it should have an entry, then it
+     * should have that entry. If the entry is missing, it should copy it
+     * from another available source.
+     */
+    CompletableFuture<Void> runFullCheck();

Review comment:
       nit: naming consistency.
   Either runPreBoot/needsFull/runFull or runPreBootCheck/needsFullCheck/runFullCheck (I prefer the latter).

##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
##########
@@ -180,6 +180,11 @@ public static short getFlags(int packetHeader) {
      */
     int ETOOMANYREQUESTS = 106;
 
+    /**
+     * Ledger in unknown state.
+     */
+    int EUNKNOWN = 107;

Review comment:
       do we need to bump up CURRENT_PROTOCOL_VERSION?

##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
##########
@@ -1013,4 +1077,91 @@ public void diskJustWritable(File disk) {
             }
         };
     }
+
+    @Override
+    public void setLimboState(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("setLimboState. ledger: {}", ledgerId);
+        }
+        ledgerIndex.setLimbo(ledgerId);
+    }
+
+    @Override
+    public boolean hasLimboState(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("hasLimboState. ledger: {}", ledgerId);
+        }
+        return ledgerIndex.get(ledgerId).getLimbo();
+    }
+
+    @Override
+    public void clearLimboState(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("clearLimboState. ledger: {}", ledgerId);
+        }
+        ledgerIndex.clearLimbo(ledgerId);
+    }
+
+    private void throwIfLimbo(long ledgerId) throws IOException, BookieException {

Review comment:
       Maybe declare `throws .. DataUnknownException` (instead of BookieException), with similar changes wherever it bubbles up (readLastAddConfirmed/...), to make sure that some other BookieException is not handled as a DataUnknown case?
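
A small sketch of why the narrower throws clause helps. All class names below are hypothetical stand-ins (the real BookieException and DataUnknownException have different constructors and live in the bookkeeper packages); the point is only that callers can catch the specific type without risking that an unrelated BookieException is treated as "data unknown".

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for BookieException.
class SketchBookieException extends Exception {
    SketchBookieException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for the DataUnknownException subclass.
class SketchDataUnknownException extends SketchBookieException {
    SketchDataUnknownException(String message) {
        super(message);
    }
}

final class LimboGuard {
    private final Set<Long> limboLedgers = ConcurrentHashMap.newKeySet();

    void setLimbo(long ledgerId) {
        limboLedgers.add(ledgerId);
    }

    // The signature names the subclass, so the compiler tells every caller
    // exactly which failure this method can produce.
    void throwIfLimbo(long ledgerId) throws SketchDataUnknownException {
        if (limboLedgers.contains(ledgerId)) {
            throw new SketchDataUnknownException("Ledger " + ledgerId + " is in limbo");
        }
    }
}
```

A caller that maps this failure to a wire-level response code can then catch `SketchDataUnknownException` alone and let any other exception propagate.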

##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/datainteg/DataIntegrityCheckImpl.java
##########
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie.datainteg;
+
+import com.google.common.collect.ImmutableSortedMap;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.Maybe;
+import io.reactivex.rxjava3.core.Scheduler;
+import io.reactivex.rxjava3.core.Single;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.LedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerStorage.StorageState;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.net.BookieId;
+
+/**
+ * An implementation of the DataIntegrityCheck interface.
+ */
+@Slf4j
+public class DataIntegrityCheckImpl implements DataIntegrityCheck {
+    private static final int MAX_INFLIGHT = 300;
+    private static final int MAX_ENTRIES_INFLIGHT = 3000;
+    private static final int ZK_TIMEOUT_S = 30;
+    private final BookieId bookieId;
+    private final LedgerManager ledgerManager;
+    private final LedgerStorage ledgerStorage;
+    private final EntryCopier entryCopier;
+    private final BookKeeperAdmin admin;
+    private final Scheduler scheduler;
+    private final AtomicReference<Map<Long, LedgerMetadata>> ledgersCacheRef =
+        new AtomicReference<>(null);
+    private CompletableFuture<Void> preBootFuture;
+
+    public DataIntegrityCheckImpl(BookieId bookieId,
+                                  LedgerManager ledgerManager,
+                                  LedgerStorage ledgerStorage,
+                                  EntryCopier entryCopier,
+                                  BookKeeperAdmin admin,
+                                  Scheduler scheduler) {
+        this.bookieId = bookieId;
+        this.ledgerManager = ledgerManager;
+        this.ledgerStorage = ledgerStorage;
+        this.entryCopier = entryCopier;
+        this.admin = admin;
+        this.scheduler = scheduler;
+    }
+
+    @Override
+    public CompletableFuture<Void> runPreBoot(String reason) {
+        // we only run this once, it could be kicked off by different checks
+        synchronized (this) {
+            if (preBootFuture == null) {
+                preBootFuture = runPreBootSequence(reason);
+            }
+        }
+        return preBootFuture;
+
+    }
+
+    private CompletableFuture<Void> runPreBootSequence(String reason) {
+        String runId = UUID.randomUUID().toString();
+        log.info("Event: {}, RunId: {}, Reason: {}", Events.PREBOOT_START, runId, reason);
+        try {
+            this.ledgerStorage.setStorageStateFlag(StorageState.NEEDS_INTEGRITY_CHECK);
+        } catch (IOException ioe) {
+            log.error("Event: {}, RunId: {}", Events.PREBOOT_ERROR, runId, ioe);
+            return FutureUtils.exception(ioe);
+        }
+
+        MetadataAsyncIterator iter = new MetadataAsyncIterator(scheduler,
+                ledgerManager, MAX_INFLIGHT, ZK_TIMEOUT_S, TimeUnit.SECONDS);
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        Map<Long, LedgerMetadata> ledgersCache =

Review comment:
       Should we be concerned about memory requirements here in the case of large disks (too many ledgers, or metadata with many segments)? See https://github.com/apache/bookkeeper/pull/1949







[GitHub] [bookkeeper] eolivelli commented on a change in pull request #2936: BP-46: Data integrity check for running without journal

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #2936:
URL: https://github.com/apache/bookkeeper/pull/2936#discussion_r772928202



##########
File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/MockLedgerStorage.java
##########
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import com.google.common.util.concurrent.RateLimiter;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.PrimitiveIterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.common.util.Watcher;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * A mock for running tests that require ledger storage.
+ */
+public class MockLedgerStorage implements LedgerStorage {

Review comment:
       It would be great, as follow-up work, to move this class out of the "tests" package, so that people can use this LedgerStorage implementation in unit tests or in downstream applications that need a Bookie.
   
   So this class + not writing to the journal = a lightweight bookie for unit tests.
   
   By the way, people can already "depend" on bookkeeper-server tests, so it's not a big deal, but we could give this class more visibility.
   

##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
##########
@@ -340,14 +359,61 @@ public static LifecycleComponentStack buildBookieServer(BookieConfiguration conf
         LedgerDirsManager indexDirsManager = BookieResources.createIndexDirsManager(
                 conf.getServerConf(), diskChecker, bookieStats.scope(LD_INDEX_SCOPE), ledgerDirsManager);
 
-        CookieValidation cookieValidation = new LegacyCookieValidation(conf.getServerConf(), rm);
-        cookieValidation.checkCookies(storageDirectoriesFromConf(conf.getServerConf()));
-
         ByteBufAllocatorWithOomHandler allocator = BookieResources.createAllocator(conf.getServerConf());
 
+        UncleanShutdownDetection uncleanShutdownDetection = new UncleanShutdownDetectionImpl(ledgerDirsManager);
+        if (uncleanShutdownDetection.lastShutdownWasUnclean()) {
+            log.info("Unclean shutdown detected. The bookie did not register a graceful shutdown prior to this boot.");
+        }
+
         // bookie takes ownership of storage, so shuts it down
-        LedgerStorage storage = BookieResources.createLedgerStorage(
-                conf.getServerConf(), ledgerManager, ledgerDirsManager, indexDirsManager, bookieStats, allocator);
+        LedgerStorage storage = null;
+        DataIntegrityCheck integCheck = null;
+
+        if (conf.getServerConf().isDataIntegrityCheckingEnabled()) {
+            StatsLogger clientStats = bookieStats.scope(CLIENT_SCOPE);
+            ClientConfiguration clientConfiguration = new ClientConfiguration(conf.getServerConf());
+            clientConfiguration.setClientRole(ClientConfiguration.CLIENT_ROLE_SYSTEM);
+            BookKeeper bkc = BookKeeper.forConfig(clientConfiguration).statsLogger(clientStats).build();
+            serverBuilder.addComponent(new AutoCloseableLifecycleComponent("bkc", bkc));
+
+            BookieId bookieId = BookieImpl.getBookieId(conf.getServerConf());
+            ExecutorService rxExecutor = Executors.newFixedThreadPool(

Review comment:
       It looks like we are not shutting down this ExecutorService.
   Not a big deal, because we are in the Main class and the JVM will die anyway, but if there is a clean way to shut down this thread pool, that would be better.

##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
##########
@@ -327,6 +328,13 @@ public boolean ledgerExists(long ledgerId) throws IOException {
         return ledgerCache.ledgerExists(ledgerId);
     }
 
+    @Override
+    public boolean entryExists(long ledgerId, long entryId) throws IOException {
+        //Implementation should be as simple as what's below, but this needs testing
+        //return ledgerCache.getEntryOffset(ledgerId, entryId) > 0;
+        throw new UnsupportedOperationException("entry exists not supported");

Review comment:
       makes sense







[GitHub] [bookkeeper] Vanlightly commented on a change in pull request #2936: BP-46: Data integrity check for running without journal

Posted by GitBox <gi...@apache.org>.
Vanlightly commented on a change in pull request #2936:
URL: https://github.com/apache/bookkeeper/pull/2936#discussion_r772214016



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/UncleanShutdownDetectionImpl.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to determine if the prior shutdown was unclean or not. It does so
+ * by adding a file to each ledger directory after successful start-up
+ * and removing the file on graceful shutdown.
+ * Any abrupt termination will cause one or more of these files to not be cleared
+ * and so on the subsequent boot-up, the presence of any of these files will
+ * indicate an unclean shutdown.
+ */
+public class UncleanShutdownDetectionImpl implements UncleanShutdownDetection {
+    private static final Logger LOG = LoggerFactory.getLogger(UncleanShutdownDetectionImpl.class);
+    private final LedgerDirsManager ledgerDirsManager;
+    static final String DirtyFileName = "DIRTY";
+
+    public UncleanShutdownDetectionImpl(LedgerDirsManager ledgerDirsManager) {
+        this.ledgerDirsManager = ledgerDirsManager;
+    }
+
+    @Override
+    public void registerStartUp() {
+        for (File ledgerDir : ledgerDirsManager.getAllLedgerDirs()) {
+            try {
+                File dirtyFile = new File(ledgerDir, DirtyFileName);
+                dirtyFile.createNewFile();
+            } catch (Throwable t) {
+                LOG.error("Unable to register start-up (so an unclean shutdown cannot"
+                        + " be detected). Dirty file of ledger dir {} could not be created",
+                        ledgerDir.getAbsolutePath(), t);
+            }
+        }
+    }
+
+    @Override
+    public void registerCleanShutdown() {
+        for (File ledgerDir : ledgerDirsManager.getAllLedgerDirs()) {
+            try {
+                File dirtyFile = new File(ledgerDir, DirtyFileName);
+                dirtyFile.delete();

Review comment:
       Added more checks to deletion of dirty files.
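
The thread does not show which checks were added. As one possible shape, and purely as an assumption, the deletion could report failures instead of ignoring the result of `File.delete()`. `CheckedCleanShutdown` is a hypothetical helper, not the PR's code.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.List;

// Hypothetical helper illustrating a checked variant of registerCleanShutdown().
final class CheckedCleanShutdown {
    static final String DIRTY_FILE_NAME = "DIRTY";

    // Returns true only if no marker file could be left behind in any directory.
    static boolean registerCleanShutdown(List<File> ledgerDirs) {
        boolean allCleared = true;
        for (File ledgerDir : ledgerDirs) {
            File dirtyFile = new File(ledgerDir, DIRTY_FILE_NAME);
            try {
                // A marker that is already absent is not an error; a failed delete is.
                Files.deleteIfExists(dirtyFile.toPath());
            } catch (IOException e) {
                allCleared = false;
                System.err.println("Unable to register a clean shutdown, dirty file of ledger dir "
                        + ledgerDir.getAbsolutePath() + " could not be deleted: " + e);
            }
        }
        return allCleared;
    }
}
```

A false return value means the next boot will see a marker and treat the shutdown as unclean, which errs on the side of running the integrity check.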







[GitHub] [bookkeeper] Vanlightly commented on a change in pull request #2936: BP-46: Data integrity check for running without journal

Posted by GitBox <gi...@apache.org>.
Vanlightly commented on a change in pull request #2936:
URL: https://github.com/apache/bookkeeper/pull/2936#discussion_r772210865



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
##########
@@ -180,6 +180,11 @@ public static short getFlags(int packetHeader) {
      */
     int ETOOMANYREQUESTS = 106;
 
+    /**
+     * Ledger in unknown state.
+     */
+    int EUNKNOWN = 107;

Review comment:
       What are the current guidelines for modifying the CURRENT_PROTOCOL_VERSION?







[GitHub] [bookkeeper] dlg99 commented on a change in pull request #2936: BP-46: Data integrity check for running without journal

Posted by GitBox <gi...@apache.org>.
dlg99 commented on a change in pull request #2936:
URL: https://github.com/apache/bookkeeper/pull/2936#discussion_r779109677



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
##########
@@ -180,6 +180,11 @@ public static short getFlags(int packetHeader) {
      */
     int ETOOMANYREQUESTS = 106;
 
+    /**
+     * Ledger in unknown state.
+     */
+    int EUNKNOWN = 107;

Review comment:
       I don't think we have this documented (@sijie and @merlimat can correct me), but the rule of thumb has been to bump it when an old client would not handle a new response correctly, or would do something dangerously wrong when it receives the new response.
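
The compatibility concern can be illustrated with a minimal sketch. It assumes a client that maps numeric response codes through a `switch` with a default branch; the two constants come from the diff above, while the class, enum, and method names are hypothetical and are not BookKeeper APIs.

```java
/** Hypothetical illustration only: how an old and a new client might classify code 107. */
final class ResponseCodeCompat {
    static final int ETOOMANYREQUESTS = 106; // value taken from the diff
    static final int EUNKNOWN = 107;         // value taken from the diff

    /** Hypothetical outcomes a client could act on. */
    enum Outcome { RETRY_LATER, LEDGER_STATE_UNKNOWN, GENERIC_FAILURE }

    /** A client built before EUNKNOWN existed: 107 falls through to the default branch. */
    static Outcome oldClient(int code) {
        switch (code) {
            case ETOOMANYREQUESTS:
                return Outcome.RETRY_LATER;
            default:
                return Outcome.GENERIC_FAILURE;
        }
    }

    /** A client that knows about EUNKNOWN and can treat it distinctly. */
    static Outcome newClient(int code) {
        switch (code) {
            case ETOOMANYREQUESTS:
                return Outcome.RETRY_LATER;
            case EUNKNOWN:
                return Outcome.LEDGER_STATE_UNKNOWN;
            default:
                return Outcome.GENERIC_FAILURE;
        }
    }
}
```

Under this assumption, the question for the protocol version is whether the old client's fallback for 107 is merely suboptimal or actually unsafe.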







[GitHub] [bookkeeper] Vanlightly commented on a change in pull request #2936: BP-46: Data integrity check for running without journal

Posted by GitBox <gi...@apache.org>.
Vanlightly commented on a change in pull request #2936:
URL: https://github.com/apache/bookkeeper/pull/2936#discussion_r772212815



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/datainteg/DataIntegrityCheck.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie.datainteg;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The interface for the data integrity check feature. This feature allows
+ * a bookie to handle data loss scenarios such as when running without
+ * the journal or after a disk failure has caused the loss of all data.
+ */
+public interface DataIntegrityCheck {
+    /**
+     * Run quick preboot check. This check should do enough to ensure that
+     * it is safe to complete the boot sequence without compromising correctness.
+     * To this end, if it finds that this bookie is part of the last ensemble of
+     * an unclosed ledger, it must prevent the bookie from being able to store new
+     * entries for that ledger and must prevent the bookie from taking part in
+     * the discovery of the last entry of that ledger.
+     */
+    CompletableFuture<Void> runPreBoot(String reason);
+
+    /**
+     * Whether we need to run a full check.
+     * This condition can be set by the runPreBoot() call to run a full check
+     * in the background once the bookie is running. This can later be used
+     * to run the full check periodically, or to exponentially backoff and retry
+     * when some transient condition prevents a ledger being fixed during a
+     * full check.
+     */
+    boolean needsFull() throws IOException;
+
+    /**
+     * Run full check of bookies local data. This check should ensure that
+     * if the metadata service states that it should have an entry, then it
+     * should have that entry. If the entry is missing, it should copy it
+     * from another available source.
+     */
+    CompletableFuture<Void> runFullCheck();

Review comment:
       Agreed, updated to runPreBootCheck/needsFullCheck/runFullCheck.
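
For reference, the renamed interface would look roughly like the sketch below. The three method names come from this comment and the signatures from the diff above; `InMemoryIntegrityCheck` is a hypothetical stand-in that only models the "full check required" flag, and clearing the flag after a full check is an assumption made for the example.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/** The interface from the diff with the renamed methods. */
interface DataIntegrityCheck {
    CompletableFuture<Void> runPreBootCheck(String reason);

    boolean needsFullCheck() throws IOException;

    CompletableFuture<Void> runFullCheck();
}

/** Hypothetical in-memory implementation: models only the flag, not real storage. */
final class InMemoryIntegrityCheck implements DataIntegrityCheck {
    private final AtomicBoolean fullCheckRequired = new AtomicBoolean(false);

    @Override
    public CompletableFuture<Void> runPreBootCheck(String reason) {
        // The preboot check records that a full check must follow.
        fullCheckRequired.set(true);
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public boolean needsFullCheck() {
        return fullCheckRequired.get();
    }

    @Override
    public CompletableFuture<Void> runFullCheck() {
        // Assumption for this sketch: a completed full check clears the flag.
        fullCheckRequired.set(false);
        return CompletableFuture.completedFuture(null);
    }
}
```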







[GitHub] [bookkeeper] Vanlightly commented on a change in pull request #2936: BP-46: Data integrity check for running without journal

Posted by GitBox <gi...@apache.org>.
Vanlightly commented on a change in pull request #2936:
URL: https://github.com/apache/bookkeeper/pull/2936#discussion_r772213631



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/UncleanShutdownDetectionImpl.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to determine if the prior shutdown was unclean or not. It does so
+ * by adding a file to each ledger directory after successful start-up
+ * and removing the file on graceful shutdown.
+ * Any abrupt termination will cause one or more of these files to not be cleared
+ * and so on the subsequent boot-up, the presence of any of these files will
+ * indicate an unclean shutdown.
+ */
+public class UncleanShutdownDetectionImpl implements UncleanShutdownDetection {
+    private static final Logger LOG = LoggerFactory.getLogger(UncleanShutdownDetectionImpl.class);
+    private final LedgerDirsManager ledgerDirsManager;
+    static final String DirtyFileName = "DIRTY";

Review comment:
       Updated to DIRTY_FILENAME.







[GitHub] [bookkeeper] eolivelli commented on a change in pull request #2936: BP-46: Data integrity check for running without journal

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #2936:
URL: https://github.com/apache/bookkeeper/pull/2936#discussion_r771811531



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
##########
@@ -327,6 +328,13 @@ public boolean ledgerExists(long ledgerId) throws IOException {
         return ledgerCache.ledgerExists(ledgerId);
     }
 
+    @Override
+    public boolean entryExists(long ledgerId, long entryId) throws IOException {
+        //Implementation should be as simple as what's below, but this needs testing
+        //return ledgerCache.getEntryOffset(ledgerId, entryId) > 0;
+        throw new UnsupportedOperationException("entry exists not supported");

Review comment:
       Would it be better to throw an IOException (here and in the other methods in this class)?
   Unchecked exceptions are usually not handled, so we can run into unexpected errors.
   
   Not a big deal in any case, as the bookie is not working.
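
A small sketch of the difference, assuming a caller that only handles the declared `IOException`. The `Storage` interface and the `probe` method are hypothetical and exist only for this illustration; the exception message is the one from the diff.

```java
import java.io.IOException;

/** Hypothetical illustration of checked vs. unchecked failure for entryExists. */
final class EntryExistsSketch {
    /** Stand-in for the relevant slice of a ledger storage interface. */
    interface Storage {
        boolean entryExists(long ledgerId, long entryId) throws IOException;
    }

    /** Mirrors the diff: fails with an unchecked exception. */
    static final Storage UNCHECKED = (ledgerId, entryId) -> {
        throw new UnsupportedOperationException("entry exists not supported");
    };

    /** The suggested alternative: fails with the declared checked exception. */
    static final Storage CHECKED = (ledgerId, entryId) -> {
        throw new IOException("entry exists not supported");
    };

    /** A caller that handles only the failure the signature declares. */
    static String probe(Storage storage) {
        try {
            return storage.entryExists(1L, 0L) ? "present" : "missing";
        } catch (IOException ioe) {
            return "handled: " + ioe.getMessage();
        }
    }
}
```

With `CHECKED`, `probe` returns a handled result; with `UNCHECKED`, the exception propagates past the `catch` block to whatever sits above the caller.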

##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/datainteg/DataIntegrityCookieValidation.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie.datainteg;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.BookieImpl;
+import org.apache.bookkeeper.bookie.Cookie;
+import org.apache.bookkeeper.bookie.CookieValidation;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of the CookieValidation interface that allows for auto-stamping
+ * cookies when configured and used in conjunction with the data integrity service.
+ * Because the data integrity service can heal a bookie with lost data due to a disk
+ * failure, a bookie can auto stamp new cookies as part of the healing process.
+ */
+public class DataIntegrityCookieValidation implements CookieValidation {
+    private static final Logger log = LoggerFactory.getLogger(DataIntegrityCookieValidation.class);
+    private final ServerConfiguration conf;
+    private final BookieId bookieId;
+    private final RegistrationManager registrationManager;
+    private final DataIntegrityCheck dataIntegCheck;
+
+    public DataIntegrityCookieValidation(ServerConfiguration conf,
+                                         RegistrationManager registrationManager,
+                                         DataIntegrityCheck dataIntegCheck)
+            throws UnknownHostException {
+        this.conf = conf;
+        this.registrationManager = registrationManager;
+        this.bookieId = BookieImpl.getBookieId(conf);
+        this.dataIntegCheck = dataIntegCheck;
+    }
+
+    private Optional<Versioned<Cookie>> getRegManagerCookie() throws BookieException {
+        try {
+            return Optional.of(Cookie.readFromRegistrationManager(registrationManager, bookieId));
+        } catch (BookieException.CookieNotFoundException noCookieException) {
+            return Optional.empty();
+        }
+    }
+
+    private List<Optional<Cookie>> collectDirectoryCookies(List<File> directories) throws BookieException {
+        List<Optional<Cookie>> cookies = new ArrayList<>();
+        for (File d : directories) {
+            try {
+                cookies.add(Optional.of(Cookie.readFromDirectory(d)));
+            } catch (FileNotFoundException fnfe) {
+                cookies.add(Optional.empty());
+            } catch (IOException ioe) {
+                throw new BookieException.InvalidCookieException(ioe);
+            }
+        }
+        return cookies;
+    }
+
+    private void stampCookie(Cookie masterCookie, Version expectedVersion, List<File> directories)
+            throws BookieException {
+        // stamp to ZK first as it's the authoritative cookie. If this fails part way through
+        // stamping the directories, then a data integrity check will occur.
+        log.info("Stamping cookie to ZK");
+        masterCookie.writeToRegistrationManager(registrationManager, conf, expectedVersion);
+        for (File d : directories) {
+            try {
+                log.info("Stamping cookie to directory {}", d);
+                masterCookie.writeToDirectory(d);
+            } catch (IOException ioe) {
+                log.error("Exception writing cookie to {}", d, ioe);
+                throw new BookieException.InvalidCookieException(ioe);
+            }
+        }
+    }
+
+    @Override
+    public void checkCookies(List<File> directories)
+            throws BookieException, InterruptedException {
+        String instanceId = registrationManager.getClusterInstanceId();
+        if (instanceId == null) {
+            throw new BookieException.InvalidCookieException("Cluster instance ID unavailable");
+        }
+        Cookie masterCookie;
+        try {
+            masterCookie = Cookie.generateCookie(conf).setInstanceId(instanceId).build();
+        } catch (UnknownHostException uhe) {
+            throw new BookieException.InvalidCookieException(uhe);
+        }
+
+        // collect existing cookies
+        Optional<Versioned<Cookie>> regManagerCookie = getRegManagerCookie();
+        List<Optional<Cookie>> directoryCookies = collectDirectoryCookies(directories);
+
+        // if master is empty, everything must be empty, otherwise the cluster is messed up
+        if (!regManagerCookie.isPresent()) {
+            // if everything is empty, it's a new install, just stamp the cookies
+            if (directoryCookies.stream().noneMatch(Optional::isPresent)) {
+                log.info("New environment found. Stamping cookies");
+                stampCookie(masterCookie, Version.NEW, directories);
+            } else {
+                String errorMsg =
+                    "Cookie missing from ZK. Either it was manually deleted, "
+                    + "or the bookie was started pointing to a different ZK cluster "
+                    + "than the one it was originally started with. "
+                    + "This requires manual intervention to fix";
+                log.error(errorMsg);
+                throw new BookieException.InvalidCookieException(errorMsg);
+            }
+        } else if (!regManagerCookie.get().getValue().equals(masterCookie)
+                   || !directoryCookies.stream().allMatch(c -> c.map(masterCookie::equals).orElse(false))) {
+            if (conf.isDataIntegrityStampMissingCookiesEnabled()) {
+                log.warn("ZK cookie({}) or directory cookies({}) do not match master cookie ({}), running check",
+                        regManagerCookie, directoryCookies, masterCookie);
+                try {
+                    dataIntegCheck.runPreBoot("INVALID_COOKIE").get();
+                } catch (ExecutionException ee) {
+                    if (ee.getCause() instanceof BookieException) {
+                        throw (BookieException) ee.getCause();
+                    } else {
+                        throw new BookieException.InvalidCookieException(ee.getCause());
+                    }
+                }
+                log.info("Environment should be in a sane state. Stamp new cookies");
+                stampCookie(masterCookie, regManagerCookie.get().getVersion(), directories);
+            } else {
+                String errorMsg = MessageFormat.format(
+                        "ZK cookie({0}) or directory cookies({1}) do not match master cookie ({2})"
+                                + " and missing cookie stamping is disabled.",
+                        regManagerCookie, directoryCookies, masterCookie);
+                log.error(errorMsg);
+                throw new BookieException.InvalidCookieException(errorMsg);
+            }
+        } // else all cookies match the masterCookie, meaning nothing has changed in the configuration

Review comment:
       What about logging something?

##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/UncleanShutdownDetectionImpl.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to determine if the prior shutdown was unclean or not. It does so
+ * by adding a file to each ledger directory after successful start-up
+ * and removing the file on graceful shutdown.
+ * Any abrupt termination will cause one or more of these files to not be cleared
+ * and so on the subsequent boot-up, the presence of any of these files will
+ * indicate an unclean shutdown.
+ */
+public class UncleanShutdownDetectionImpl implements UncleanShutdownDetection {
+    private static final Logger LOG = LoggerFactory.getLogger(UncleanShutdownDetectionImpl.class);
+    private final LedgerDirsManager ledgerDirsManager;
+    static final String DirtyFileName = "DIRTY";

Review comment:
       nit: DIRTY_FILENAME is a better name

##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/UncleanShutdownDetectionImpl.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to determine if the prior shutdown was unclean or not. It does so
+ * by adding a file to each ledger directory after successful start-up
+ * and removing the file on graceful shutdown.
+ * Any abrupt termination will cause one or more of these files to not be cleared
+ * and so on the subsequent boot-up, the presence of any of these files will
+ * indicate an unclean shutdown.
+ */
+public class UncleanShutdownDetectionImpl implements UncleanShutdownDetection {
+    private static final Logger LOG = LoggerFactory.getLogger(UncleanShutdownDetectionImpl.class);
+    private final LedgerDirsManager ledgerDirsManager;
+    static final String DirtyFileName = "DIRTY";
+
+    public UncleanShutdownDetectionImpl(LedgerDirsManager ledgerDirsManager) {
+        this.ledgerDirsManager = ledgerDirsManager;
+    }
+
+    @Override
+    public void registerStartUp() {
+        for (File ledgerDir : ledgerDirsManager.getAllLedgerDirs()) {
+            try {
+                File dirtyFile = new File(ledgerDir, DirtyFileName);
+                dirtyFile.createNewFile();
+            } catch (Throwable t) {
+                LOG.error("Unable to register start-up (so an unclean shutdown cannot"

Review comment:
       +1 for crashing in this case
   
   Can we also log that we created this file?
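
One way to combine both points is sketched below: let the failure propagate instead of catching `Throwable`, and log the outcome of `createNewFile()`. The class name is hypothetical, the directories are passed in directly rather than through `LedgerDirsManager`, and `System.out` stands in for the SLF4J logger so that the example is self-contained.

```java
import java.io.File;
import java.io.IOException;
import java.util.List;

/** Hypothetical sketch, not the PR's code: fail fast and log marker creation. */
final class StartUpMarker {
    static final String DIRTY_FILENAME = "DIRTY";

    /**
     * Creates the dirty marker in every ledger directory.
     * Returns how many markers were newly created.
     * Throws IOException if a marker cannot be created, so start-up can abort.
     */
    static int registerStartUp(List<File> ledgerDirs) throws IOException {
        int created = 0;
        for (File ledgerDir : ledgerDirs) {
            File dirtyFile = new File(ledgerDir, DIRTY_FILENAME);
            // createNewFile() returns false when the file already exists.
            if (dirtyFile.createNewFile()) {
                created++;
                System.out.println("Created dirty file " + dirtyFile.getAbsolutePath());
            } else {
                System.out.println("Dirty file already present at " + dirtyFile.getAbsolutePath());
            }
        }
        return created;
    }
}
```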

##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/UncleanShutdownDetectionImpl.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to determine if the prior shutdown was unclean or not. It does so
+ * by adding a file to each ledger directory after successful start-up
+ * and removing the file on graceful shutdown.
+ * Any abrupt termination will cause one or more of these files to not be cleared
+ * and so on the subsequent boot-up, the presence of any of these files will
+ * indicate an unclean shutdown.
+ */
+public class UncleanShutdownDetectionImpl implements UncleanShutdownDetection {
+    private static final Logger LOG = LoggerFactory.getLogger(UncleanShutdownDetectionImpl.class);
+    private final LedgerDirsManager ledgerDirsManager;
+    static final String DirtyFileName = "DIRTY";
+
+    public UncleanShutdownDetectionImpl(LedgerDirsManager ledgerDirsManager) {
+        this.ledgerDirsManager = ledgerDirsManager;
+    }
+
+    @Override
+    public void registerStartUp() {
+        for (File ledgerDir : ledgerDirsManager.getAllLedgerDirs()) {
+            try {
+                File dirtyFile = new File(ledgerDir, DirtyFileName);
+                dirtyFile.createNewFile();
+            } catch (Throwable t) {
+                LOG.error("Unable to register start-up (so an unclean shutdown cannot"
+                        + " be detected). Dirty file of ledger dir {} could not be created",
+                        ledgerDir.getAbsolutePath(), t);
+            }
+        }
+    }
+
+    @Override
+    public void registerCleanShutdown() {
+        for (File ledgerDir : ledgerDirsManager.getAllLedgerDirs()) {
+            try {
+                File dirtyFile = new File(ledgerDir, DirtyFileName);
+                dirtyFile.delete();

Review comment:
       We should check the return value here (I wonder why spotbugs didn't complain)
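
A minimal sketch of one option, assuming it is acceptable to switch from `File.delete()` to `java.nio.file.Files.deleteIfExists`, which reports a missing file through its return value and a failed deletion through an `IOException`. The helper class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper, not the PR's code: removes the dirty marker and reports the outcome. */
final class DirtyFileCleanup {
    static final String DIRTY_FILENAME = "DIRTY";

    /**
     * Deletes the marker in the given ledger directory.
     * Returns true if a marker was removed, false if none existed.
     * Throws IOException if the marker exists but cannot be deleted.
     */
    static boolean clearDirtyMarker(Path ledgerDir) throws IOException {
        Path dirtyFile = ledgerDir.resolve(DIRTY_FILENAME);
        return Files.deleteIfExists(dirtyFile);
    }
}
```

A caller can then log a warning when the method returns false, since a missing marker on clean shutdown may mean start-up was never registered.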

##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/datainteg/DataIntegrityCheck.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie.datainteg;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The interface for the data integrity check feature. This feature allows
+ * a bookie to handle data loss scenarios such as when running without
+ * the journal or after a disk failure has caused the loss of all data.
+ */
+public interface DataIntegrityCheck {
+    /**
+     * Run quick preboot check. This check should do enough to ensure that
+     * it is safe to complete the boot sequence without compromising correctness.
+     * To this end, if it finds that this bookie is part of the last ensemble of
+     * an unclosed ledger, it must prevent the bookie from being able to store new
+     * entries for that ledger and must prevent the bookie from taking part in
+     * the discovery of the last entry of that ledger.
+     */
+    CompletableFuture<Void> runPreBoot(String reason);
+
+    /**
+     * Whether we need to run a full check.
+     * This condition can be set by the runPreBoot() call to run a full check
+     * in the background once the bookie is running. This can later be used
+     * to run the full check periodically, or to exponentially backoff and retry
+     * when some transient condition prevents a ledger being fixed during a
+     * full check.
+     */
+    boolean needsFull() throws IOException;

Review comment:
       nit: needsFullCheck

##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
##########
@@ -2288,6 +2288,8 @@ private int statusCodeToExceptionCode(StatusCode status) {
                 return BKException.Code.WriteOnReadOnlyBookieException;
             case ETOOMANYREQUESTS:
                 return BKException.Code.TooManyRequestsException;
+            case EUNKNOWN:

Review comment:
       What about naming this EUNKNOWNLEDGERSTATE? Otherwise people may think that it is some kind of "unknown/generic error".

##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/datainteg/DataIntegrityCheckImpl.java
##########
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie.datainteg;
+
+import com.google.common.collect.ImmutableSortedMap;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.Maybe;
+import io.reactivex.rxjava3.core.Scheduler;
+import io.reactivex.rxjava3.core.Single;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.LedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerStorage.StorageState;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.net.BookieId;
+
+/**
+ * An implementation of the DataIntegrityCheck interface.
+ */
+@Slf4j
+public class DataIntegrityCheckImpl implements DataIntegrityCheck {
+    private static final int MAX_INFLIGHT = 300;
+    private static final int MAX_ENTRIES_INFLIGHT = 3000;
+    private static final int ZK_TIMEOUT_S = 30;
+    private final BookieId bookieId;
+    private final LedgerManager ledgerManager;
+    private final LedgerStorage ledgerStorage;
+    private final EntryCopier entryCopier;
+    private final BookKeeperAdmin admin;
+    private final Scheduler scheduler;
+    private final AtomicReference<Map<Long, LedgerMetadata>> ledgersCacheRef =
+        new AtomicReference<>(null);
+    private CompletableFuture<Void> preBootFuture;
+
+    public DataIntegrityCheckImpl(BookieId bookieId,
+                                  LedgerManager ledgerManager,
+                                  LedgerStorage ledgerStorage,
+                                  EntryCopier entryCopier,
+                                  BookKeeperAdmin admin,
+                                  Scheduler scheduler) {
+        this.bookieId = bookieId;
+        this.ledgerManager = ledgerManager;
+        this.ledgerStorage = ledgerStorage;
+        this.entryCopier = entryCopier;
+        this.admin = admin;
+        this.scheduler = scheduler;
+    }
+
+    @Override
+    public CompletableFuture<Void> runPreBoot(String reason) {
+        // we only run this once, it could be kicked off by different checks
+        synchronized (this) {
+            if (preBootFuture == null) {
+                preBootFuture = runPreBootSequence(reason);
+            }
+        }
+        return preBootFuture;
+
+    }
+
+    private CompletableFuture<Void> runPreBootSequence(String reason) {
+        String runId = UUID.randomUUID().toString();
+        log.info("Event: {}, RunId: {}, Reason: {}", Events.PREBOOT_START, runId, reason);
+        try {
+            this.ledgerStorage.setStorageStateFlag(StorageState.NEEDS_INTEGRITY_CHECK);
+        } catch (IOException ioe) {
+            log.error("Event: {}, RunId: {}", Events.PREBOOT_ERROR, runId, ioe);
+            return FutureUtils.exception(ioe);
+        }
+
+        MetadataAsyncIterator iter = new MetadataAsyncIterator(scheduler,
+                ledgerManager, MAX_INFLIGHT, ZK_TIMEOUT_S, TimeUnit.SECONDS);
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        Map<Long, LedgerMetadata> ledgersCache =

Review comment:
       we can keep this here, and address the problem in #1949

##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/datainteg/DataIntegrityCheckImpl.java
##########
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie.datainteg;
+
+import com.google.common.collect.ImmutableSortedMap;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.Maybe;
+import io.reactivex.rxjava3.core.Scheduler;
+import io.reactivex.rxjava3.core.Single;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.LedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerStorage.StorageState;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.net.BookieId;
+
+/**
+ * An implementation of the DataIntegrityCheck interface.
+ */
+@Slf4j
+public class DataIntegrityCheckImpl implements DataIntegrityCheck {
+    private static final int MAX_INFLIGHT = 300;
+    private static final int MAX_ENTRIES_INFLIGHT = 3000;
+    private static final int ZK_TIMEOUT_S = 30;
+    private final BookieId bookieId;
+    private final LedgerManager ledgerManager;
+    private final LedgerStorage ledgerStorage;
+    private final EntryCopier entryCopier;
+    private final BookKeeperAdmin admin;
+    private final Scheduler scheduler;
+    private final AtomicReference<Map<Long, LedgerMetadata>> ledgersCacheRef =
+        new AtomicReference<>(null);
+    private CompletableFuture<Void> preBootFuture;
+
+    public DataIntegrityCheckImpl(BookieId bookieId,
+                                  LedgerManager ledgerManager,
+                                  LedgerStorage ledgerStorage,
+                                  EntryCopier entryCopier,
+                                  BookKeeperAdmin admin,
+                                  Scheduler scheduler) {
+        this.bookieId = bookieId;
+        this.ledgerManager = ledgerManager;
+        this.ledgerStorage = ledgerStorage;
+        this.entryCopier = entryCopier;
+        this.admin = admin;
+        this.scheduler = scheduler;
+    }
+
+    @Override
+    public CompletableFuture<Void> runPreBoot(String reason) {
+        // we only run this once, it could be kicked off by different checks
+        synchronized (this) {
+            if (preBootFuture == null) {
+                preBootFuture = runPreBootSequence(reason);
+            }
+        }
+        return preBootFuture;
+
+    }
+
+    private CompletableFuture<Void> runPreBootSequence(String reason) {
+        String runId = UUID.randomUUID().toString();
+        log.info("Event: {}, RunId: {}, Reason: {}", Events.PREBOOT_START, runId, reason);
+        try {
+            this.ledgerStorage.setStorageStateFlag(StorageState.NEEDS_INTEGRITY_CHECK);
+        } catch (IOException ioe) {
+            log.error("Event: {}, RunId: {}", Events.PREBOOT_ERROR, runId, ioe);
+            return FutureUtils.exception(ioe);
+        }
+
+        MetadataAsyncIterator iter = new MetadataAsyncIterator(scheduler,
+                ledgerManager, MAX_INFLIGHT, ZK_TIMEOUT_S, TimeUnit.SECONDS);
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        Map<Long, LedgerMetadata> ledgersCache =
+            new ConcurrentSkipListMap<>(Comparator.<Long>naturalOrder().reversed());
+        iter.forEach((ledgerId, metadata) -> {
+                if (ensemblesContainBookie(metadata, bookieId)) {
+                    ledgersCache.put(ledgerId, metadata);
+                    try {
+                        if (!ledgerStorage.ledgerExists(ledgerId)) {
+                            ledgerStorage.setMasterKey(ledgerId, new byte[0]);
+                        }
+                    } catch (IOException ioe) {
+                        return FutureUtils.exception(ioe);

Review comment:
       What about logging the ledgerId, or other useful context?
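A minimal sketch of what carrying the ledger id with the failure could look like. `MasterKeyWriter` and `ensureLedger` are hypothetical stand-ins rather than names from the patch, and a plain `CompletableFuture` is used here in place of `FutureUtils.exception`:

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public class LedgerContextSketch {
    /** Hypothetical stand-in for the ledger storage call that may fail. */
    interface MasterKeyWriter {
        void setMasterKey(long ledgerId, byte[] key) throws IOException;
    }

    /** Returns a failed future whose exception message names the ledger. */
    static CompletableFuture<Void> ensureLedger(MasterKeyWriter storage, long ledgerId) {
        try {
            storage.setMasterKey(ledgerId, new byte[0]);
            return CompletableFuture.completedFuture(null);
        } catch (IOException ioe) {
            // Wrap the original exception so the ledger id travels with the failure
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                new IOException("Failed to initialise ledger " + ledgerId, ioe));
            return failed;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> f = ensureLedger(
            (id, key) -> { throw new IOException("disk error"); }, 42L);
        f.exceptionally(t -> {
            System.out.println(t.getMessage());
            return null;
        });
    }
}
```

Wrapping keeps the original `IOException` as the cause, so nothing is lost for whoever reads the stack trace later.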

##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/UncleanShutdownDetectionImpl.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to determine if the prior shutdown was unclean or not. It does so
+ * by adding a file to each ledger directory after successful start-up
+ * and removing the file on graceful shutdown.
+ * Any abrupt termination will cause one or more of these files to not be cleared
+ * and so on the subsequent boot-up, the presence of any of these files will
+ * indicate an unclean shutdown.
+ */
+public class UncleanShutdownDetectionImpl implements UncleanShutdownDetection {
+    private static final Logger LOG = LoggerFactory.getLogger(UncleanShutdownDetectionImpl.class);
+    private final LedgerDirsManager ledgerDirsManager;
+    static final String DirtyFileName = "DIRTY";
+
+    public UncleanShutdownDetectionImpl(LedgerDirsManager ledgerDirsManager) {
+        this.ledgerDirsManager = ledgerDirsManager;
+    }
+
+    @Override
+    public void registerStartUp() {
+        for (File ledgerDir : ledgerDirsManager.getAllLedgerDirs()) {
+            try {
+                File dirtyFile = new File(ledgerDir, DirtyFileName);
+                dirtyFile.createNewFile();
+            } catch (Throwable t) {
+                LOG.error("Unable to register start-up (so an unclean shutdown cannot"
+                        + " be detected). Dirty file of ledger dir {} could not be created",
+                        ledgerDir.getAbsolutePath(), t);
+            }
+        }
+    }
+
+    @Override
+    public void registerCleanShutdown() {
+        for (File ledgerDir : ledgerDirsManager.getAllLedgerDirs()) {
+            try {
+                File dirtyFile = new File(ledgerDir, DirtyFileName);
+                dirtyFile.delete();
+            } catch (Throwable t) {
+                LOG.error("Unable to register a clean shutdown, dirty file of "
+                        + "ledger dir {} could not be deleted",
+                        ledgerDir.getAbsolutePath(), t);
+            }
+        }
+    }
+
+    @Override
+    public boolean lastShutdownWasUnclean() {
+        try {
+            for (File ledgerDir : ledgerDirsManager.getAllLedgerDirs()) {
+                File dirtyFile = new File(ledgerDir, DirtyFileName);
+                if (dirtyFile.exists()) {
+                    return true;

Review comment:
       Can we log something, like "Detected unclean shutdown as file xxx exists"?
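Something along these lines, as a rough sketch only. It uses `java.util.logging` and a plain list of directories so that it is self-contained; the patch itself uses slf4j and `LedgerDirsManager`, and the class and constant names below are assumptions:

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;
import java.util.logging.Logger;

public class DirtyFileSketch {
    private static final Logger LOG = Logger.getLogger(DirtyFileSketch.class.getName());
    static final String DIRTY_FILE_NAME = "DIRTY";

    /** Returns true, and logs which file triggered it, if any directory holds a dirty file. */
    static boolean lastShutdownWasUnclean(List<File> ledgerDirs) {
        for (File dir : ledgerDirs) {
            File dirtyFile = new File(dir, DIRTY_FILE_NAME);
            if (dirtyFile.exists()) {
                LOG.info("Detected unclean shutdown: dirty file "
                        + dirtyFile.getAbsolutePath() + " exists");
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws IOException {
        File dir = Files.createTempDirectory("ledger").toFile();
        System.out.println(lastShutdownWasUnclean(Arrays.asList(dir)));
        // Simulate a start-up that was never followed by a clean shutdown
        new File(dir, DIRTY_FILE_NAME).createNewFile();
        System.out.println(lastShutdownWasUnclean(Arrays.asList(dir)));
    }
}
```

Logging the absolute path tells the operator which ledger directory was left dirty.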




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@bookkeeper.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bookkeeper] Vanlightly commented on a change in pull request #2936: BP-46: Data integrity check for running without journal

Posted by GitBox <gi...@apache.org>.
Vanlightly commented on a change in pull request #2936:
URL: https://github.com/apache/bookkeeper/pull/2936#discussion_r772238051



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
##########
@@ -2288,6 +2288,8 @@ private int statusCodeToExceptionCode(StatusCode status) {
                 return BKException.Code.WriteOnReadOnlyBookieException;
             case ETOOMANYREQUESTS:
                 return BKException.Code.TooManyRequestsException;
+            case EUNKNOWN:

Review comment:
       Yes I like that, done.







[GitHub] [bookkeeper] eolivelli merged pull request #2936: BP-46: Data integrity check for running without journal

Posted by GitBox <gi...@apache.org>.
eolivelli merged pull request #2936:
URL: https://github.com/apache/bookkeeper/pull/2936


   





[GitHub] [bookkeeper] eolivelli commented on pull request #2936: BP-46: Data integrity check for running without journal

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #2936:
URL: https://github.com/apache/bookkeeper/pull/2936#issuecomment-1008361887


   @Vanlightly this patch has been around for a while, I believe it is time to commit it.
   
   Can you please resolve the conflicts?
   I will be happy to merge





[GitHub] [bookkeeper] Vanlightly commented on a change in pull request #2936: BP-46: Data integrity check for running without journal

Posted by GitBox <gi...@apache.org>.
Vanlightly commented on a change in pull request #2936:
URL: https://github.com/apache/bookkeeper/pull/2936#discussion_r772209490



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
##########
@@ -327,6 +328,13 @@ public boolean ledgerExists(long ledgerId) throws IOException {
         return ledgerCache.ledgerExists(ledgerId);
     }
 
+    @Override
+    public boolean entryExists(long ledgerId, long entryId) throws IOException {
+        //Implementation should be as simple as what's below, but this needs testing
+        //return ledgerCache.getEntryOffset(ledgerId, entryId) > 0;
+        throw new UnsupportedOperationException("entry exists not supported");

Review comment:
       Given the bookie is not working anyway, I think throwing UnsupportedOperationException communicates more clearly to anyone reading this code that this method still needs implementing for this ledger storage.







[GitHub] [bookkeeper] Vanlightly commented on a change in pull request #2936: BP-46: Data integrity check for running without journal

Posted by GitBox <gi...@apache.org>.
Vanlightly commented on a change in pull request #2936:
URL: https://github.com/apache/bookkeeper/pull/2936#discussion_r772203540



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/datainteg/DataIntegrityCookieValidation.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie.datainteg;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.BookieImpl;
+import org.apache.bookkeeper.bookie.Cookie;
+import org.apache.bookkeeper.bookie.CookieValidation;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of the CookieValidation interface that allows for auto-stamping
+ * cookies when configured and used in conjunction with the data integrity service.
+ * Because the data integrity service can heal a bookie with lost data due to a disk
+ * failure, a bookie can auto stamp new cookies as part of the healing process.
+ */
+public class DataIntegrityCookieValidation implements CookieValidation {
+    private static final Logger log = LoggerFactory.getLogger(DataIntegrityCookieValidation.class);
+    private final ServerConfiguration conf;
+    private final BookieId bookieId;
+    private final RegistrationManager registrationManager;
+    private final DataIntegrityCheck dataIntegCheck;
+
+    public DataIntegrityCookieValidation(ServerConfiguration conf,
+                                         RegistrationManager registrationManager,
+                                         DataIntegrityCheck dataIntegCheck)
+            throws UnknownHostException {
+        this.conf = conf;
+        this.registrationManager = registrationManager;
+        this.bookieId = BookieImpl.getBookieId(conf);
+        this.dataIntegCheck = dataIntegCheck;
+    }
+
+    private Optional<Versioned<Cookie>> getRegManagerCookie() throws BookieException {
+        try {
+            return Optional.of(Cookie.readFromRegistrationManager(registrationManager, bookieId));
+        } catch (BookieException.CookieNotFoundException noCookieException) {
+            return Optional.empty();
+        }
+    }
+
+    private List<Optional<Cookie>> collectDirectoryCookies(List<File> directories) throws BookieException {
+        List<Optional<Cookie>> cookies = new ArrayList<>();
+        for (File d : directories) {
+            try {
+                cookies.add(Optional.of(Cookie.readFromDirectory(d)));
+            } catch (FileNotFoundException fnfe) {
+                cookies.add(Optional.empty());
+            } catch (IOException ioe) {
+                throw new BookieException.InvalidCookieException(ioe);
+            }
+        }
+        return cookies;
+    }
+
+    private void stampCookie(Cookie masterCookie, Version expectedVersion, List<File> directories)
+            throws BookieException {
+        // stamp to ZK first as it's the authoritative cookie. If this fails part way through
+        // stamping the directories, then a data integrity check will occur.
+        log.info("Stamping cookie to ZK");
+        masterCookie.writeToRegistrationManager(registrationManager, conf, expectedVersion);
+        for (File d : directories) {
+            try {
+                log.info("Stamping cookie to directory {}", d);
+                masterCookie.writeToDirectory(d);
+            } catch (IOException ioe) {
+                log.error("Exception writing cookie to {}", d, ioe);
+                throw new BookieException.InvalidCookieException(ioe);
+            }
+        }
+    }
+
+    @Override
+    public void checkCookies(List<File> directories)
+            throws BookieException, InterruptedException {
+        String instanceId = registrationManager.getClusterInstanceId();
+        if (instanceId == null) {
+            throw new BookieException.InvalidCookieException("Cluster instance ID unavailable");
+        }
+        Cookie masterCookie;
+        try {
+            masterCookie = Cookie.generateCookie(conf).setInstanceId(instanceId).build();
+        } catch (UnknownHostException uhe) {
+            throw new BookieException.InvalidCookieException(uhe);
+        }
+
+        // collect existing cookies
+        Optional<Versioned<Cookie>> regManagerCookie = getRegManagerCookie();
+        List<Optional<Cookie>> directoryCookies = collectDirectoryCookies(directories);
+
+        // if master is empty, everything must be empty, otherwise the cluster is messed up
+        if (!regManagerCookie.isPresent()) {
+            // if everything is empty, it's a new install, just stamp the cookies
+            if (directoryCookies.stream().noneMatch(Optional::isPresent)) {
+                log.info("New environment found. Stamping cookies");
+                stampCookie(masterCookie, Version.NEW, directories);
+            } else {
+                String errorMsg =
+                    "Cookie missing from ZK. Either it was manually deleted, "
+                    + "or the bookie was started pointing to a different ZK cluster "
+                    + "than the one it was originally started with. "
+                    + "This requires manual intervention to fix";
+                log.error(errorMsg);
+                throw new BookieException.InvalidCookieException(errorMsg);
+            }
+        } else if (!regManagerCookie.get().getValue().equals(masterCookie)
+                   || !directoryCookies.stream().allMatch(c -> c.map(masterCookie::equals).orElse(false))) {
+            if (conf.isDataIntegrityStampMissingCookiesEnabled()) {
+                log.warn("ZK cookie({}) or directory cookies({}) do not match master cookie ({}), running check",
+                        regManagerCookie, directoryCookies, masterCookie);
+                try {
+                    dataIntegCheck.runPreBoot("INVALID_COOKIE").get();
+                } catch (ExecutionException ee) {
+                    if (ee.getCause() instanceof BookieException) {
+                        throw (BookieException) ee.getCause();
+                    } else {
+                        throw new BookieException.InvalidCookieException(ee.getCause());
+                    }
+                }
+                log.info("Environment should be in a sane state. Stamp new cookies");
+                stampCookie(masterCookie, regManagerCookie.get().getVersion(), directories);
+            } else {
+                String errorMsg = MessageFormat.format(
+                        "ZK cookie({0}) or directory cookies({1}) do not match master cookie ({2})"
+                                + " and missing cookie stamping is disabled.",
+                        regManagerCookie, directoryCookies, masterCookie);
+                log.error(errorMsg);
+                throw new BookieException.InvalidCookieException(errorMsg);
+            }
+        } // else all cookies match the masterCookie, meaning nothing has changed in the configuration

Review comment:
       The line above logs the error.







[GitHub] [bookkeeper] dlg99 commented on a change in pull request #2936: BP-46: Data integrity check for running without journal

Posted by GitBox <gi...@apache.org>.
dlg99 commented on a change in pull request #2936:
URL: https://github.com/apache/bookkeeper/pull/2936#discussion_r779107568



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
##########
@@ -1013,4 +1077,91 @@ public void diskJustWritable(File disk) {
             }
         };
     }
+
+    @Override
+    public void setLimboState(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("setLimboState. ledger: {}", ledgerId);
+        }
+        ledgerIndex.setLimbo(ledgerId);
+    }
+
+    @Override
+    public boolean hasLimboState(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("hasLimboState. ledger: {}", ledgerId);
+        }
+        return ledgerIndex.get(ledgerId).getLimbo();
+    }
+
+    @Override
+    public void clearLimboState(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("clearLimboState. ledger: {}", ledgerId);
+        }
+        ledgerIndex.clearLimbo(ledgerId);
+    }
+
+    private void throwIfLimbo(long ledgerId) throws IOException, BookieException {

Review comment:
       This is mostly for the convenience of IDE code generation (plus documentation), and is a rather generic point:
   If a method declares "throws BookieException", the IDE-generated handler is a single catch block, and the developer may need to chase the code down to understand what specifically can go wrong.
   If the throws clause instead lists "throws DataUnknownException, OperationRejectedException, MetadataStoreException", one can easily see that the exception handling might need to differ per case (or simply catch BookieException if it does not).
   This one is a private method with limited scope of use, but if its exception is rethrown, a more specific throws clause here lets the calling method declare more specific exceptions too.
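To illustrate the point, here is a small self-contained sketch. The exception classes are hypothetical local stand-ins that only borrow the names mentioned above, not the real BookieException hierarchy, and `throwIfFenced` and `read` are invented for the example:

```java
public class ThrowsClauseSketch {
    // Hypothetical hierarchy, borrowing the names from the comment above
    static class BookieException extends Exception {
        BookieException(String msg) { super(msg); }
    }
    static class DataUnknownException extends BookieException {
        DataUnknownException(String msg) { super(msg); }
    }
    static class OperationRejectedException extends BookieException {
        OperationRejectedException(String msg) { super(msg); }
    }

    // A specific throws clause documents exactly what can go wrong here
    static void throwIfLimbo(boolean inLimbo) throws DataUnknownException {
        if (inLimbo) {
            throw new DataUnknownException("ledger is in limbo");
        }
    }

    static void throwIfFenced(boolean fenced) throws OperationRejectedException {
        if (fenced) {
            throw new OperationRejectedException("ledger is fenced");
        }
    }

    // The caller can see from the signatures that the two cases differ
    static String read(boolean inLimbo, boolean fenced) {
        try {
            throwIfLimbo(inLimbo);
            throwIfFenced(fenced);
            return "OK";
        } catch (DataUnknownException e) {
            return "RETRY_LATER";
        } catch (OperationRejectedException e) {
            return "REJECTED";
        }
    }

    public static void main(String[] args) {
        System.out.println(read(true, false));
        System.out.println(read(false, true));
        System.out.println(read(false, false));
    }
}
```

Had both helpers declared only the base type, `read` would need a single catch of BookieException and an instanceof check to tell the cases apart.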







[GitHub] [bookkeeper] eolivelli commented on pull request #2936: BP-46: Data integrity check for running without journal

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #2936:
URL: https://github.com/apache/bookkeeper/pull/2936#issuecomment-1008729059


   The conflict is about an ExceptionCode:
   
     int DataUnknownException = -110;
   vs
     int EntryLogMetadataMapException = -110;
     
   How can we resolve the conflict?
   If `DataUnknownException` is already running in production somewhere, I would keep DataUnknownException as -110.
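As an aside, a clash like this could be caught mechanically. The sketch below is only an illustration: `Codes` and `FixedCodes` are hypothetical tables, `findDuplicate` is an invented helper, and the value -111 is purely illustrative, not a proposal for the actual renumbering:

```java
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class DuplicateCodeSketch {
    /** Hypothetical code table reproducing the clash described above. */
    interface Codes {
        int DataUnknownException = -110;
        int EntryLogMetadataMapException = -110;
    }

    /** Same table with one code moved; -111 is illustrative only. */
    interface FixedCodes {
        int DataUnknownException = -110;
        int EntryLogMetadataMapException = -111;
    }

    /** Returns a description of the first pair of int constants sharing a value, if any. */
    static Optional<String> findDuplicate(Class<?> table) throws IllegalAccessException {
        Map<Integer, String> seen = new HashMap<>();
        for (Field f : table.getDeclaredFields()) {
            if (f.getType() != int.class) {
                continue;
            }
            int code = f.getInt(null); // interface fields are implicitly static
            String previous = seen.putIfAbsent(code, f.getName());
            if (previous != null) {
                return Optional.of(previous + " and " + f.getName() + " share code " + code);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) throws IllegalAccessException {
        System.out.println(findDuplicate(Codes.class).isPresent());
        System.out.println(findDuplicate(FixedCodes.class).isPresent());
    }
}
```

A unit test built on a check like this would fail as soon as two codes collide, whichever of the two constants ends up keeping -110.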
   
   

