You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 09:39:00 UTC
[pulsar] 04/14: Catch NPE and detect state doesn't move (#7401)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5b3889c114b1e95275f5e1998d21c527b0f24c29
Author: Sijie Guo <si...@apache.org>
AuthorDate: Tue Jun 30 15:32:53 2020 -0700
Catch NPE and detect state doesn't move (#7401)
(cherry picked from commit 86e2610bebb46f479ea221a69782cd8575a04b16)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 49 +++++++++++++++-------
1 file changed, 33 insertions(+), 16 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index ea0614d..810b3f8 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -404,7 +404,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
};
// Create a new ledger to start writing
- this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
+ this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis();
mbean.startDataLedgerCreateOp();
asyncCreateLedger(bookKeeper, config, digestType, (rc, lh, ctx) -> {
@@ -596,13 +596,19 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
if (log.isDebugEnabled()) {
log.debug("[{}] Queue addEntry request", name);
}
+ if (State.CreatingLedger == state) {
+ long elapsedMs = System.currentTimeMillis() - this.lastLedgerCreationInitiationTimestamp;
+ if (elapsedMs > TimeUnit.SECONDS.toMillis(2 * config.getMetadataOperationsTimeoutSeconds())) {
+ log.info("[{}] Ledger creation was initiated {} ms ago but it never completed" +
+ " and creation timeout task didn't kick in as well. Force to fail the create ledger operation ...");
+ this.createComplete(Code.TimeoutException, null, null);
+ }
+ }
} else if (state == State.ClosedLedger) {
// No ledger and no pending operations. Create a new ledger
- if (log.isDebugEnabled()) {
- log.debug("[{}] Creating a new ledger", name);
- }
+ log.info("[{}] Creating a new ledger", name);
if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, State.CreatingLedger)) {
- this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
+ this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis();
mbean.startDataLedgerCreateOp();
asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap());
}
@@ -1229,8 +1235,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
metadataMutex.unlock();
updateLedgersIdsComplete(stat);
synchronized (ManagedLedgerImpl.this) {
- mbean.addLedgerSwitchLatencySample(System.nanoTime() - lastLedgerCreationInitiationTimestamp,
- TimeUnit.NANOSECONDS);
+ mbean.addLedgerSwitchLatencySample(System.currentTimeMillis() - lastLedgerCreationInitiationTimestamp,
+ TimeUnit.MILLISECONDS);
}
}
@@ -1380,11 +1386,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
if (!pendingAddEntries.isEmpty()) {
// Need to create a new ledger to write pending entries
- if (log.isDebugEnabled()) {
- log.debug("[{}] Creating a new ledger", name);
- }
+ log.info("[{}] Creating a new ledger", name);
STATE_UPDATER.set(this, State.CreatingLedger);
- this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
+ this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis();
mbean.startDataLedgerCreateOp();
asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap());
}
@@ -3172,15 +3176,28 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
Map<String, byte[]> finalMetadata = new HashMap<>();
finalMetadata.putAll(ledgerMetadata);
finalMetadata.putAll(metadata);
- if (log.isDebugEnabled()) {
- log.debug("creating ledger, metadata: "+finalMetadata);
- }
- bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(),
+ log.info("[{}] Creating ledger, metadata: {} - metadata ops timeout : {} seconds",
+ name, finalMetadata, config.getMetadataOperationsTimeoutSeconds());
+ try {
+ bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(),
digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
+ } catch (Throwable cause) {
+ log.error("[{}] Encountered unexpected error when creating ledger",
+ name, cause);
+ cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+ return;
+ }
scheduledExecutor.schedule(() -> {
if (!ledgerCreated.get()) {
- cb.createComplete(BKException.Code.TimeoutException, null, ledgerCreated);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Timeout creating ledger", name);
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Ledger already created when timeout task is triggered", name);
+ }
}
+ cb.createComplete(BKException.Code.TimeoutException, null, ledgerCreated);
}, config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);
}