You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/08/21 18:43:42 UTC

[bookkeeper] branch branch-4.7 updated: ISSUE #1578: Fixed deadlock in auditor blocking ZK thread

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.7 by this push:
     new 51040cf  ISSUE #1578: Fixed deadlock in auditor blocking ZK thread
51040cf is described below

commit 51040cf0942615b915720287b8bb3d9378583ec0
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Aug 21 11:42:14 2018 -0700

    ISSUE #1578: Fixed deadlock in auditor blocking ZK thread
    
    ### Motivation
    
    Fixes #1578
    
    After getting a ZK callback on the ZK event thread, we need to jump to a background thread before making the synchronous call to `admin.openLedgerNoRecovery(ledgerId);`, which will try to make a ZK request and wait for a response (which would arrive through the same ZK event thread that is currently blocked).
    
    Author: Matteo Merli <mm...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #1608 from merlimat/fix-auditor-deadlock, closes #1578
    
    (cherry picked from commit f782a9d818a12479d08c580a68b2566715da4c89)
    Signed-off-by: Sijie Guo <si...@apache.org>
---
 .../org/apache/bookkeeper/replication/Auditor.java | 83 ++++++++++++----------
 1 file changed, 44 insertions(+), 39 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index 15868d9..8578a5b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
@@ -640,47 +641,51 @@ public class Auditor {
                         return;
                     }
 
-                    LedgerHandle lh = null;
-                    try {
-                        lh = admin.openLedgerNoRecovery(ledgerId);
-                        checker.checkLedger(lh,
-                            new ProcessLostFragmentsCb(lh, callback),
-                            conf.getAuditorLedgerVerificationPercentage());
-                        // we collect the following stats to get a measure of the
-                        // distribution of a single ledger within the bk cluster
-                        // the higher the number of fragments/bookies, the more distributed it is
-                        numFragmentsPerLedger.registerSuccessfulValue(lh.getNumFragments());
-                        numBookiesPerLedger.registerSuccessfulValue(lh.getNumBookies());
-                        numLedgersChecked.inc();
-                    } catch (BKException.BKNoSuchLedgerExistsException bknsle) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Ledger was deleted before we could check it", bknsle);
-                        }
-                        callback.processResult(BKException.Code.OK,
-                                               null, null);
-                        return;
-                    } catch (BKException bke) {
-                        LOG.error("Couldn't open ledger " + ledgerId, bke);
-                        callback.processResult(BKException.Code.BookieHandleNotAvailableException,
-                                         null, null);
-                        return;
-                    } catch (InterruptedException ie) {
-                        LOG.error("Interrupted opening ledger", ie);
-                        Thread.currentThread().interrupt();
-                        callback.processResult(BKException.Code.InterruptedException, null, null);
-                        return;
-                    } finally {
-                        if (lh != null) {
-                            try {
-                                lh.close();
-                            } catch (BKException bke) {
-                                LOG.warn("Couldn't close ledger " + ledgerId, bke);
-                            } catch (InterruptedException ie) {
-                                LOG.warn("Interrupted closing ledger " + ledgerId, ie);
-                                Thread.currentThread().interrupt();
+                    // Do not perform blocking calls that involve making ZK calls from within the ZK
+                    // event thread. Jump to background thread instead to avoid deadlock.
+                    ForkJoinPool.commonPool().execute(() -> {
+                        LedgerHandle lh = null;
+                        try {
+                            lh = admin.openLedgerNoRecovery(ledgerId);
+                            checker.checkLedger(lh,
+                                    new ProcessLostFragmentsCb(lh, callback),
+                                    conf.getAuditorLedgerVerificationPercentage());
+                            // we collect the following stats to get a measure of the
+                            // distribution of a single ledger within the bk cluster
+                            // the higher the number of fragments/bookies, the more distributed it is
+                            numFragmentsPerLedger.registerSuccessfulValue(lh.getNumFragments());
+                            numBookiesPerLedger.registerSuccessfulValue(lh.getNumBookies());
+                            numLedgersChecked.inc();
+                        } catch (BKException.BKNoSuchLedgerExistsException bknsle) {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Ledger was deleted before we could check it", bknsle);
+                            }
+                            callback.processResult(BKException.Code.OK,
+                                    null, null);
+                            return;
+                        } catch (BKException bke) {
+                            LOG.error("Couldn't open ledger " + ledgerId, bke);
+                            callback.processResult(BKException.Code.BookieHandleNotAvailableException,
+                                    null, null);
+                            return;
+                        } catch (InterruptedException ie) {
+                            LOG.error("Interrupted opening ledger", ie);
+                            Thread.currentThread().interrupt();
+                            callback.processResult(BKException.Code.InterruptedException, null, null);
+                            return;
+                        } finally {
+                            if (lh != null) {
+                                try {
+                                    lh.close();
+                                } catch (BKException bke) {
+                                    LOG.warn("Couldn't close ledger " + ledgerId, bke);
+                                } catch (InterruptedException ie) {
+                                    LOG.warn("Interrupted closing ledger " + ledgerId, ie);
+                                    Thread.currentThread().interrupt();
+                                }
                             }
                         }
-                    }
+                    });
                 }
             };