You are viewing a plain-text version of this content; the canonical link to the original ("here") was not preserved in this rendering.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 09:39:00 UTC

[pulsar] 04/14: Catch NPE and detect state doesn't move (#7401)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5b3889c114b1e95275f5e1998d21c527b0f24c29
Author: Sijie Guo <si...@apache.org>
AuthorDate: Tue Jun 30 15:32:53 2020 -0700

    Catch NPE and detect state doesn't move (#7401)
    
    
    (cherry picked from commit 86e2610bebb46f479ea221a69782cd8575a04b16)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 49 +++++++++++++++-------
 1 file changed, 33 insertions(+), 16 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index ea0614d..810b3f8 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -404,7 +404,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         };
 
         // Create a new ledger to start writing
-        this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
+        this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis();
         mbean.startDataLedgerCreateOp();
 
         asyncCreateLedger(bookKeeper, config, digestType, (rc, lh, ctx) -> {
@@ -596,13 +596,19 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Queue addEntry request", name);
             }
+            if (State.CreatingLedger == state) { // Watchdog: ledger creation pending for too long
+                long elapsedMs = System.currentTimeMillis() - this.lastLedgerCreationInitiationTimestamp;
+                if (elapsedMs > TimeUnit.SECONDS.toMillis(2 * config.getMetadataOperationsTimeoutSeconds())) {
+                    log.info("[{}] Ledger creation was initiated {} ms ago but it never completed" +
+                        " and creation timeout task didn't kick in as well. Force to fail the create ledger operation ...", name, elapsedMs);
+                    this.createComplete(Code.TimeoutException, null, null);
+                }
+            }
         } else if (state == State.ClosedLedger) {
             // No ledger and no pending operations. Create a new ledger
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Creating a new ledger", name);
-            }
+            log.info("[{}] Creating a new ledger", name);
             if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, State.CreatingLedger)) {
-                this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
+                this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis();
                 mbean.startDataLedgerCreateOp();
                 asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap());
             }
@@ -1229,8 +1235,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     metadataMutex.unlock();
                     updateLedgersIdsComplete(stat);
                     synchronized (ManagedLedgerImpl.this) {
-                        mbean.addLedgerSwitchLatencySample(System.nanoTime() - lastLedgerCreationInitiationTimestamp,
-                                TimeUnit.NANOSECONDS);
+                        mbean.addLedgerSwitchLatencySample(System.currentTimeMillis() - lastLedgerCreationInitiationTimestamp,
+                                TimeUnit.MILLISECONDS);
                     }
                 }
 
@@ -1380,11 +1386,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
         if (!pendingAddEntries.isEmpty()) {
             // Need to create a new ledger to write pending entries
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Creating a new ledger", name);
-            }
+            log.info("[{}] Creating a new ledger", name);
             STATE_UPDATER.set(this, State.CreatingLedger);
-            this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
+            this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis();
             mbean.startDataLedgerCreateOp();
             asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap());
         }
@@ -3172,15 +3176,28 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         Map<String, byte[]> finalMetadata = new HashMap<>();
         finalMetadata.putAll(ledgerMetadata);
         finalMetadata.putAll(metadata);
-        if (log.isDebugEnabled()) {
-            log.debug("creating ledger, metadata: "+finalMetadata);
-        }
-        bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(),
+        log.info("[{}] Creating ledger, metadata: {} - metadata ops timeout : {} seconds",
+            name, finalMetadata, config.getMetadataOperationsTimeoutSeconds());
+        try {
+            bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(),
                 digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
+        } catch (Throwable cause) {
+            log.error("[{}] Encountered unexpected error when creating ledger",
+                name, cause);
+            cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+            return;
+        }
         scheduledExecutor.schedule(() -> {
             if (!ledgerCreated.get()) {
-                cb.createComplete(BKException.Code.TimeoutException, null, ledgerCreated);
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Timeout creating ledger", name);
+                }
+            } else { // NOTE(review): createComplete below must ignore this call once ledgerCreated is set; confirm
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Ledger already created when timeout task is triggered", name);
+                }
             }
+            cb.createComplete(BKException.Code.TimeoutException, null, ledgerCreated);
         }, config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);
     }