You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 09:53:15 UTC
[pulsar] 09/10: [Broker] Timeout opening managed ledger operation … (#7506)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 825fdd4222dd65ef3099f1a975a1555226297379
Author: Sijie Guo <si...@apache.org>
AuthorDate: Thu Jul 16 06:20:16 2020 -0700
[Broker] Timeout opening managed ledger operation … (#7506)
*Motivation*
Currently, the broker has a timeout mechanism for loading topics. However, the underlying managed ledger library
doesn't provide a timeout mechanism. This can lead to a situation where a TopicLoad operation times out
after 30 seconds, but the CompletableFuture for opening the managed ledger is still kept in the cache of the managed ledger
factory. That CompletableFuture will never complete, so any subsequent topic lookups will fail because any
attempt to load the topic will never try to re-open the managed ledger.
*Modification*
Introduce a timeout mechanism in the managed ledger factory. If a managed ledger is not opened within a given timeout
period, its CompletableFuture will be removed from the cache. This allows any subsequent attempt to load the topic to try
to open the managed ledger again.
*Tests*
This problem can be consistently reproduced in a chaos test in Kubernetes by killing k8s worker nodes. It can cause
producers to be stuck forever until the owner broker pod is restarted. The change has been verified in a chaos testing environment.
(cherry picked from commit 14e3b7ae05e84ca13eefa16026288a384a961e45)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 16 +++--
.../mledger/impl/ManagedLedgerFactoryImpl.java | 70 ++++++++++++++++++----
2 files changed, 72 insertions(+), 14 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 3e0d583..26f1bf3 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -64,6 +64,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
+import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
@@ -323,9 +324,9 @@ public class ManagedCursorImpl implements ManagedCursor {
// a new ledger and write the position into it
ledger.mbean.startCursorLedgerOpenOp();
long ledgerId = info.getCursorsLedgerId();
- bookkeeper.asyncOpenLedger(ledgerId, digestType, config.getPassword(), (rc, lh, ctx) -> {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Opened ledger {} for consumer {}. rc={}", ledger.getName(), ledgerId, name, rc);
+ OpenCallback openCallback = (rc, lh, ctx) -> {
+ if (log.isInfoEnabled()) {
+ log.info("[{}] Opened ledger {} for consumer {}. rc={}", ledger.getName(), ledgerId, name, rc);
}
if (isBkErrorNotRecoverable(rc)) {
log.error("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name,
@@ -399,7 +400,14 @@ public class ManagedCursorImpl implements ManagedCursor {
recoveredCursor(position, recoveredProperties, lh);
callback.operationComplete();
}, null);
- }, null);
+ };
+ try {
+ bookkeeper.asyncOpenLedger(ledgerId, digestType, config.getPassword(), openCallback, null);
+ } catch (Throwable t) {
+ log.error("[{}] Encountered error on opening cursor ledger {} for cursor {}",
+ ledger.getName(), ledgerId, name, t);
+ openCallback.openComplete(BKException.Code.UnexpectedConditionException, null, null);
+ }
}
private void recoverIndividualDeletedMessages(List<MLDataFormats.MessageRange> individualDeletedMessagesList) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 517c62c..8b470c2 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -50,6 +50,7 @@ import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
@@ -101,6 +102,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
protected final ManagedLedgerFactoryMBeanImpl mbean;
protected final ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers = new ConcurrentHashMap<>();
+ protected final ConcurrentHashMap<String, PendingInitializeManagedLedger> pendingInitializeLedgers =
+ new ConcurrentHashMap<>();
private final EntryCacheManager entryCacheManager;
private long lastStatTimestamp = System.nanoTime();
@@ -111,6 +114,18 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private static final int StatsPeriodSeconds = 60;
+ private static class PendingInitializeManagedLedger {
+
+ private final ManagedLedgerImpl ledger;
+ private final long createTimeMs;
+
+ PendingInitializeManagedLedger(ManagedLedgerImpl ledger) {
+ this.ledger = ledger;
+ this.createTimeMs = System.currentTimeMillis();
+ }
+
+ }
+
public ManagedLedgerFactoryImpl(ClientConfiguration bkClientConfiguration, String zkConnection) throws Exception {
this(bkClientConfiguration, zkConnection, new ManagedLedgerFactoryConfig());
}
@@ -320,18 +335,32 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
// If the ledger state is bad, remove it from the map.
CompletableFuture<ManagedLedgerImpl> existingFuture = ledgers.get(name);
- if (existingFuture != null && existingFuture.isDone()) {
- try {
- ManagedLedgerImpl l = existingFuture.get();
- if (l.getState().equals(State.Fenced.toString()) || l.getState().equals(State.Closed.toString())) {
- // Managed ledger is in unusable state. Recreate it.
- log.warn("[{}] Attempted to open ledger in {} state. Removing from the map to recreate it", name,
+ if (existingFuture != null) {
+ if (existingFuture.isDone()) {
+ try {
+ ManagedLedgerImpl l = existingFuture.get();
+ if (l.getState().equals(State.Fenced.toString()) || l.getState().equals(State.Closed.toString())) {
+ // Managed ledger is in unusable state. Recreate it.
+ log.warn("[{}] Attempted to open ledger in {} state. Removing from the map to recreate it", name,
l.getState());
- ledgers.remove(name, existingFuture);
+ ledgers.remove(name, existingFuture);
+ }
+ } catch (Exception e) {
+ // Unable to get the future
+ log.warn("[{}] Got exception while trying to retrieve ledger", name, e);
}
- } catch (Exception e) {
- // Unable to get the future
- log.warn("[{}] Got exception while trying to retrieve ledger", name, e);
+ } else {
+ PendingInitializeManagedLedger pendingLedger = pendingInitializeLedgers.get(name);
+ if (null != pendingLedger) {
+ long pendingMs = System.currentTimeMillis() - pendingLedger.createTimeMs;
+ if (pendingMs > TimeUnit.SECONDS.toMillis(config.getMetadataOperationsTimeoutSeconds())) {
+ log.warn("[{}] Managed ledger has been pending in initialize state more than {} milliseconds,"
+ + " remove it from cache to retry ...", name, pendingMs);
+ ledgers.remove(name, existingFuture);
+ pendingInitializeLedgers.remove(name, pendingLedger);
+ }
+ }
+
}
}
@@ -345,16 +374,37 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
config.getBookKeeperEnsemblePlacementPolicyProperties())),
store, config, scheduledExecutor,
orderedExecutor, name, mlOwnershipChecker);
+ PendingInitializeManagedLedger pendingLedger = new PendingInitializeManagedLedger(newledger);
+ pendingInitializeLedgers.put(name, pendingLedger);
newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
@Override
public void initializeComplete() {
+ log.info("[{}] Successfully initialize managed ledger", name);
+ pendingInitializeLedgers.remove(name, pendingLedger);
future.complete(newledger);
}
@Override
public void initializeFailed(ManagedLedgerException e) {
+ log.error("[{}] Failed to initialize managed ledger: {}", name, e.getMessage());
+
// Clean the map if initialization fails
ledgers.remove(name, future);
+
+ if (pendingInitializeLedgers.remove(name, pendingLedger)) {
+ pendingLedger.ledger.asyncClose(new CloseCallback() {
+ @Override
+ public void closeComplete(Object ctx) {
+ // no-op
+ }
+
+ @Override
+ public void closeFailed(ManagedLedgerException exception, Object ctx) {
+ log.warn("[{}] Failed to a pending initialization managed ledger", name, exception);
+ }
+ }, null);
+ }
+
future.completeExceptionally(e);
}
}, null);