You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2017/04/11 18:14:23 UTC
bookkeeper git commit: BOOKKEEPER-1031: close the ledger handle in
ReplicationWorker.rereplicate
Repository: bookkeeper
Updated Branches:
refs/heads/master ab707d2c6 -> 48aa69dd0
BOOKKEEPER-1031: close the ledger handle in ReplicationWorker.rereplicate
…cate
Otherwise, every ledger handle left open adds to an unbounded set of
listeners in the AbstractZkLedgerManager listenerSet structure, and
those listeners never go away.
Signed-off-by: Samuel Just <sjust@salesforce.com>
Author: Samuel Just <sj...@salesforce.com>
Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
Closes #130 from athanatos/forupstream/BOOKKEEPER-1031
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/48aa69dd
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/48aa69dd
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/48aa69dd
Branch: refs/heads/master
Commit: 48aa69dd0ba41f5ba7bb2b04f31172c919be4391
Parents: ab707d2
Author: Samuel Just <sj...@salesforce.com>
Authored: Tue Apr 11 11:14:19 2017 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Tue Apr 11 11:14:19 2017 -0700
----------------------------------------------------------------------
.../replication/ReplicationWorker.java | 110 +++++++++----------
1 file changed, 54 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/48aa69dd/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
index 3f2261f..e6e986f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
@@ -225,9 +225,60 @@ public class ReplicationWorker implements Runnable {
private boolean rereplicate(long ledgerIdToReplicate) throws InterruptedException, BKException,
UnavailableException {
LOG.debug("Going to replicate the fragments of the ledger: {}", ledgerIdToReplicate);
- LedgerHandle lh;
- try {
- lh = admin.openLedgerNoRecovery(ledgerIdToReplicate);
+ try (LedgerHandle lh = admin.openLedgerNoRecovery(ledgerIdToReplicate)) {
+ Set<LedgerFragment> fragments = getUnderreplicatedFragments(lh);
+ LOG.debug("Founds fragments {} for replication from ledger: {}", fragments, ledgerIdToReplicate);
+
+ boolean foundOpenFragments = false;
+ long numFragsReplicated = 0;
+ for (LedgerFragment ledgerFragment : fragments) {
+ if (!ledgerFragment.isClosed()) {
+ foundOpenFragments = true;
+ continue;
+ } else if (isTargetBookieExistsInFragmentEnsemble(lh,
+ ledgerFragment)) {
+ LOG.debug("Target Bookie[{}] found in the fragment ensemble: {}", targetBookie,
+ ledgerFragment.getEnsemble());
+ continue;
+ }
+ try {
+ admin.replicateLedgerFragment(lh, ledgerFragment, targetBookie);
+ numFragsReplicated++;
+ } catch (BKException.BKBookieHandleNotAvailableException e) {
+ LOG.warn("BKBookieHandleNotAvailableException "
+ + "while replicating the fragment", e);
+ } catch (BKException.BKLedgerRecoveryException e) {
+ LOG.warn("BKLedgerRecoveryException "
+ + "while replicating the fragment", e);
+ if (admin.getReadOnlyBookies().contains(targetBookie)) {
+ underreplicationManager.releaseUnderreplicatedLedger(ledgerIdToReplicate);
+ throw new BKException.BKWriteOnReadOnlyBookieException();
+ }
+ }
+ }
+
+ if (numFragsReplicated > 0) {
+ numLedgersReplicated.inc();
+ }
+
+ if (foundOpenFragments || isLastSegmentOpenAndMissingBookies(lh)) {
+ deferLedgerLockRelease(ledgerIdToReplicate);
+ return false;
+ }
+
+ fragments = getUnderreplicatedFragments(lh);
+ if (fragments.size() == 0) {
+ LOG.info("Ledger replicated successfully. ledger id is: "
+ + ledgerIdToReplicate);
+ underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
+ return true;
+ } else {
+ // Releasing the underReplication ledger lock and compete
+ // for the replication again for the pending fragments
+ underreplicationManager
+ .releaseUnderreplicatedLedger(ledgerIdToReplicate);
+ return false;
+ }
} catch (BKNoSuchLedgerExistsException e) {
// Ledger might have been deleted by user
LOG.info("BKNoSuchLedgerExistsException while opening "
@@ -253,59 +304,6 @@ public class ReplicationWorker implements Runnable {
.releaseUnderreplicatedLedger(ledgerIdToReplicate);
return false;
}
- Set<LedgerFragment> fragments = getUnderreplicatedFragments(lh);
- LOG.debug("Founds fragments {} for replication from ledger: {}", fragments, ledgerIdToReplicate);
-
- boolean foundOpenFragments = false;
- long numFragsReplicated = 0;
- for (LedgerFragment ledgerFragment : fragments) {
- if (!ledgerFragment.isClosed()) {
- foundOpenFragments = true;
- continue;
- } else if (isTargetBookieExistsInFragmentEnsemble(lh,
- ledgerFragment)) {
- LOG.debug("Target Bookie[{}] found in the fragment ensemble: {}", targetBookie,
- ledgerFragment.getEnsemble());
- continue;
- }
- try {
- admin.replicateLedgerFragment(lh, ledgerFragment, targetBookie);
- numFragsReplicated++;
- } catch (BKException.BKBookieHandleNotAvailableException e) {
- LOG.warn("BKBookieHandleNotAvailableException "
- + "while replicating the fragment", e);
- } catch (BKException.BKLedgerRecoveryException e) {
- LOG.warn("BKLedgerRecoveryException "
- + "while replicating the fragment", e);
- if (admin.getReadOnlyBookies().contains(targetBookie)) {
- underreplicationManager.releaseUnderreplicatedLedger(ledgerIdToReplicate);
- throw new BKException.BKWriteOnReadOnlyBookieException();
- }
- }
- }
-
- if (numFragsReplicated > 0) {
- numLedgersReplicated.inc();
- }
-
- if (foundOpenFragments || isLastSegmentOpenAndMissingBookies(lh)) {
- deferLedgerLockRelease(ledgerIdToReplicate);
- return false;
- }
-
- fragments = getUnderreplicatedFragments(lh);
- if (fragments.size() == 0) {
- LOG.info("Ledger replicated successfully. ledger id is: "
- + ledgerIdToReplicate);
- underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
- return true;
- } else {
- // Releasing the underReplication ledger lock and compete
- // for the replication again for the pending fragments
- underreplicationManager
- .releaseUnderreplicatedLedger(ledgerIdToReplicate);
- return false;
- }
}
/**