You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2018/01/24 21:38:42 UTC

[incubator-pulsar] branch master updated: Introduce config to skip non-recoverable data-ledger (#1046)

This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f5a2ec7  Introduce config to skip non-recoverable data-ledger (#1046)
f5a2ec7 is described below

commit f5a2ec74e834e50f1970927f8fa6c107f7a3453c
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Wed Jan 24 13:38:39 2018 -0800

    Introduce config to skip non-recoverable data-ledger (#1046)
---
 conf/broker.conf                                   |   3 +
 conf/standalone.conf                               |   3 +
 .../bookkeeper/mledger/ManagedLedgerConfig.java    |  15 +++
 .../bookkeeper/mledger/ManagedLedgerException.java |  13 +++
 .../bookkeeper/mledger/impl/EntryCacheImpl.java    |  11 +-
 .../bookkeeper/mledger/impl/EntryCacheManager.java |   3 +-
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  25 +++--
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |   3 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  50 +++++++--
 .../bookkeeper/mledger/impl/OpReadEntry.java       |  25 ++++-
 .../apache/pulsar/broker/ServiceConfiguration.java |  12 ++
 .../pulsar/broker/service/BrokerService.java       |  26 +++++
 .../broker/service/BrokerBkEnsemblesTests.java     | 125 ++++++++++++++++++++-
 site/_data/config/broker.yaml                      |   3 +
 site/_data/config/standalone.yaml                  |   2 +
 15 files changed, 287 insertions(+), 32 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 20da0a9..d983b3f 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -300,6 +300,9 @@ managedLedgerMaxUnackedRangesToPersist=10000
 # zookeeper.
 managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000
 
+# Skip reading non-recoverable/unreadable data-ledgers in the managed-ledger's ledger list. This helps when data-ledgers
+# get corrupted at bookkeeper and a managed-cursor is stuck at that ledger.
+autoSkipNonRecoverableData=false
 
 ### --- Load balancer --- ###
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index ce35f2e..ec400fb 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -266,6 +266,9 @@ managedLedgerMaxUnackedRangesToPersist=10000
 # zookeeper.
 managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000
 
+# Skip reading non-recoverable/unreadable data-ledgers in the managed-ledger's ledger list. This helps when data-ledgers
+# get corrupted at bookkeeper and a managed-cursor is stuck at that ledger.
+autoSkipNonRecoverableData=false
 
 ### --- Load balancer --- ###
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 391a484..6f9847b 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -51,6 +51,7 @@ public class ManagedLedgerConfig {
     private double throttleMarkDelete = 0;
     private long retentionTimeMs = 0;
     private long retentionSizeInMB = 0;
+    private boolean autoSkipNonRecoverableData;
 
     private DigestType digestType = DigestType.MAC;
     private byte[] password = "".getBytes(Charsets.UTF_8);
@@ -354,6 +355,20 @@ public class ManagedLedgerConfig {
     }
 
     /**
+     * Skip reading non-recoverable/unreadable data-ledgers in the managed-ledger's ledger list. This helps when
+     * data-ledgers get corrupted at bookkeeper and a managed-cursor is stuck at that ledger.
+     * 
+     * @return true if non-recoverable data-ledgers should be skipped automatically
+     */
+    public boolean isAutoSkipNonRecoverableData() {
+        return autoSkipNonRecoverableData;
+    }
+
+    public void setAutoSkipNonRecoverableData(boolean skipNonRecoverableData) {
+        this.autoSkipNonRecoverableData = skipNonRecoverableData;
+    }
+
+    /**
      * @return max unacked message ranges that will be persisted and recovered.
      *
      */
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
index 1817aaf..f5c4243 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
@@ -31,6 +31,13 @@ public class ManagedLedgerException extends Exception {
         super(e);
     }
 
+    public static ManagedLedgerException getManagedLedgerException(Throwable e) {
+        if (e instanceof ManagedLedgerException) {
+            return (ManagedLedgerException) e;
+        }
+        return new ManagedLedgerException(e);
+    }
+    
     public static class MetaStoreException extends ManagedLedgerException {
         public MetaStoreException(Exception e) {
             super(e);
@@ -89,6 +96,12 @@ public class ManagedLedgerException extends Exception {
         }
     }
 
+    public static class NonRecoverableLedgerException extends ManagedLedgerException {
+        public NonRecoverableLedgerException(String msg) {
+            super(msg);
+        }
+    }
+
     public static class InvalidReplayPositionException extends ManagedLedgerException {
         public InvalidReplayPositionException(String msg) {
             super(msg);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
index b89dfb5..469a0e9 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
@@ -25,14 +25,13 @@ import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
 import org.apache.bookkeeper.mledger.util.Pair;
 import org.apache.bookkeeper.mledger.util.RangeCache;
 import org.apache.bookkeeper.mledger.util.RangeCache.Weighter;
@@ -184,7 +183,7 @@ public class EntryCacheImpl implements EntryCache {
             lh.asyncReadEntries(position.getEntryId(), position.getEntryId(), (rc, ledgerHandle, sequence, obj) -> {
                 if (rc != BKException.Code.OK) {
                     ml.invalidateLedgerHandle(ledgerHandle, rc);
-                    callback.readEntryFailed(new ManagedLedgerException(BKException.create(rc)), obj);
+                    callback.readEntryFailed(createManagedLedgerException(rc), obj);
                     return;
                 }
 
@@ -253,11 +252,11 @@ public class EntryCacheImpl implements EntryCache {
 
                 if (rc != BKException.Code.OK) {
                     if (rc == BKException.Code.TooManyRequestsException) {
-                        callback.readEntriesFailed(new TooManyRequestsException("Too many request error from bookies"),
-                                ctx);
+                        callback.readEntriesFailed(createManagedLedgerException(rc), ctx);
                     } else {
                         ml.invalidateLedgerHandle(lh1, rc);
-                        callback.readEntriesFailed(new ManagedLedgerException(BKException.getMessage(rc)), ctx);
+                        ManagedLedgerException mlException = createManagedLedgerException(rc);
+                        callback.readEntriesFailed(mlException, ctx);
                     }
                     return;
                 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
index a1e4219..7c536b7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 
 import java.util.Enumeration;
@@ -197,7 +198,7 @@ public class EntryCacheManager {
             lh.asyncReadEntries(firstEntry, lastEntry, new ReadCallback() {
                 public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object bkctx) {
                     if (rc != BKException.Code.OK) {
-                        callback.readEntriesFailed(new ManagedLedgerException(BKException.create(rc)), ctx);
+                        callback.readEntriesFailed(createManagedLedgerException(rc), ctx);
                         return;
                     }
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 07f6f7d..22cdf3d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -82,6 +82,8 @@ import com.google.common.util.concurrent.RateLimiter;
 import com.google.protobuf.InvalidProtocolBufferException;
 import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES;
 import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC;
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
+import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
 
 public class ManagedCursorImpl implements ManagedCursor {
 
@@ -281,7 +283,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     log.warn("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(),
                             ledgerId, name, BKException.getMessage(rc1));
 
-                    callback.operationFailed(new ManagedLedgerException(BKException.getMessage(rc1)));
+                    callback.operationFailed(createManagedLedgerException(rc1));
                     return;
                 }
 
@@ -330,8 +332,12 @@ public class ManagedCursorImpl implements ManagedCursor {
         // if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty),
         // we need to move to the next existing ledger
         if (!ledger.ledgerExists(position.getLedgerId())) {
-            long nextExistingLedger = ledger.getNextValidLedger(position.getLedgerId());
-            position = PositionImpl.get(nextExistingLedger, -1);
+            Long nextExistingLedger = ledger.getNextValidLedger(position.getLedgerId());
+            if (nextExistingLedger == null) {
+                log.info("[{}-{}] Couldn't find next next valid ledger for recovery {}", ledger.getName(), name,
+                        position);
+            }
+            position = nextExistingLedger != null ? PositionImpl.get(nextExistingLedger, -1) : position;
         }
         log.info("[{}] Cursor {} recovered to position {}", ledger.getName(), name, position);
 
@@ -1311,7 +1317,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         try {
             newPosition = setAcknowledgedPosition(newPosition);
         } catch (IllegalArgumentException e) {
-            callback.markDeleteFailed(new ManagedLedgerException(e), ctx);
+            callback.markDeleteFailed(getManagedLedgerException(e), ctx);
             return;
         } finally {
             lock.writeLock().unlock();
@@ -1555,7 +1561,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         } catch (Exception e) {
             log.warn("[{}] [{}] Error while updating individualDeletedMessages [{}]", ledger.getName(), name,
                     e.getMessage(), e);
-            callback.deleteFailed(new ManagedLedgerException(e), ctx);
+            callback.deleteFailed(getManagedLedgerException(e), ctx);
             return;
         } finally {
             lock.writeLock().unlock();
@@ -2040,7 +2046,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 // If we've had a write error, the ledger will be automatically closed, we need to create a new one,
                 // in the meantime the mark-delete will be queued.
                 STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger);
-                callback.operationFailed(new ManagedLedgerException(BKException.getMessage(rc)));
+                callback.operationFailed(createManagedLedgerException(rc));
             }
         }, null);
     }
@@ -2127,7 +2133,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 if (rc == BKException.Code.OK) {
                     callback.closeComplete(ctx);
                 } else {
-                    callback.closeFailed(new ManagedLedgerException(BKException.getMessage(rc)), ctx);
+                    callback.closeFailed(createManagedLedgerException(rc), ctx);
                 }
             }
         }, ctx);
@@ -2301,6 +2307,11 @@ public class ManagedCursorImpl implements ManagedCursor {
         return position.getNext();
     }
 
+    public Position getNextLedgerPosition(long currentLedgerId) {
+        Long nextExistingLedger = ledger.getNextValidLedger(currentLedgerId);
+        return nextExistingLedger!=null ? PositionImpl.get(nextExistingLedger, 0) : null;
+    }
+
     public boolean isIndividuallyDeletedEntriesEmpty() {
         lock.readLock().lock();
         try {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 07c2411..d461f9a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -71,6 +71,7 @@ import com.google.common.base.Predicates;
 import com.google.common.collect.Maps;
 
 import io.netty.util.concurrent.DefaultThreadFactory;
+import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
 
 public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
     private final MetaStore store;
@@ -432,7 +433,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
                             // Completed all the cursors info
                             callback.getInfoComplete(info, ctx);
                         }).exceptionally((ex) -> {
-                            callback.getInfoFailed(new ManagedLedgerException(ex), ctx);
+                            callback.getInfoFailed(getManagedLedgerException(ex.getCause()), ctx);
                             return null;
                         });
                     }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 84a4d8e..767d3dc 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.mledger.impl;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.lang.Math.min;
 import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC;
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 
 import java.util.Iterator;
@@ -61,6 +62,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerTerminatedException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
+import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
 import org.apache.bookkeeper.mledger.ManagedLedgerMXBean;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
@@ -89,6 +92,7 @@ import com.google.common.util.concurrent.RateLimiter;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
 
 public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     private final static long MegaByte = 1024 * 1024;
@@ -263,7 +267,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                                 initializeBookKeeper(callback);
                             } else {
                                 log.error("[{}] Failed to open ledger {}: {}", name, id, BKException.getMessage(rc));
-                                callback.initializeFailed(new ManagedLedgerException(BKException.getMessage(rc)));
+                                callback.initializeFailed(createManagedLedgerException(rc));
                                 return;
                             }
                         }));
@@ -337,7 +341,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     executor.submitOrdered(name, safeRun(() -> {
                         mbean.endDataLedgerCreateOp();
                         if (rc != BKException.Code.OK) {
-                            callback.initializeFailed(new ManagedLedgerException(BKException.getMessage(rc)));
+                            callback.initializeFailed(createManagedLedgerException(rc));
                             return;
                         }
 
@@ -908,7 +912,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             }
             mbean.endDataLedgerCloseOp();
             if (rc != BKException.Code.OK) {
-                callback.terminateFailed(new ManagedLedgerException(BKException.getMessage(rc)), ctx);
+                callback.terminateFailed(createManagedLedgerException(rc), ctx);
             } else {
                 lastConfirmedEntry = new PositionImpl(lh.getId(), lh.getLastAddConfirmed());
                 // Store the new state in metadata
@@ -1042,7 +1046,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             }
             mbean.endDataLedgerCloseOp();
             if (rc != BKException.Code.OK) {
-                callback.closeFailed(new ManagedLedgerException(BKException.getMessage(rc)), ctx);
+                callback.closeFailed(createManagedLedgerException(rc), ctx);
                 return;
             }
 
@@ -1062,7 +1066,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         Futures.waitForAll(futures).thenRun(() -> {
             callback.closeComplete(ctx);
         }).exceptionally(exception -> {
-            callback.closeFailed(new ManagedLedgerException(exception), ctx);
+            callback.closeFailed(getManagedLedgerException(exception.getCause()), ctx);
             return null;
         });
     }
@@ -1078,7 +1082,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         mbean.endDataLedgerCreateOp();
         if (rc != BKException.Code.OK) {
             log.error("[{}] Error creating ledger rc={} {}", name, rc, BKException.getMessage(rc));
-            ManagedLedgerException status = new ManagedLedgerException(BKException.getMessage(rc));
+            ManagedLedgerException status = createManagedLedgerException(rc);
 
             // Empty the list of pending requests and make all of them fail
             clearPendingAddEntries(status);
@@ -1278,7 +1282,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             }).exceptionally(ex -> {
                 log.error("[{}] Error opening ledger for reading at position {} - {}", name, opReadEntry.readPosition,
                         ex.getMessage());
-                opReadEntry.readEntriesFailed(new ManagedLedgerException(ex), opReadEntry.ctx);
+                opReadEntry.readEntriesFailed(getManagedLedgerException(ex.getCause()), opReadEntry.ctx);
                 return null;
             });
         }
@@ -1306,7 +1310,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                             if (rc != BKException.Code.OK) {
                                 // Remove the ledger future from cache to give chance to reopen it later
                                 ledgerCache.remove(ledgerId, future);
-                                future.completeExceptionally(new ManagedLedgerException(BKException.getMessage(rc)));
+                                future.completeExceptionally(createManagedLedgerException(rc));
                             } else {
                                 if (log.isDebugEnabled()) {
                                     log.debug("[{}] Successfully opened ledger {} for reading", name, lh.getId());
@@ -1347,7 +1351,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 entryCache.asyncReadEntry(ledger, position, callback, ctx);
             }).exceptionally(ex -> {
                 log.error("[{}] Error opening ledger for reading at position {} - {}", name, position, ex.getMessage());
-                callback.readEntryFailed(new ManagedLedgerException(ex), ctx);
+                callback.readEntryFailed(getManagedLedgerException(ex.getCause()), ctx);
                 return null;
             });
         }
@@ -1744,7 +1748,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     int toDelete = ledgersToDelete.get();
                     if (toDelete != -1 && ledgersToDelete.compareAndSet(toDelete, -1)) {
                         // Trigger callback only once
-                        callback.deleteLedgerFailed(new ManagedLedgerException(BKException.getMessage(rc)), ctx);
+                        callback.deleteLedgerFailed(createManagedLedgerException(rc), ctx);
                     }
                 }
             }, null);
@@ -1956,7 +1960,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         return ledgers.get(ledgerId) != null;
     }
 
-    long getNextValidLedger(long ledgerId) {
+    Long getNextValidLedger(long ledgerId) {
         return ledgers.ceilingKey(ledgerId + 1);
     }
 
@@ -2172,6 +2176,30 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         return entryCache.getSize();
     }
 
+    /**
+     * Returns true if the given BK error code is considered not likely to be recoverable.
+     */
+    private static boolean isBkErrorNotRecoverable(int rc) {
+        switch (rc) {
+        case BKException.Code.NoSuchLedgerExistsException:
+        case BKException.Code.NoSuchEntryException:
+            return true;
+
+        default:
+            return false;
+        }
+    }
+
+    public static ManagedLedgerException createManagedLedgerException(int bkErrorCode) {
+        if (bkErrorCode == BKException.Code.TooManyRequestsException) {
+            return new TooManyRequestsException("Too many request error from bookies");
+        } else if (isBkErrorNotRecoverable(bkErrorCode)) {
+            return new NonRecoverableLedgerException(BKException.getMessage(bkErrorCode));
+        } else {
+            return new ManagedLedgerException(BKException.getMessage(bkErrorCode));
+        }
+    }
+    
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);
 
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index e7083b9..aa6cda2 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
 import org.apache.bookkeeper.mledger.Position;
 import org.slf4j.Logger;
@@ -80,7 +81,7 @@ public class OpReadEntry implements ReadEntriesCallback {
     }
 
     @Override
-    public void readEntriesFailed(ManagedLedgerException status, Object ctx) {
+    public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
         cursor.readOperationCompleted();
 
         if (!entries.isEmpty()) {
@@ -89,10 +90,24 @@ public class OpReadEntry implements ReadEntriesCallback {
                 callback.readEntriesComplete(entries, ctx);
                 recycle();
             }));
+        } else if (cursor.config.isAutoSkipNonRecoverableData() && exception instanceof NonRecoverableLedgerException) {
+            log.warn("[{}][{}] read failed from ledger at position:{} : {}", cursor.ledger.getName(), cursor.getName(),
+                    readPosition, exception.getMessage());
+            // try to find and move to next valid ledger
+            final Position nexReadPosition = cursor.getNextLedgerPosition(readPosition.getLedgerId());
+            // fail callback if it couldn't find next valid ledger
+            if (nexReadPosition == null) {
+                callback.readEntriesFailed(exception, ctx);
+                cursor.ledger.mbean.recordReadEntriesError();
+                recycle();
+                return;
+            }
+            updateReadPosition(nexReadPosition);
+            checkReadCompletion();
         } else {
-            if (!(status instanceof TooManyRequestsException)) {
-                log.warn("[{}][{}] read failed from ledger at position:{} : {}", cursor.ledger.getName(), cursor.getName(),
-                        readPosition, status.getMessage());
+            if (!(exception instanceof TooManyRequestsException)) {
+                log.warn("[{}][{}] read failed from ledger at position:{} : {}", cursor.ledger.getName(),
+                        cursor.getName(), readPosition, exception.getMessage());
             } else {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}][{}] read throttled failed from ledger at position:{}", cursor.ledger.getName(),
@@ -100,7 +115,7 @@ public class OpReadEntry implements ReadEntriesCallback {
                 }
             }
 
-            callback.readEntriesFailed(status, ctx);
+            callback.readEntriesFailed(exception, ctx);
             cursor.ledger.mbean.recordReadEntriesError();
             recycle();
         }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 069933c..2585f33 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -285,6 +285,10 @@ public class ServiceConfiguration implements PulsarConfiguration {
     // than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
     // zookeeper.
     private int managedLedgerMaxUnackedRangesToPersistInZooKeeper = 1000;
+    // Skip reading non-recoverable/unreadable data-ledgers in the managed-ledger's ledger list. This helps when
+    // data-ledgers get corrupted at bookkeeper and a managed-cursor is stuck at that ledger.
+    @FieldContext(dynamic = true)
+    private boolean autoSkipNonRecoverableData = false;
 
     /*** --- Load balancer --- ****/
     // Enable load balancer
@@ -1033,6 +1037,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
         this.managedLedgerMaxUnackedRangesToPersistInZooKeeper = managedLedgerMaxUnackedRangesToPersistInZookeeper;
     }
 
+    public boolean isAutoSkipNonRecoverableData() {
+        return autoSkipNonRecoverableData;
+    }
+
+    public void setAutoSkipNonRecoverableData(boolean skipNonRecoverableLedger) {
+        this.autoSkipNonRecoverableData = skipNonRecoverableLedger;
+    }
+
     public boolean isLoadBalancerEnabled() {
         return loadBalancerEnabled;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bf7bdbd..74ca915 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1063,6 +1063,10 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         registerConfigurationListener("dispatchThrottlingRatePerTopicInByte", (dispatchRatePerTopicInByte) -> {
             updateTopicMessageDispatchRate();
         });
+        // add listener to propagate autoSkipNonRecoverableData changes to each topic's managed-ledger config
+        registerConfigurationListener("autoSkipNonRecoverableData", (skipNonRecoverableLedger) -> {
+            updateManagedLedgerConfig();
+        });
         // add more listeners here
     }
 
@@ -1087,6 +1091,28 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         });
     }
 
+    private void updateManagedLedgerConfig() {
+        this.pulsar().getExecutor().submit(() -> {
+            // update managed-ledger config of each topic
+            topics.forEach((name, topicFuture) -> {
+                if (topicFuture.isDone()) {
+                    String topicName = null;
+                    try {
+                        if (topicFuture.getNow(null) instanceof PersistentTopic) {
+                            PersistentTopic topic = (PersistentTopic) topicFuture.get();
+                            topicName = topicFuture.get().getName();
+                            // update autoSkipNonRecoverableData configuration
+                            topic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(
+                                    pulsar.getConfiguration().isAutoSkipNonRecoverableData());
+                        }
+                    } catch (Exception e) {
+                        log.warn("[{}] failed to update managed-ledger config", topicName, e);
+                    }
+                }
+            });
+        });
+    }
+    
     /**
      * Allows a listener to listen on update of {@link ServiceConfiguration} change, so listener can take appropriate
      * action if any specific config-field value has been changed.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 347ffaf..8e8f31d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -18,15 +18,23 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+
 import java.lang.reflect.Field;
 import java.net.URL;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.impl.EntryCache;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.util.StringUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -35,9 +43,11 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
@@ -51,7 +61,6 @@ import org.testng.annotations.Test;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 
 /**
  */
@@ -85,6 +94,7 @@ public class BrokerBkEnsemblesTests {
             config.setAuthenticationEnabled(false);
             config.setManagedLedgerMaxEntriesPerLedger(5);
             config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
+            config.setAdvertisedAddress("127.0.0.1");
 
             pulsar = new PulsarService(config);
             pulsar.start();
@@ -206,5 +216,118 @@ public class BrokerBkEnsemblesTests {
 
     }
 
+    /**
+     * Verifies the broker configuration that lets the broker skip non-recoverable data-ledgers.
+     * 
+     * <pre>
+     * 1. publish messages in 5 data-ledgers each with 20 entries under managed-ledger
+     * 2. delete first 4 data-ledgers
+     * 3. consumer will fail to consume any message as first data-ledger is non-recoverable
+     * 4. enable dynamic config to skip non-recoverable data-ledgers
+     * 5. consumer will be able to consume 20 messages from last non-deleted ledger
+     * 
+     * </pre>
+     * 
+     * @throws Exception
+     */
+    @Test(timeOut = 6000)
+    public void testSkipCorruptDataLedger() throws Exception {
+        ClientConfiguration clientConf = new ClientConfiguration();
+        clientConf.setStatsInterval(0, TimeUnit.SECONDS);
+        PulsarClient client = PulsarClient.create(adminUrl.toString(), clientConf);
+
+        final String ns1 = "prop/usc/crash-broker";
+        final int totalMessages = 100;
+        final int totalDataLedgers = 5;
+        final int entriesPerLedger = totalMessages / totalDataLedgers;
+
+        admin.namespaces().createNamespace(ns1);
+
+        final String dn1 = "persistent://" + ns1 + "/my-topic";
+
+        // Create subscription
+        ConsumerConfiguration consumerConfig = new ConsumerConfiguration();
+        consumerConfig.setReceiverQueueSize(5);
+        Consumer consumer = client.subscribe(dn1, "my-subscriber-name", consumerConfig);
+
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(dn1).get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
+        ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next();
+        Field configField = ManagedCursorImpl.class.getDeclaredField("config");
+        configField.setAccessible(true);
+        // roll over after every 'entriesPerLedger' entries so the topic spans multiple data-ledgers
+        ManagedLedgerConfig config = (ManagedLedgerConfig) configField.get(cursor);
+        config.setMaxEntriesPerLedger(entriesPerLedger);
+        config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
+        // bookkeeper client of the managed-ledger, used below to delete its data-ledgers directly
+        Field bookKeeperField = ManagedLedgerImpl.class.getDeclaredField("bookKeeper");
+        bookKeeperField.setAccessible(true);
+        BookKeeper bookKeeper = (BookKeeper) bookKeeperField.get(ml);
+
+        // (1) publish messages in 5 data-ledgers each with 20 entries under managed-ledger
+        Producer producer = client.createProducer(dn1);
+        for (int i = 0; i < totalMessages; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        // validate: consumer is able to consume msg and close consumer after reading 1 entry
+        Assert.assertNotNull(consumer.receive(1, TimeUnit.SECONDS));
+        consumer.close();
+
+        NavigableMap<Long, LedgerInfo> ledgerInfo = ml.getLedgersInfo();
+        Assert.assertEquals(ledgerInfo.size(), totalDataLedgers);
+        Entry<Long, LedgerInfo> lastLedger = ledgerInfo.lastEntry();
+
+        // (2) delete first 4 data-ledgers (every ledger before the last one); snapshot the ids first and let any
+        // deletion failure fail the test, since the remaining steps are meaningless without it
+        for (Long ledgerId : Lists.newArrayList(ledgerInfo.headMap(lastLedger.getKey(), false).keySet())) {
+            bookKeeper.deleteLedger(ledgerId);
+        }
+
+        // clean managed-ledger and recreate topic to clean any data from the cache
+        producer.close();
+        pulsar.getBrokerService().removeTopicFromCache(dn1);
+        ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory();
+        Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
+        field.setAccessible(true);
+        @SuppressWarnings("unchecked")
+        ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers = (ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>) field
+                .get(factory);
+        ledgers.clear();
+
+        // (3) consumer will fail to consume any message as first data-ledger is non-recoverable
+        consumer = client.subscribe(dn1, "my-subscriber-name");
+        Message msg = consumer.receive(1, TimeUnit.SECONDS);
+        Assert.assertNull(msg);
+        consumer.close();
+
+        // (4) enable dynamic config to skip non-recoverable data-ledgers
+        admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "true");
+
+        // the topic was reloaded in step (3), so poll the config of the live managed-ledger rather than the stale
+        // 'config' captured above, and assert explicitly instead of relying on the retry alone
+        retryStrategically((test) -> isAutoSkipNonRecoverableDataEnabled(dn1), 5, 100);
+        Assert.assertTrue(isAutoSkipNonRecoverableDataEnabled(dn1));
+
+        // (5) consumer will be able to consume 20 messages from last non-deleted ledger
+        consumer = client.subscribe(dn1, "my-subscriber-name");
+        for (int i = 0; i < entriesPerLedger; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            Assert.assertNotNull(msg, "expected message " + i + " of " + entriesPerLedger + " from the last ledger");
+            consumer.acknowledge(msg);
+        }
+
+        consumer.close();
+        client.close();
+    }
+
+    /**
+     * Returns whether the managed-ledger of the currently loaded {@code topicName} has
+     * {@code autoSkipNonRecoverableData} enabled; returns {@code false} if the topic cannot be looked up.
+     */
+    private boolean isAutoSkipNonRecoverableDataEnabled(String topicName) {
+        try {
+            PersistentTopic current = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName).get();
+            return current.getManagedLedger().getConfig().isAutoSkipNonRecoverableData();
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                Thread.currentThread().interrupt();
+            }
+            return false;
+        }
+    }
+    
     private static final Logger LOG = LoggerFactory.getLogger(BrokerBkEnsemblesTests.class);
 }
diff --git a/site/_data/config/broker.yaml b/site/_data/config/broker.yaml
index d7a74ff..77df630 100644
--- a/site/_data/config/broker.yaml
+++ b/site/_data/config/broker.yaml
@@ -206,6 +206,9 @@ configs:
   default: '1000'
   description: |
     Max number of "acknowledgment holes" that are going to be persistently stored. When acknowledging out of order, a consumer will leave holes that are supposed to be quickly filled by acking all the messages. The information of which messages are acknowledged is persisted by compressing in "ranges" of messages that were acknowledged. After the max number of ranges is reached, the information will only be tracked in memory and messages will be redelivered in case of crashes.
+- name: autoSkipNonRecoverableData
+  default: 'false'
+  description: Skip reading non-recoverable/unreadable data-ledgers under the managed-ledger's list. It helps when data-ledgers get corrupted in BookKeeper and the managed-cursor is stuck at that ledger.
 - name: loadBalancerEnabled
   default: 'true'
   description: Enable load balancer
diff --git a/site/_data/config/standalone.yaml b/site/_data/config/standalone.yaml
index c63811b..22bcceb 100644
--- a/site/_data/config/standalone.yaml
+++ b/site/_data/config/standalone.yaml
@@ -155,6 +155,8 @@ configs:
   default: '50000'
 - name: managedLedgerCursorRolloverTimeInSeconds
   default: '14400'
+- name: autoSkipNonRecoverableData
+  default: 'false'
 - name: loadBalancerEnabled
   default: 'false'
 - name: loadBalancerPlacementStrategy

-- 
To stop receiving notification emails like this one, please contact
rdhabalia@apache.org.