You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2017/04/11 18:14:23 UTC

bookkeeper git commit: BOOKKEEPER-1031: close the ledger handle in ReplicationWorker.rereplicate

Repository: bookkeeper
Updated Branches:
  refs/heads/master ab707d2c6 -> 48aa69dd0


BOOKKEEPER-1031: close the ledger handle in ReplicationWorker.rereplicate

\u2026cate

Otherwise, we build up an unbounded set of Listeners in the
AbstractZkLedgerManager listenerSet structure which never go
away.

Signed-off-by: Samuel Just <sjustsalesforce.com>

Author: Samuel Just <sj...@salesforce.com>

Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>

Closes #130 from athanatos/forupstream/BOOKKEEPER-1031


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/48aa69dd
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/48aa69dd
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/48aa69dd

Branch: refs/heads/master
Commit: 48aa69dd0ba41f5ba7bb2b04f31172c919be4391
Parents: ab707d2
Author: Samuel Just <sj...@salesforce.com>
Authored: Tue Apr 11 11:14:19 2017 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Tue Apr 11 11:14:19 2017 -0700

----------------------------------------------------------------------
 .../replication/ReplicationWorker.java          | 110 +++++++++----------
 1 file changed, 54 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/48aa69dd/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
index 3f2261f..e6e986f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
@@ -225,9 +225,60 @@ public class ReplicationWorker implements Runnable {
     private boolean rereplicate(long ledgerIdToReplicate) throws InterruptedException, BKException,
             UnavailableException {
         LOG.debug("Going to replicate the fragments of the ledger: {}", ledgerIdToReplicate);
-        LedgerHandle lh;
-        try {
-            lh = admin.openLedgerNoRecovery(ledgerIdToReplicate);
+        try (LedgerHandle lh = admin.openLedgerNoRecovery(ledgerIdToReplicate)) {
+            Set<LedgerFragment> fragments = getUnderreplicatedFragments(lh);
+            LOG.debug("Founds fragments {} for replication from ledger: {}", fragments, ledgerIdToReplicate);
+
+            boolean foundOpenFragments = false;
+            long numFragsReplicated = 0;
+            for (LedgerFragment ledgerFragment : fragments) {
+                if (!ledgerFragment.isClosed()) {
+                    foundOpenFragments = true;
+                    continue;
+                } else if (isTargetBookieExistsInFragmentEnsemble(lh,
+                        ledgerFragment)) {
+                    LOG.debug("Target Bookie[{}] found in the fragment ensemble: {}", targetBookie,
+                            ledgerFragment.getEnsemble());
+                    continue;
+                }
+                try {
+                    admin.replicateLedgerFragment(lh, ledgerFragment, targetBookie);
+                    numFragsReplicated++;
+                } catch (BKException.BKBookieHandleNotAvailableException e) {
+                    LOG.warn("BKBookieHandleNotAvailableException "
+                            + "while replicating the fragment", e);
+                } catch (BKException.BKLedgerRecoveryException e) {
+                    LOG.warn("BKLedgerRecoveryException "
+                            + "while replicating the fragment", e);
+                    if (admin.getReadOnlyBookies().contains(targetBookie)) {
+                        underreplicationManager.releaseUnderreplicatedLedger(ledgerIdToReplicate);
+                        throw new BKException.BKWriteOnReadOnlyBookieException();
+                    }
+                }
+            }
+
+            if (numFragsReplicated > 0) {
+                numLedgersReplicated.inc();
+            }
+
+            if (foundOpenFragments || isLastSegmentOpenAndMissingBookies(lh)) {
+                deferLedgerLockRelease(ledgerIdToReplicate);
+                return false;
+            }
+
+            fragments = getUnderreplicatedFragments(lh);
+            if (fragments.size() == 0) {
+                LOG.info("Ledger replicated successfully. ledger id is: "
+                        + ledgerIdToReplicate);
+                underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
+                return true;
+            } else {
+                // Releasing the underReplication ledger lock and compete
+                // for the replication again for the pending fragments
+                underreplicationManager
+                        .releaseUnderreplicatedLedger(ledgerIdToReplicate);
+                return false;
+            }
         } catch (BKNoSuchLedgerExistsException e) {
             // Ledger might have been deleted by user
             LOG.info("BKNoSuchLedgerExistsException while opening "
@@ -253,59 +304,6 @@ public class ReplicationWorker implements Runnable {
                     .releaseUnderreplicatedLedger(ledgerIdToReplicate);
             return false;
         }
-        Set<LedgerFragment> fragments = getUnderreplicatedFragments(lh);
-        LOG.debug("Founds fragments {} for replication from ledger: {}", fragments, ledgerIdToReplicate);
-
-        boolean foundOpenFragments = false;
-        long numFragsReplicated = 0;
-        for (LedgerFragment ledgerFragment : fragments) {
-            if (!ledgerFragment.isClosed()) {
-                foundOpenFragments = true;
-                continue;
-            } else if (isTargetBookieExistsInFragmentEnsemble(lh,
-                    ledgerFragment)) {
-                LOG.debug("Target Bookie[{}] found in the fragment ensemble: {}", targetBookie,
-                        ledgerFragment.getEnsemble());
-                continue;
-            }
-            try {
-                admin.replicateLedgerFragment(lh, ledgerFragment, targetBookie);
-                numFragsReplicated++;
-            } catch (BKException.BKBookieHandleNotAvailableException e) {
-                LOG.warn("BKBookieHandleNotAvailableException "
-                        + "while replicating the fragment", e);
-            } catch (BKException.BKLedgerRecoveryException e) {
-                LOG.warn("BKLedgerRecoveryException "
-                        + "while replicating the fragment", e);
-                if (admin.getReadOnlyBookies().contains(targetBookie)) {
-                    underreplicationManager.releaseUnderreplicatedLedger(ledgerIdToReplicate);
-                    throw new BKException.BKWriteOnReadOnlyBookieException();
-                }
-            }
-        }
-
-        if (numFragsReplicated > 0) {
-            numLedgersReplicated.inc();
-        }
-
-        if (foundOpenFragments || isLastSegmentOpenAndMissingBookies(lh)) {
-            deferLedgerLockRelease(ledgerIdToReplicate);
-            return false;
-        }
-
-        fragments = getUnderreplicatedFragments(lh);
-        if (fragments.size() == 0) {
-            LOG.info("Ledger replicated successfully. ledger id is: "
-                    + ledgerIdToReplicate);
-            underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
-            return true;
-        } else {
-            // Releasing the underReplication ledger lock and compete
-            // for the replication again for the pending fragments
-            underreplicationManager
-                    .releaseUnderreplicatedLedger(ledgerIdToReplicate);
-            return false;
-        }
     }
 
     /**