You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by iv...@apache.org on 2018/08/22 12:14:47 UTC
[bookkeeper] branch master updated: Get currentEnsemble from ledger rather than directly from metadata
This is an automated email from the ASF dual-hosted git repository.
ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 77b7fb8 Get currentEnsemble from ledger rather than directly from metadata
77b7fb8 is described below
commit 77b7fb870ef0fc2743dbe444c768c6c63bbe6875
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Wed Aug 22 14:14:29 2018 +0200
Get currentEnsemble from ledger rather than directly from metadata
This will allow us to override the currentEnsemble being used during
recovery without modifying the metadata itself.
Master issue: #281
Author: Ivan Kelly <iv...@apache.org>
Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
This closes #1586 from ivankelly/current-ensemble
---
.../bookkeeper/client/ExplicitLacFlushPolicy.java | 2 +-
.../apache/bookkeeper/client/ForceLedgerOp.java | 5 +-
.../org/apache/bookkeeper/client/LedgerHandle.java | 56 +++++++++++++++-------
.../apache/bookkeeper/client/LedgerHandleAdv.java | 2 +-
.../apache/bookkeeper/client/LedgerMetadata.java | 6 ++-
.../apache/bookkeeper/client/LedgerRecoveryOp.java | 2 +-
.../org/apache/bookkeeper/client/PendingAddOp.java | 21 +++++---
.../apache/bookkeeper/client/PendingReadLacOp.java | 16 ++++---
.../bookkeeper/client/PendingWriteLacOp.java | 8 +++-
.../client/ReadLastConfirmedAndEntryOp.java | 8 +++-
.../bookkeeper/client/ReadLastConfirmedOp.java | 18 +++----
.../bookkeeper/client/ReadOnlyLedgerHandle.java | 2 +-
.../bookkeeper/client/TryReadLastConfirmedOp.java | 13 +++--
.../bookkeeper/client/BookieWriteLedgerTest.java | 4 +-
.../apache/bookkeeper/client/LedgerCloseTest.java | 4 +-
.../bookkeeper/client/LedgerRecoveryTest.java | 16 +++----
.../client/ParallelLedgerRecoveryTest.java | 45 +++++++++--------
.../apache/bookkeeper/client/PendingAddOpTest.java | 2 +-
.../client/ReadLastConfirmedAndEntryOpTest.java | 3 +-
.../apache/bookkeeper/client/SlowBookieTest.java | 2 +-
.../client/TestAddEntryQuorumTimeout.java | 6 +--
.../bookkeeper/client/TestDelayEnsembleChange.java | 30 ++++++------
.../client/TestDisableEnsembleChange.java | 2 +-
.../bookkeeper/client/TestPendingReadLacOp.java | 15 +++---
.../client/TestReadLastConfirmedLongPoll.java | 2 +-
.../client/TestTryReadLastConfirmed.java | 4 +-
.../bookkeeper/client/TestWatchEnsembleChange.java | 2 +-
.../org/apache/bookkeeper/client/LedgerReader.java | 2 +-
28 files changed, 178 insertions(+), 120 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
index e2b3801..c9e6def 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
@@ -134,7 +134,7 @@ interface ExplicitLacFlushPolicy {
*/
void asyncExplicitLacFlush(final long explicitLac) {
final LastAddConfirmedCallback cb = LastAddConfirmedCallback.INSTANCE;
- final PendingWriteLacOp op = new PendingWriteLacOp(lh, clientCtx, cb, null);
+ final PendingWriteLacOp op = new PendingWriteLacOp(lh, clientCtx, lh.getCurrentEnsemble(), cb, null);
op.setLac(explicitLac);
try {
if (LOG.isDebugEnabled()) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
index f87bfe3..2d785ab 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
@@ -40,7 +40,7 @@ class ForceLedgerOp extends SafeRunnable implements ForceLedgerCallback {
boolean completed = false;
boolean errored = false;
int lastSeenError = BKException.Code.WriteException;
- List<BookieSocketAddress> currentEnsemble;
+ final List<BookieSocketAddress> currentEnsemble;
long currentNonDurableLastAddConfirmed = LedgerHandle.INVALID_ENTRY_ID;
@@ -48,10 +48,12 @@ class ForceLedgerOp extends SafeRunnable implements ForceLedgerCallback {
final BookieClient bookieClient;
ForceLedgerOp(LedgerHandle lh, BookieClient bookieClient,
+ List<BookieSocketAddress> ensemble,
CompletableFuture<Void> cb) {
this.lh = lh;
this.bookieClient = bookieClient;
this.cb = cb;
+ this.currentEnsemble = ensemble;
}
void sendForceLedgerRequest(int bookieIndex) {
@@ -73,7 +75,6 @@ class ForceLedgerOp extends SafeRunnable implements ForceLedgerCallback {
LOG.debug("force {} clientNonDurableLac {}", lh.ledgerId, currentNonDurableLastAddConfirmed);
}
// we need to send the request to every bookie in the ensamble
- this.currentEnsemble = lh.getLedgerMetadata().currentEnsemble;
this.ackSet = lh.distributionSchedule.getEnsembleAckSet();
DistributionSchedule.WriteSet writeSet = lh.getDistributionSchedule()
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 4f65ab3..3e9f1c2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -1052,7 +1052,7 @@ public class LedgerHandle implements WriteHandle {
}
public void asyncAddEntry(ByteBuf data, final AddCallback cb, final Object ctx) {
- PendingAddOp op = PendingAddOp.create(this, clientCtx, data, writeFlags, cb, ctx);
+ PendingAddOp op = PendingAddOp.create(this, clientCtx, getCurrentEnsemble(), data, writeFlags, cb, ctx);
doAsyncAddEntry(op);
}
@@ -1135,7 +1135,7 @@ public class LedgerHandle implements WriteHandle {
@Override
public CompletableFuture<Void> force() {
CompletableFuture<Void> result = new CompletableFuture<>();
- ForceLedgerOp op = new ForceLedgerOp(this, clientCtx.getBookieClient(), result);
+ ForceLedgerOp op = new ForceLedgerOp(this, clientCtx.getBookieClient(), getCurrentEnsemble(), result);
boolean wasClosed = false;
synchronized (this) {
// synchronized on this to ensure that
@@ -1202,7 +1202,7 @@ public class LedgerHandle implements WriteHandle {
*/
void asyncRecoveryAddEntry(final byte[] data, final int offset, final int length,
final AddCallback cb, final Object ctx) {
- PendingAddOp op = PendingAddOp.create(this, clientCtx,
+ PendingAddOp op = PendingAddOp.create(this, clientCtx, getCurrentEnsemble(),
Unpooled.wrappedBuffer(data, offset, length),
writeFlags, cb, ctx)
.enableRecoveryAdd();
@@ -1219,8 +1219,9 @@ public class LedgerHandle implements WriteHandle {
final int requiredWritable = sz - allowedNonWritableCount;
int nonWritableCount = 0;
+ List<BookieSocketAddress> currentEnsemble = getCurrentEnsemble();
for (int i = 0; i < sz; i++) {
- if (!clientCtx.getBookieClient().isWritable(getLedgerMetadata().currentEnsemble.get(i), key)) {
+ if (!clientCtx.getBookieClient().isWritable(currentEnsemble.get(i), key)) {
nonWritableCount++;
if (nonWritableCount >= allowedNonWritableCount) {
return false;
@@ -1403,7 +1404,7 @@ public class LedgerHandle implements WriteHandle {
}
};
- new ReadLastConfirmedOp(this, clientCtx.getBookieClient(), innercb).initiate();
+ new ReadLastConfirmedOp(this, clientCtx.getBookieClient(), getCurrentEnsemble(), innercb).initiate();
}
/**
@@ -1449,7 +1450,8 @@ public class LedgerHandle implements WriteHandle {
}
}
};
- new TryReadLastConfirmedOp(this, clientCtx.getBookieClient(), innercb, getLastAddConfirmed()).initiate();
+ new TryReadLastConfirmedOp(this, clientCtx.getBookieClient(), getCurrentEnsemble(),
+ innercb, getLastAddConfirmed()).initiate();
}
/**
@@ -1558,7 +1560,7 @@ public class LedgerHandle implements WriteHandle {
}
}
};
- new ReadLastConfirmedAndEntryOp(this, clientCtx, innercb, entryId - 1, timeOutInMillis)
+ new ReadLastConfirmedAndEntryOp(this, clientCtx, getCurrentEnsemble(), innercb, entryId - 1, timeOutInMillis)
.parallelRead(parallel)
.initiate();
}
@@ -1701,7 +1703,7 @@ public class LedgerHandle implements WriteHandle {
}
}
};
- new PendingReadLacOp(this, clientCtx.getBookieClient(), innercb).initiate();
+ new PendingReadLacOp(this, clientCtx.getBookieClient(), getCurrentEnsemble(), innercb).initiate();
}
/*
@@ -1822,7 +1824,7 @@ public class LedgerHandle implements WriteHandle {
final HashSet<Integer> replacedBookies = new HashSet<Integer>();
final LedgerMetadata metadata = getLedgerMetadata();
synchronized (metadata) {
- newEnsemble.addAll(metadata.currentEnsemble);
+ newEnsemble.addAll(getCurrentEnsemble());
for (Map.Entry<Integer, BookieSocketAddress> entry : failedBookies.entrySet()) {
int idx = entry.getKey();
BookieSocketAddress addr = entry.getValue();
@@ -1861,7 +1863,7 @@ public class LedgerHandle implements WriteHandle {
if (LOG.isDebugEnabled()) {
LOG.debug("[EnsembleChange-L{}-{}] : changing ensemble from: {} to: {} starting at entry: {},"
+ " failed bookies: {}, replaced bookies: {}",
- ledgerId, ensembleChangeIdx, metadata.currentEnsemble, newEnsemble,
+ ledgerId, ensembleChangeIdx, getCurrentEnsemble(), newEnsemble,
(getLastAddConfirmed() + 1), failedBookies, replacedBookies);
}
metadata.addEnsemble(newEnsembleStartEntry, newEnsemble);
@@ -1889,7 +1891,7 @@ public class LedgerHandle implements WriteHandle {
LOG.debug("Ensemble change is disabled. Retry sending to failed bookies {} for ledger {}.",
failedBookies, ledgerId);
}
- unsetSuccessAndSendWriteRequest(failedBookies.keySet());
+ unsetSuccessAndSendWriteRequest(getCurrentEnsemble(), failedBookies.keySet());
return;
}
@@ -1941,7 +1943,7 @@ public class LedgerHandle implements WriteHandle {
// Contains newly reformed ensemble, bookieIndex, failedBookieAddress
static final class EnsembleInfo {
- private final ArrayList<BookieSocketAddress> newEnsemble;
+ final ArrayList<BookieSocketAddress> newEnsemble;
private final Map<Integer, BookieSocketAddress> failedBookies;
final Set<Integer> replacedBookies;
@@ -2016,7 +2018,7 @@ public class LedgerHandle implements WriteHandle {
LOG.info("New Ensemble: {} for ledger: {}", ensembleInfo.newEnsemble, ledgerId);
// the failed bookie has been replaced
- unsetSuccessAndSendWriteRequest(ensembleInfo.replacedBookies);
+ unsetSuccessAndSendWriteRequest(ensembleInfo.newEnsemble, ensembleInfo.replacedBookies);
}
@Override
@@ -2133,7 +2135,7 @@ public class LedgerHandle implements WriteHandle {
// We've successfully changed an ensemble
// the failed bookie has been replaced
int newBlockAddCompletions = blockAddCompletions.decrementAndGet();
- unsetSuccessAndSendWriteRequest(ensembleInfo.replacedBookies);
+ unsetSuccessAndSendWriteRequest(ensembleInfo.newEnsemble, ensembleInfo.replacedBookies);
if (LOG.isDebugEnabled()) {
LOG.info("[EnsembleChange-L{}-{}] : resolved conflicts, block add complectiosn {} => {}.",
ledgerId, ensembleChangeIdx, curBlockAddCompletions, newBlockAddCompletions);
@@ -2155,7 +2157,8 @@ public class LedgerHandle implements WriteHandle {
boolean replaced = true;
for (Integer replacedBookieIdx : ensembleInfo.replacedBookies) {
BookieSocketAddress failedBookieAddr = ensembleInfo.failedBookies.get(replacedBookieIdx);
- BookieSocketAddress replacedBookieAddr = newMeta.currentEnsemble.get(replacedBookieIdx);
+ BookieSocketAddress replacedBookieAddr = newMeta.getEnsembles()
+ .lastEntry().getValue().get(replacedBookieIdx);
replaced &= !Objects.equal(replacedBookieAddr, failedBookieAddr);
}
return replaced;
@@ -2201,10 +2204,10 @@ public class LedgerHandle implements WriteHandle {
}
}
- void unsetSuccessAndSendWriteRequest(final Set<Integer> bookies) {
+ void unsetSuccessAndSendWriteRequest(List<BookieSocketAddress> ensemble, final Set<Integer> bookies) {
for (PendingAddOp pendingAddOp : pendingAddOps) {
for (Integer bookieIndex: bookies) {
- pendingAddOp.unsetSuccessAndSendWriteRequest(bookieIndex);
+ pendingAddOp.unsetSuccessAndSendWriteRequest(ensemble, bookieIndex);
}
}
}
@@ -2337,4 +2340,23 @@ public class LedgerHandle implements WriteHandle {
}
}
+ /**
+ * Get the current ensemble from the ensemble list. The current ensemble
+ * is the last ensemble in the list. The ledger handle uses this ensemble when
+ * triggering operations which work on the end of the ledger, such as adding new
+ * entries or reading the last add confirmed.
+ *
+ * <p>This method is also used by ReadOnlyLedgerHandle during recovery, and when
+ * tailing a ledger.
+ *
+ * <p>Generally, this method should only be called by LedgerHandle and not by the
+ * operations themselves, to avoid adding more dependencies between the classes.
+ * There are too many already.
+ */
+ List<BookieSocketAddress> getCurrentEnsemble() {
+ // Getting current ensemble from the metadata is only a temporary
+ // thing until metadata is immutable. At that point, current ensemble
+ // becomes a property of the LedgerHandle itself.
+ return metadata.getCurrentEnsemble();
+ }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index e5fa0ac..c1d3849 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -195,7 +195,7 @@ public class LedgerHandleAdv extends LedgerHandle implements WriteAdvHandle {
@Override
public void asyncAddEntry(final long entryId, ByteBuf data,
final AddCallbackWithLatency cb, final Object ctx) {
- PendingAddOp op = PendingAddOp.create(this, clientCtx, data, writeFlags, cb, ctx);
+ PendingAddOp op = PendingAddOp.create(this, clientCtx, getCurrentEnsemble(), data, writeFlags, cb, ctx);
op.setEntryId(entryId);
if ((entryId <= this.lastAddConfirmed) || pendingAddOps.contains(op)) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
index d549f7c..4f58aa8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
@@ -84,7 +84,7 @@ public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMe
private LedgerMetadataFormat.State state;
private TreeMap<Long, ImmutableList<BookieSocketAddress>> ensembles = new TreeMap<>();
- List<BookieSocketAddress> currentEnsemble;
+ private List<BookieSocketAddress> currentEnsemble;
volatile Version version = Version.NEW;
private boolean hasPassword = false;
@@ -336,6 +336,10 @@ public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMe
currentEnsemble = ensemble;
}
+ List<BookieSocketAddress> getCurrentEnsemble() {
+ return currentEnsemble;
+ }
+
public void updateEnsemble(long startEntryId, List<BookieSocketAddress> ensemble) {
checkArgument(ensembles.containsKey(startEntryId));
ensembles.put(startEntryId, ImmutableList.copyOf(ensemble));
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
index 18b47e0..6ab25d6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
@@ -97,7 +97,7 @@ class LedgerRecoveryOp implements ReadEntryListener, AddCallback {
}
public void initiate() {
- ReadLastConfirmedOp rlcop = new ReadLastConfirmedOp(lh, clientCtx.getBookieClient(),
+ ReadLastConfirmedOp rlcop = new ReadLastConfirmedOp(lh, clientCtx.getBookieClient(), lh.getCurrentEnsemble(),
new ReadLastConfirmedOp.LastConfirmedDataCallback() {
public void readLastConfirmedDataComplete(int rc, RecoveryData data) {
if (rc == BKException.Code.OK) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index d10bbe7..89bf0b8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -30,6 +30,7 @@ import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import java.util.EnumSet;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
@@ -77,8 +78,10 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
boolean hasRun;
EnumSet<WriteFlag> writeFlags;
boolean allowFailFast = false;
+ List<BookieSocketAddress> ensemble;
static PendingAddOp create(LedgerHandle lh, ClientContext clientCtx,
+ List<BookieSocketAddress> ensemble,
ByteBuf payload, EnumSet<WriteFlag> writeFlags,
AddCallbackWithLatency cb, Object ctx) {
PendingAddOp op = RECYCLER.get();
@@ -93,6 +96,7 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
op.entryLength = payload.readableBytes();
op.completed = false;
+ op.ensemble = ensemble;
op.ackSet = lh.getDistributionSchedule().getAckSet();
op.pendingWriteRequests = 0;
op.callbackTriggered = false;
@@ -131,10 +135,10 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
return this.entryId;
}
- void sendWriteRequest(int bookieIndex) {
+ void sendWriteRequest(List<BookieSocketAddress> ensemble, int bookieIndex) {
int flags = isRecoveryAdd ? FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY : FLAG_NONE;
- clientCtx.getBookieClient().addEntry(lh.getLedgerMetadata().currentEnsemble.get(bookieIndex),
+ clientCtx.getBookieClient().addEntry(ensemble.get(bookieIndex),
lh.ledgerId, lh.ledgerKey, entryId, toSend, this, bookieIndex,
flags, allowFailFast, lh.writeFlags);
++pendingWriteRequests;
@@ -168,7 +172,10 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
}
}
- void unsetSuccessAndSendWriteRequest(int bookieIndex) {
+ void unsetSuccessAndSendWriteRequest(List<BookieSocketAddress> ensemble, int bookieIndex) {
+ // update the ensemble
+ this.ensemble = ensemble;
+
if (toSend == null) {
// this addOp hasn't yet had its mac computed. When the mac is
// computed, its write requests will be sent, so no need to send it
@@ -216,7 +223,7 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
completed = false;
}
- sendWriteRequest(bookieIndex);
+ sendWriteRequest(ensemble, bookieIndex);
}
/**
@@ -250,9 +257,10 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
}
// Iterate over set and trigger the sendWriteRequests
DistributionSchedule.WriteSet writeSet = lh.distributionSchedule.getWriteSet(entryId);
+
try {
for (int i = 0; i < writeSet.size(); i++) {
- sendWriteRequest(writeSet.get(i));
+ sendWriteRequest(ensemble, writeSet.get(i));
}
} finally {
writeSet.recycle();
@@ -264,7 +272,7 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
int bookieIndex = (Integer) ctx;
--pendingWriteRequests;
- if (!lh.getLedgerMetadata().currentEnsemble.get(bookieIndex).equals(addr)) {
+ if (!ensemble.get(bookieIndex).equals(addr)) {
// ensemble has already changed, failure of this addr is immaterial
if (LOG.isDebugEnabled()) {
LOG.debug("Write did not succeed: " + ledgerId + ", " + entryId + ". But we have already fixed it.");
@@ -460,6 +468,7 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
}
cb = null;
ctx = null;
+ ensemble = null;
ackSet.recycle();
ackSet = null;
lh = null;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
index 493643c..0dad804 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
@@ -19,7 +19,10 @@ package org.apache.bookkeeper.client;
import io.netty.buffer.ByteBuf;
+import java.util.List;
+
import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
+import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
import org.apache.bookkeeper.proto.checksum.DigestManager.RecoveryData;
@@ -51,6 +54,7 @@ class PendingReadLacOp implements ReadLacCallback {
int lastSeenError = BKException.Code.ReadException;
final DistributionSchedule.QuorumCoverageSet coverageSet;
long maxLac = LedgerHandle.INVALID_ENTRY_ID;
+ final List<BookieSocketAddress> currentEnsemble;
/*
* Wrapper to get Lac from the request
@@ -59,18 +63,18 @@ class PendingReadLacOp implements ReadLacCallback {
void getLacComplete(int rc, long lac);
}
- PendingReadLacOp(LedgerHandle lh, BookieClient bookieClient, LacCallback cb) {
+ PendingReadLacOp(LedgerHandle lh, BookieClient bookieClient, List<BookieSocketAddress> ensemble, LacCallback cb) {
this.lh = lh;
this.bookieClient = bookieClient;
this.cb = cb;
- this.numResponsesPending = lh.getLedgerMetadata().getEnsembleSize();
+ this.numResponsesPending = ensemble.size();
this.coverageSet = lh.distributionSchedule.getCoverageSet();
+ this.currentEnsemble = ensemble;
}
public void initiate() {
- LedgerMetadata metadata = lh.getLedgerMetadata();
- for (int i = 0; i < metadata.currentEnsemble.size(); i++) {
- bookieClient.readLac(metadata.currentEnsemble.get(i), lh.ledgerId, this, i);
+ for (int i = 0; i < currentEnsemble.size(); i++) {
+ bookieClient.readLac(currentEnsemble.get(i), lh.ledgerId, this, i);
}
}
@@ -120,7 +124,7 @@ class PendingReadLacOp implements ReadLacCallback {
// Too bad, this bookie did not give us a valid answer, we
// still might be able to recover. So, continue
LOG.error("Mac mismatch while reading ledger: " + ledgerId + " LAC from bookie: "
- + lh.getLedgerMetadata().currentEnsemble.get(bookieIndex));
+ + currentEnsemble.get(bookieIndex));
rc = BKException.Code.DigestMatchException;
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
index 730ad07..2881d2f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
@@ -18,6 +18,7 @@
package org.apache.bookkeeper.client;
import java.util.BitSet;
+import java.util.List;
import org.apache.bookkeeper.client.AsyncCallback.AddLacCallback;
import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -50,7 +51,9 @@ class PendingWriteLacOp implements WriteLacCallback {
LedgerHandle lh;
ClientContext clientCtx;
- PendingWriteLacOp(LedgerHandle lh, ClientContext clientCtx,
+ final List<BookieSocketAddress> currentEnsemble;
+
+ PendingWriteLacOp(LedgerHandle lh, ClientContext clientCtx, List<BookieSocketAddress> ensemble,
AddLacCallback cb, Object ctx) {
this.lh = lh;
this.clientCtx = clientCtx;
@@ -58,6 +61,7 @@ class PendingWriteLacOp implements WriteLacCallback {
this.ctx = ctx;
this.lac = LedgerHandle.INVALID_ENTRY_ID;
ackSet = lh.distributionSchedule.getAckSet();
+ currentEnsemble = ensemble;
}
void setLac(long lac) {
@@ -70,7 +74,7 @@ class PendingWriteLacOp implements WriteLacCallback {
}
void sendWriteLacRequest(int bookieIndex) {
- clientCtx.getBookieClient().writeLac(lh.getLedgerMetadata().currentEnsemble.get(bookieIndex),
+ clientCtx.getBookieClient().writeLac(currentEnsemble.get(bookieIndex),
lh.ledgerId, lh.ledgerKey, lac, toSend, this, bookieIndex);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index d71cc0a..e61e666 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -63,6 +63,7 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
private final long prevEntryId;
private long lastAddConfirmed;
private long timeOutInMillis;
+ private final List<BookieSocketAddress> currentEnsemble;
abstract class ReadLACAndEntryRequest implements AutoCloseable {
@@ -425,6 +426,7 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
ReadLastConfirmedAndEntryOp(LedgerHandle lh,
ClientContext clientCtx,
+ List<BookieSocketAddress> ensemble,
LastConfirmedAndEntryCallback cb,
long prevEntryId,
long timeOutInMillis) {
@@ -435,6 +437,8 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
this.lastAddConfirmed = lh.getLastAddConfirmed();
this.timeOutInMillis = timeOutInMillis;
this.numResponsesPending = 0;
+
+ this.currentEnsemble = ensemble;
// since long poll is effectively reading lac with waits, lac can be potentially
// be advanced in different write quorums, so we need to make sure to cover enough
// bookies before claiming lac is not advanced.
@@ -480,9 +484,9 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
public void initiate() {
if (parallelRead) {
- request = new ParallelReadRequest(lh.getLedgerMetadata().currentEnsemble, lh.getId(), prevEntryId + 1);
+ request = new ParallelReadRequest(currentEnsemble, lh.getId(), prevEntryId + 1);
} else {
- request = new SequenceReadRequest(lh.getLedgerMetadata().currentEnsemble, lh.getId(), prevEntryId + 1);
+ request = new SequenceReadRequest(currentEnsemble, lh.getId(), prevEntryId + 1);
}
request.read();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
index b72639d..dfbc167 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
@@ -18,8 +18,10 @@
package org.apache.bookkeeper.client;
import io.netty.buffer.ByteBuf;
+import java.util.List;
import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
+import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
@@ -42,6 +44,7 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
LastConfirmedDataCallback cb;
final DistributionSchedule.QuorumCoverageSet coverageSet;
+ final List<BookieSocketAddress> currentEnsemble;
/**
* Wrapper to get all recovered data from the request.
@@ -51,19 +54,19 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
}
public ReadLastConfirmedOp(LedgerHandle lh, BookieClient bookieClient,
- LastConfirmedDataCallback cb) {
+ List<BookieSocketAddress> ensemble, LastConfirmedDataCallback cb) {
this.cb = cb;
this.bookieClient = bookieClient;
this.maxRecoveredData = new RecoveryData(LedgerHandle.INVALID_ENTRY_ID, 0);
this.lh = lh;
this.numResponsesPending = lh.getLedgerMetadata().getEnsembleSize();
this.coverageSet = lh.distributionSchedule.getCoverageSet();
+ this.currentEnsemble = ensemble;
}
public void initiate() {
- LedgerMetadata metadata = lh.getLedgerMetadata();
- for (int i = 0; i < metadata.currentEnsemble.size(); i++) {
- bookieClient.readEntry(metadata.currentEnsemble.get(i),
+ for (int i = 0; i < currentEnsemble.size(); i++) {
+ bookieClient.readEntry(currentEnsemble.get(i),
lh.ledgerId,
BookieProtocol.LAST_ADD_CONFIRMED,
this, i, BookieProtocol.FLAG_NONE);
@@ -71,9 +74,8 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
}
public void initiateWithFencing() {
- LedgerMetadata metadata = lh.getLedgerMetadata();
- for (int i = 0; i < metadata.currentEnsemble.size(); i++) {
- bookieClient.readEntry(metadata.currentEnsemble.get(i),
+ for (int i = 0; i < currentEnsemble.size(); i++) {
+ bookieClient.readEntry(currentEnsemble.get(i),
lh.ledgerId,
BookieProtocol.LAST_ADD_CONFIRMED,
this, i, BookieProtocol.FLAG_DO_FENCING,
@@ -102,7 +104,7 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
// still might be able to recover though so continue
LOG.error("Mac mismatch for ledger: " + ledgerId + ", entry: " + entryId
+ " while reading last entry from bookie: "
- + lh.getLedgerMetadata().currentEnsemble.get(bookieIndex));
+ + currentEnsemble.get(bookieIndex));
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
index fba7ebd..e9f7900 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
@@ -140,7 +140,7 @@ class ReadOnlyLedgerHandle extends LedgerHandle implements LedgerMetadataListene
}
blockAddCompletions.decrementAndGet();
// the failed bookie has been replaced
- unsetSuccessAndSendWriteRequest(ensembleInfo.replacedBookies);
+ unsetSuccessAndSendWriteRequest(ensembleInfo.newEnsemble, ensembleInfo.replacedBookies);
} catch (BKException.BKNotEnoughBookiesException e) {
LOG.error("Could not get additional bookie to "
+ "remake ensemble, closing ledger: " + ledgerId);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
index 181b343..1311d31 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
@@ -18,8 +18,10 @@
package org.apache.bookkeeper.client;
import io.netty.buffer.ByteBuf;
+import java.util.List;
import org.apache.bookkeeper.client.ReadLastConfirmedOp.LastConfirmedDataCallback;
+import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
@@ -43,20 +45,21 @@ class TryReadLastConfirmedOp implements ReadEntryCallback {
volatile boolean hasValidResponse = false;
volatile boolean completed = false;
RecoveryData maxRecoveredData;
+ final List<BookieSocketAddress> currentEnsemble;
TryReadLastConfirmedOp(LedgerHandle lh, BookieClient bookieClient,
- LastConfirmedDataCallback cb, long lac) {
+ List<BookieSocketAddress> ensemble, LastConfirmedDataCallback cb, long lac) {
this.lh = lh;
this.bookieClient = bookieClient;
this.cb = cb;
this.maxRecoveredData = new RecoveryData(lac, 0);
this.numResponsesPending = lh.getLedgerMetadata().getEnsembleSize();
+ this.currentEnsemble = ensemble;
}
public void initiate() {
- LedgerMetadata metadata = lh.getLedgerMetadata();
- for (int i = 0; i < metadata.currentEnsemble.size(); i++) {
- bookieClient.readEntry(metadata.currentEnsemble.get(i),
+ for (int i = 0; i < currentEnsemble.size(); i++) {
+ bookieClient.readEntry(currentEnsemble.get(i),
lh.ledgerId,
BookieProtocol.LAST_ADD_CONFIRMED,
this, i, BookieProtocol.FLAG_NONE);
@@ -88,7 +91,7 @@ class TryReadLastConfirmedOp implements ReadEntryCallback {
} catch (BKException.BKDigestMatchException e) {
LOG.error("Mac mismatch for ledger: " + ledgerId + ", entry: " + entryId
+ " while reading last entry from bookie: "
- + lh.getLedgerMetadata().currentEnsemble.get(bookieIndex));
+ + currentEnsemble.get(bookieIndex));
}
} else if (BKException.Code.UnauthorizedAccessException == rc && !completed) {
cb.readLastConfirmedDataComplete(rc, maxRecoveredData);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
index 85376ea..82a87c3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
@@ -292,7 +292,7 @@ public class BookieWriteLedgerTest extends
CountDownLatch sleepLatch1 = new CountDownLatch(1);
// get bookie at index-0
- BookieSocketAddress bookie1 = lh.getLedgerMetadata().currentEnsemble.get(0);
+ BookieSocketAddress bookie1 = lh.getCurrentEnsemble().get(0);
sleepBookie(bookie1, sleepLatch1);
int i = numEntriesToWrite;
@@ -331,7 +331,7 @@ public class BookieWriteLedgerTest extends
sleepLatch1.countDown();
// get the bookie at index-0 again, this must be different.
- BookieSocketAddress bookie2 = lh.getLedgerMetadata().currentEnsemble.get(0);
+ BookieSocketAddress bookie2 = lh.getCurrentEnsemble().get(0);
assertFalse(
"Delayed write error must have forced ensemble change",
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
index 3259932..e761f1c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
@@ -145,12 +145,12 @@ public class LedgerCloseTest extends BookKeeperClusterTestCase {
final CountDownLatch recoverDoneLatch = new CountDownLatch(1);
final CountDownLatch failedLatch = new CountDownLatch(1);
// kill first bookie to replace with a unauthorize bookie
- BookieSocketAddress bookie = lh.getLedgerMetadata().currentEnsemble.get(0);
+ BookieSocketAddress bookie = lh.getCurrentEnsemble().get(0);
ServerConfiguration conf = killBookie(bookie);
// replace a unauthorize bookie
startUnauthorizedBookie(conf, addDoneLatch);
// kill second bookie to replace with a dead bookie
- bookie = lh.getLedgerMetadata().currentEnsemble.get(1);
+ bookie = lh.getCurrentEnsemble().get(1);
conf = killBookie(bookie);
// replace a slow dead bookie
startDeadBookie(conf, deadIOLatch);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
index 25da4ba..e126001 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
@@ -188,7 +188,7 @@ public class LedgerRecoveryTest extends BookKeeperClusterTestCase {
// kill first bookie server to start a fake one to simulate a slow bookie
// and failed to add entry on crash
// until write succeed
- BookieSocketAddress host = beforelh.getLedgerMetadata().currentEnsemble.get(slowBookieIdx);
+ BookieSocketAddress host = beforelh.getCurrentEnsemble().get(slowBookieIdx);
ServerConfiguration conf = killBookie(host);
Bookie fakeBookie = new Bookie(conf) {
@@ -262,7 +262,7 @@ public class LedgerRecoveryTest extends BookKeeperClusterTestCase {
bs.add(startBookie(conf, deadBookie1));
// kill first bookie server
- BookieSocketAddress bookie1 = lhbefore.getLedgerMetadata().currentEnsemble.get(0);
+ BookieSocketAddress bookie1 = lhbefore.getCurrentEnsemble().get(0);
ServerConfiguration conf1 = killBookie(bookie1);
// Try to recover and fence the ledger after killing one bookie in the
@@ -277,7 +277,7 @@ public class LedgerRecoveryTest extends BookKeeperClusterTestCase {
// restart the first server, kill the second
bsConfs.add(conf1);
bs.add(startBookie(conf1));
- BookieSocketAddress bookie2 = lhbefore.getLedgerMetadata().currentEnsemble.get(1);
+ BookieSocketAddress bookie2 = lhbefore.getCurrentEnsemble().get(1);
ServerConfiguration conf2 = killBookie(bookie2);
// using async, because this could trigger an assertion
@@ -343,7 +343,7 @@ public class LedgerRecoveryTest extends BookKeeperClusterTestCase {
bs.add(startBookie(conf, deadBookie1));
// kill first bookie server
- BookieSocketAddress bookie1 = lhbefore.getLedgerMetadata().currentEnsemble.get(0);
+ BookieSocketAddress bookie1 = lhbefore.getCurrentEnsemble().get(0);
killBookie(bookie1);
// Try to recover and fence the ledger after killing one bookie in the
@@ -395,9 +395,9 @@ public class LedgerRecoveryTest extends BookKeeperClusterTestCase {
fail("Failed to add " + numEntries + " to ledger handle " + lh.getId());
}
// kill first 2 bookies to replace bookies
- BookieSocketAddress bookie1 = lh.getLedgerMetadata().currentEnsemble.get(0);
+ BookieSocketAddress bookie1 = lh.getCurrentEnsemble().get(0);
ServerConfiguration conf1 = killBookie(bookie1);
- BookieSocketAddress bookie2 = lh.getLedgerMetadata().currentEnsemble.get(1);
+ BookieSocketAddress bookie2 = lh.getCurrentEnsemble().get(1);
ServerConfiguration conf2 = killBookie(bookie2);
// replace these two bookies
@@ -451,8 +451,8 @@ public class LedgerRecoveryTest extends BookKeeperClusterTestCase {
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(1);
- sleepBookie(lh.getLedgerMetadata().currentEnsemble.get(0), latch1);
- sleepBookie(lh.getLedgerMetadata().currentEnsemble.get(1), latch2);
+ sleepBookie(lh.getCurrentEnsemble().get(0), latch1);
+ sleepBookie(lh.getCurrentEnsemble().get(1), latch2);
int numEntries = (numBookies * 3) + 1;
final AtomicInteger numPendingAdds = new AtomicInteger(numEntries);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index 3213c46..8b5f5a6 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -300,8 +300,8 @@ public class ParallelLedgerRecoveryTest extends BookKeeperClusterTestCase {
final LedgerHandle lh = newBk.createLedger(numBookies, 2, 2, digestType, "".getBytes());
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(1);
- sleepBookie(lh.getLedgerMetadata().currentEnsemble.get(0), latch1);
- sleepBookie(lh.getLedgerMetadata().currentEnsemble.get(1), latch2);
+ sleepBookie(lh.getCurrentEnsemble().get(0), latch1);
+ sleepBookie(lh.getCurrentEnsemble().get(1), latch2);
int numEntries = (numBookies * 3) + 1;
final AtomicInteger numPendingAdds = new AtomicInteger(numEntries);
@@ -428,16 +428,17 @@ public class ParallelLedgerRecoveryTest extends BookKeeperClusterTestCase {
final CountDownLatch addLatch = new CountDownLatch(1);
final AtomicBoolean addSuccess = new AtomicBoolean(false);
LOG.info("Add entry {} with lac = {}", entryId, lac);
- bkc.getBookieClient().addEntry(lh.getLedgerMetadata().currentEnsemble.get(0),
- lh.getId(), lh.ledgerKey, entryId, toSend,
- new WriteCallback() {
- @Override
- public void writeComplete(int rc, long ledgerId, long entryId,
- BookieSocketAddress addr, Object ctx) {
- addSuccess.set(BKException.Code.OK == rc);
- addLatch.countDown();
- }
- }, 0, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
+
+ bkc.getBookieClient().addEntry(lh.getCurrentEnsemble().get(0),
+ lh.getId(), lh.ledgerKey, entryId, toSend,
+ new WriteCallback() {
+ @Override
+ public void writeComplete(int rc, long ledgerId, long entryId,
+ BookieSocketAddress addr, Object ctx) {
+ addSuccess.set(BKException.Code.OK == rc);
+ addLatch.countDown();
+ }
+ }, 0, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
addLatch.await();
assertTrue("add entry 14 should succeed", addSuccess.get());
@@ -587,7 +588,7 @@ public class ParallelLedgerRecoveryTest extends BookKeeperClusterTestCase {
LOG.info("Create ledger {}", lh0.getId());
// 0) place the bookie with a fake bookie
- BookieSocketAddress address = lh0.getLedgerMetadata().currentEnsemble.get(0);
+ BookieSocketAddress address = lh0.getCurrentEnsemble().get(0);
ServerConfiguration conf = killBookie(address);
conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
DelayResponseBookie fakeBookie = new DelayResponseBookie(conf);
@@ -667,14 +668,16 @@ public class ParallelLedgerRecoveryTest extends BookKeeperClusterTestCase {
final AtomicLong lacHolder = new AtomicLong(-1234L);
final AtomicInteger rcHolder = new AtomicInteger(-1234);
final CountDownLatch doneLatch = new CountDownLatch(1);
- new ReadLastConfirmedOp(readLh, bkc.getBookieClient(), new ReadLastConfirmedOp.LastConfirmedDataCallback() {
- @Override
- public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData data) {
- rcHolder.set(rc);
- lacHolder.set(data.getLastAddConfirmed());
- doneLatch.countDown();
- }
- }).initiate();
+
+ new ReadLastConfirmedOp(readLh, bkc.getBookieClient(), readLh.getCurrentEnsemble(),
+ new ReadLastConfirmedOp.LastConfirmedDataCallback() {
+ @Override
+ public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData data) {
+ rcHolder.set(rc);
+ lacHolder.set(data.getLastAddConfirmed());
+ doneLatch.countDown();
+ }
+ }).initiate();
doneLatch.await();
assertEquals(BKException.Code.OK, rcHolder.get());
assertEquals(1L, lacHolder.get());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java
index a6bc088..51d296c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java
@@ -69,7 +69,7 @@ public class PendingAddOpTest {
public void testExecuteAfterCancelled() {
AtomicInteger rcHolder = new AtomicInteger(-0xdead);
PendingAddOp op = PendingAddOp.create(
- lh, mockClientContext,
+ lh, mockClientContext, lh.getCurrentEnsemble(),
payload, WriteFlag.NONE,
(rc, handle, entryId, qwcLatency, ctx) -> {
rcHolder.set(rc);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
index 7d524a5..3d9c394 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
@@ -126,6 +126,7 @@ public class ReadLastConfirmedAndEntryOpTest {
this.mockLh = mock(LedgerHandle.class);
when(mockLh.getId()).thenReturn(LEDGERID);
+ when(mockLh.getCurrentEnsemble()).thenReturn(ensemble);
when(mockLh.getLedgerMetadata()).thenReturn(ledgerMetadata);
when(mockLh.getDistributionSchedule()).thenReturn(distributionSchedule);
digestManager = new DummyDigestManager(LEDGERID, false);
@@ -199,7 +200,7 @@ public class ReadLastConfirmedAndEntryOpTest {
};
ReadLastConfirmedAndEntryOp op = new ReadLastConfirmedAndEntryOp(
- mockLh, mockClientCtx, resultCallback, 1L, 10000);
+ mockLh, mockClientCtx, mockLh.getCurrentEnsemble(), resultCallback, 1L, 10000);
op.initiate();
// wait until all speculative requests are sent
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
index 71a9b11..a70ec32 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
@@ -73,7 +73,7 @@ public class SlowBookieTest extends BookKeeperClusterTestCase {
final CountDownLatch b0latch = new CountDownLatch(1);
final CountDownLatch b1latch = new CountDownLatch(1);
final CountDownLatch addEntrylatch = new CountDownLatch(1);
- List<BookieSocketAddress> curEns = lh.getLedgerMetadata().currentEnsemble;
+ List<BookieSocketAddress> curEns = lh.getCurrentEnsemble();
try {
sleepBookie(curEns.get(0), b0latch);
for (int i = 0; i < 10; i++) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestAddEntryQuorumTimeout.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestAddEntryQuorumTimeout.java
index 44134cc..efbc43d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestAddEntryQuorumTimeout.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestAddEntryQuorumTimeout.java
@@ -81,7 +81,7 @@ public class TestAddEntryQuorumTimeout extends BookKeeperClusterTestCase impleme
public void testBasicTimeout() throws Exception {
BookKeeperTestClient bkc = new BookKeeperTestClient(baseClientConf);
LedgerHandle lh = bkc.createLedger(3, 3, 3, digestType, testPasswd);
- List<BookieSocketAddress> curEns = lh.getLedgerMetadata().currentEnsemble;
+ List<BookieSocketAddress> curEns = lh.getCurrentEnsemble();
byte[] data = "foobar".getBytes();
lh.addEntry(data);
sleepBookie(curEns.get(0), 5).await();
@@ -105,7 +105,7 @@ public class TestAddEntryQuorumTimeout extends BookKeeperClusterTestCase impleme
public void testTimeoutWithPendingOps() throws Exception {
BookKeeperTestClient bkc = new BookKeeperTestClient(baseClientConf);
LedgerHandle lh = bkc.createLedger(3, 3, 3, digestType, testPasswd);
- List<BookieSocketAddress> curEns = lh.getLedgerMetadata().currentEnsemble;
+ List<BookieSocketAddress> curEns = lh.getCurrentEnsemble();
byte[] data = "foobar".getBytes();
SyncObj syncObj1 = new SyncObj();
@@ -130,7 +130,7 @@ public class TestAddEntryQuorumTimeout extends BookKeeperClusterTestCase impleme
public void testLedgerClosedAfterTimeout() throws Exception {
BookKeeperTestClient bkc = new BookKeeperTestClient(baseClientConf);
LedgerHandle lh = bkc.createLedger(3, 3, 3, digestType, testPasswd);
- List<BookieSocketAddress> curEns = lh.getLedgerMetadata().currentEnsemble;
+ List<BookieSocketAddress> curEns = lh.getCurrentEnsemble();
byte[] data = "foobar".getBytes();
CountDownLatch b0latch = sleepBookie(curEns.get(0), 5);
try {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
index 62be7cb..e8da86e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
@@ -146,8 +146,8 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
}
// kill two bookies, but we still have 3 bookies for the ack quorum.
- ServerConfiguration conf0 = killBookie(lh.getLedgerMetadata().currentEnsemble.get(0));
- ServerConfiguration conf1 = killBookie(lh.getLedgerMetadata().currentEnsemble.get(1));
+ ServerConfiguration conf0 = killBookie(lh.getCurrentEnsemble().get(0));
+ ServerConfiguration conf1 = killBookie(lh.getCurrentEnsemble().get(1));
for (int i = numEntries; i < 2 * numEntries; i++) {
lh.addEntry(data);
@@ -213,7 +213,7 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
logger.info("Kill bookie 0 and write {} entries.", numEntries);
// kill two bookies, but we still have 3 bookies for the ack quorum.
- ServerConfiguration conf0 = killBookie(lh.getLedgerMetadata().currentEnsemble.get(0));
+ ServerConfiguration conf0 = killBookie(lh.getCurrentEnsemble().get(0));
for (int i = numEntries; i < 2 * numEntries; i++) {
lh.addEntry(data);
@@ -230,7 +230,7 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
logger.info("Kill bookie 1 and write another {} entries.", numEntries);
- ServerConfiguration conf1 = killBookie(lh.getLedgerMetadata().currentEnsemble.get(1));
+ ServerConfiguration conf1 = killBookie(lh.getCurrentEnsemble().get(1));
for (int i = 2 * numEntries; i < 3 * numEntries; i++) {
lh.addEntry(data);
@@ -242,7 +242,7 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
logger.info("Kill bookie 2 and write another {} entries.", numEntries);
- ServerConfiguration conf2 = killBookie(lh.getLedgerMetadata().currentEnsemble.get(2));
+ ServerConfiguration conf2 = killBookie(lh.getCurrentEnsemble().get(2));
for (int i = 3 * numEntries; i < 4 * numEntries; i++) {
lh.addEntry(data);
@@ -304,9 +304,9 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
logger.info("Killed 3 bookies and add {} more entries : {}", numEntries, lh.getLedgerMetadata());
// kill three bookies, but we only have 2 new bookies for ensemble change.
- ServerConfiguration conf0 = killBookie(lh.getLedgerMetadata().currentEnsemble.get(0));
- ServerConfiguration conf1 = killBookie(lh.getLedgerMetadata().currentEnsemble.get(1));
- ServerConfiguration conf2 = killBookie(lh.getLedgerMetadata().currentEnsemble.get(2));
+ ServerConfiguration conf0 = killBookie(lh.getCurrentEnsemble().get(0));
+ ServerConfiguration conf1 = killBookie(lh.getCurrentEnsemble().get(1));
+ ServerConfiguration conf2 = killBookie(lh.getCurrentEnsemble().get(2));
for (int i = numEntries; i < 2 * numEntries; i++) {
lh.addEntry(data);
@@ -360,7 +360,7 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
// kill 5 bookies to introduce more bookie failure
List<ServerConfiguration> confs = new ArrayList<ServerConfiguration>(5);
for (int i = 0; i < 5; i++) {
- confs.add(killBookie(lh.getLedgerMetadata().currentEnsemble.get(i)));
+ confs.add(killBookie(lh.getCurrentEnsemble().get(i)));
}
for (int i = numEntries; i < 2 * numEntries; i++) {
@@ -406,7 +406,7 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
}
// kill two bookies, but we still have 3 bookies for the ack quorum.
- setBookieToReadOnly(lh.getLedgerMetadata().currentEnsemble.get(0));
+ setBookieToReadOnly(lh.getCurrentEnsemble().get(0));
for (int i = numEntries; i < 2 * numEntries; i++) {
lh.addEntry(data);
@@ -429,8 +429,8 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
lh.addEntry(data);
}
- BookieSocketAddress failedBookie = lh.getLedgerMetadata().currentEnsemble.get(0);
- BookieSocketAddress readOnlyBookie = lh.getLedgerMetadata().currentEnsemble.get(1);
+ BookieSocketAddress failedBookie = lh.getCurrentEnsemble().get(0);
+ BookieSocketAddress readOnlyBookie = lh.getCurrentEnsemble().get(1);
ServerConfiguration conf0 = killBookie(failedBookie);
for (int i = 0; i < numEntries; i++) {
@@ -450,9 +450,9 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
// ensure there is no ensemble changed
assertEquals("The ensemble should change when a bookie is readonly even if we delay ensemble change.",
2, lh.getLedgerMetadata().getEnsembles().size());
- assertEquals(3, lh.getLedgerMetadata().currentEnsemble.size());
- assertFalse(lh.getLedgerMetadata().currentEnsemble.contains(failedBookie));
- assertFalse(lh.getLedgerMetadata().currentEnsemble.contains(readOnlyBookie));
+ assertEquals(3, lh.getCurrentEnsemble().size());
+ assertFalse(lh.getCurrentEnsemble().contains(failedBookie));
+ assertFalse(lh.getCurrentEnsemble().contains(readOnlyBookie));
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDisableEnsembleChange.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDisableEnsembleChange.java
index 26f076a..f114cbf 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDisableEnsembleChange.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDisableEnsembleChange.java
@@ -239,7 +239,7 @@ public class TestDisableEnsembleChange extends BookKeeperClusterTestCase {
lh.addEntry(entry);
}
- List<BookieSocketAddress> curEns = lh.getLedgerMetadata().currentEnsemble;
+ List<BookieSocketAddress> curEns = lh.getCurrentEnsemble();
final CountDownLatch wakeupLatch = new CountDownLatch(1);
final CountDownLatch suspendLatch = new CountDownLatch(1);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
index e0f3dc6..c9ca508 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
@@ -51,11 +51,11 @@ public class TestPendingReadLacOp extends BookKeeperClusterTestCase {
lh.append(data);
final CompletableFuture<Long> result = new CompletableFuture<>();
- PendingReadLacOp pro = new PendingReadLacOp(lh, bkc.getBookieClient(),
+ PendingReadLacOp pro = new PendingReadLacOp(lh, bkc.getBookieClient(), lh.getCurrentEnsemble(),
(rc, lac) -> result.complete(lac)) {
@Override
public void initiate() {
- for (int i = 0; i < lh.getLedgerMetadata().currentEnsemble.size(); i++) {
+ for (int i = 0; i < lh.getCurrentEnsemble().size(); i++) {
final int index = i;
ByteBufList buffer = lh.getDigestManager().computeDigestAndPackageForSending(
2,
@@ -71,8 +71,8 @@ public class TestPendingReadLacOp extends BookKeeperClusterTestCase {
index);
}, 0, TimeUnit.SECONDS);
- bookieClient.readLac(lh.getLedgerMetadata().currentEnsemble.get(i),
- lh.ledgerId, this, i);
+ bookieClient.readLac(lh.getCurrentEnsemble().get(i),
+ lh.ledgerId, this, i);
}
}
};
@@ -88,10 +88,11 @@ public class TestPendingReadLacOp extends BookKeeperClusterTestCase {
lh.append(data);
final CompletableFuture<Long> result = new CompletableFuture<>();
- PendingReadLacOp pro = new PendingReadLacOp(lh, bkc.getBookieClient(), (rc, lac) -> result.complete(lac)) {
+ PendingReadLacOp pro = new PendingReadLacOp(lh, bkc.getBookieClient(), lh.getCurrentEnsemble(),
+ (rc, lac) -> result.complete(lac)) {
@Override
public void initiate() {
- for (int i = 0; i < lh.getLedgerMetadata().currentEnsemble.size(); i++) {
+ for (int i = 0; i < lh.getCurrentEnsemble().size(); i++) {
final int index = i;
ByteBufList buffer = lh.getDigestManager().computeDigestAndPackageForSendingLac(1);
bkc.scheduler.schedule(() -> {
@@ -102,7 +103,7 @@ public class TestPendingReadLacOp extends BookKeeperClusterTestCase {
null,
index);
}, 0, TimeUnit.SECONDS);
- bookieClient.readLac(lh.getLedgerMetadata().currentEnsemble.get(i),
+ bookieClient.readLac(lh.getCurrentEnsemble().get(i),
lh.ledgerId, this, i);
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java
index f754f6a..d05f864 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java
@@ -153,7 +153,7 @@ public class TestReadLastConfirmedLongPoll extends BookKeeperClusterTestCase {
ServerConfiguration[] confs = new ServerConfiguration[numEntries - 1];
for (int j = 0; j < numEntries - 1; j++) {
int idx = (i + 1 + j) % numEntries;
- confs[j] = killBookie(lh.getLedgerMetadata().currentEnsemble.get(idx));
+ confs[j] = killBookie(lh.getCurrentEnsemble().get(idx));
}
final AtomicBoolean entryAsExpected = new AtomicBoolean(false);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestTryReadLastConfirmed.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestTryReadLastConfirmed.java
index 0117366..a4a63fa 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestTryReadLastConfirmed.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestTryReadLastConfirmed.java
@@ -119,7 +119,7 @@ public class TestTryReadLastConfirmed extends BookKeeperClusterTestCase {
ServerConfiguration[] confs = new ServerConfiguration[ensembleSize - 1];
for (int j = 0; j < ensembleSize - 1; j++) {
int idx = (i + 1 + j) % ensembleSize;
- confs[j] = killBookie(lh.getLedgerMetadata().currentEnsemble.get(idx));
+ confs[j] = killBookie(lh.getCurrentEnsemble().get(idx));
}
final AtomicBoolean success = new AtomicBoolean(false);
@@ -164,7 +164,7 @@ public class TestTryReadLastConfirmed extends BookKeeperClusterTestCase {
lh.addEntry(("data" + i).getBytes());
}
for (int i = 0; i < ensembleSize; i++) {
- killBookie(lh.getLedgerMetadata().currentEnsemble.get(i));
+ killBookie(lh.getCurrentEnsemble().get(i));
}
final AtomicBoolean success = new AtomicBoolean(false);
final AtomicInteger numCallbacks = new AtomicInteger(0);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
index e6daa66..456839a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
@@ -93,7 +93,7 @@ public class TestWatchEnsembleChange extends BookKeeperClusterTestCase {
long lastLAC = readLh.getLastAddConfirmed();
assertEquals(numEntries - 2, lastLAC);
List<BookieSocketAddress> ensemble =
- lh.getLedgerMetadata().currentEnsemble;
+ lh.getCurrentEnsemble();
for (BookieSocketAddress addr : ensemble) {
killBookie(addr);
}
diff --git a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
index ffe2d88..3388110 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
@@ -197,7 +197,7 @@ public class LedgerReader {
op.submit();
};
// Read Last AddConfirmed
- new ReadLastConfirmedOp(lh, clientCtx.getBookieClient(), readLACCallback).initiate();
+ new ReadLastConfirmedOp(lh, clientCtx.getBookieClient(), lh.getCurrentEnsemble(), readLACCallback).initiate();
}
public void readLacs(final LedgerHandle lh, long eid,