You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2018/06/18 07:05:12 UTC
[bookkeeper] branch master updated: BP-14 force() API - client side implementation
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new f7dce11 BP-14 force() API - client side implementation
f7dce11 is described below
commit f7dce110fdca47c93664d1d9c71cda8cb08c9cc2
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Mon Jun 18 09:05:11 2018 +0200
BP-14 force() API - client side implementation
- Introduce the client-side force() API
- Implement the client-side wire protocol for the FORCE_LEDGER RPC
- Disable ensemble changes for DEFERRED_SYNC writers
- Prevent clients using the v2 wire protocol from using the force() API
The force() API enables the client (usually one writing with the DEFERRED_SYNC write flag) to request a synchronization point with all the bookies in the ensemble. This guarantees the durability of previously written (and acknowledged) entries, which is what allows LastAddConfirmed to advance.
For DEFERRED_SYNC writers, LastAddConfirmed advances only through this API.
Author: Enrico Olivelli <eo...@apache.org>
Reviewers: Jia Zhai <None>, Sijie Guo <si...@apache.org>, Venkateswararao Jujjuri (JV) <None>
This closes #1436 from eolivelli/bp14-force-client-api
---
.../org/apache/bookkeeper/client/BookKeeper.java | 19 ++
.../bookkeeper/client/BookKeeperClientStats.java | 3 +
.../bookkeeper/client/DistributionSchedule.java | 13 +-
.../apache/bookkeeper/client/ForceLedgerOp.java | 124 +++++++++++++
.../org/apache/bookkeeper/client/LedgerHandle.java | 83 ++++++++-
.../org/apache/bookkeeper/client/PendingAddOp.java | 3 +-
.../client/ReadLastConfirmedAndEntryOp.java | 2 +-
.../client/RoundRobinDistributionSchedule.java | 9 +-
.../apache/bookkeeper/client/api/BKException.java | 2 +-
.../bookkeeper/client/api/ForceableHandle.java | 50 ++++++
.../bookkeeper/client/api/WriteAdvHandle.java | 2 +-
.../apache/bookkeeper/client/api/WriteFlag.java | 2 +
.../apache/bookkeeper/client/api/WriteHandle.java | 2 +-
.../org/apache/bookkeeper/proto/BookieClient.java | 25 +++
.../proto/BookkeeperInternalCallbacks.java | 7 +
.../bookkeeper/proto/PerChannelBookieClient.java | 87 ++++++++++
.../bookkeeper/bookie/BookieDeferredSyncTest.java | 191 +++++++++++++++++++++
.../apache/bookkeeper/client/BookKeeperTest.java | 17 ++
.../apache/bookkeeper/client/DeferredSyncTest.java | 181 ++++++++++++++++++-
.../apache/bookkeeper/client/ExplicitLacTest.java | 81 +++++++++
.../bookkeeper/client/MockBookKeeperTestCase.java | 85 +++++++--
21 files changed, 960 insertions(+), 28 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index c3a5728..44f970f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -117,6 +117,7 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
private OpStatsLogger readLacAndEntryOpLogger;
private OpStatsLogger readLacAndEntryRespLogger;
private OpStatsLogger addOpLogger;
+ private OpStatsLogger forceOpLogger;
private OpStatsLogger writeLacOpLogger;
private OpStatsLogger readLacOpLogger;
private OpStatsLogger recoverAddEntriesStats;
@@ -736,6 +737,20 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
throw new IllegalArgumentException("Unable to convert digest type " + digestType);
}
}
+ public org.apache.bookkeeper.client.api.DigestType toApiDigestType() {
+ switch (this) {
+ case MAC:
+ return org.apache.bookkeeper.client.api.DigestType.MAC;
+ case CRC32:
+ return org.apache.bookkeeper.client.api.DigestType.CRC32;
+ case CRC32C:
+ return org.apache.bookkeeper.client.api.DigestType.CRC32C;
+ case DUMMY:
+ return org.apache.bookkeeper.client.api.DigestType.DUMMY;
+ default:
+ throw new IllegalArgumentException("Unable to convert digest type " + this);
+ }
+ }
}
boolean shouldReorderReadSequence() {
@@ -1493,6 +1508,7 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
readLacAndEntryRespLogger = stats.getOpStatsLogger(
BookKeeperClientStats.READ_LAST_CONFIRMED_AND_ENTRY_RESPONSE);
addOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.ADD_OP);
+ forceOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.FORCE_OP);
addOpUrCounter = stats.getCounter(BookKeeperClientStats.ADD_OP_UR);
writeLacOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.WRITE_LAC_OP);
readLacOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.READ_LAC_OP);
@@ -1526,6 +1542,9 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
OpStatsLogger getAddOpLogger() {
return addOpLogger;
}
+ OpStatsLogger getForceOpLogger() {
+ return forceOpLogger;
+ }
OpStatsLogger getWriteLacOpLogger() {
return writeLacOpLogger;
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index 749ac9c..30d14f0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -41,6 +41,7 @@ public interface BookKeeperClientStats {
String ADD_OP = "ADD_ENTRY";
String ADD_OP_UR = "ADD_ENTRY_UR"; // Under Replicated during AddEntry.
+ String FORCE_OP = "FORCE"; // Number of force ledger operations
String READ_OP = "READ_ENTRY";
// Corrupted entry (Digest Mismatch/ Under Replication) detected during ReadEntry
String READ_OP_DM = "READ_ENTRY_DM";
@@ -64,7 +65,9 @@ public interface BookKeeperClientStats {
String CHANNEL_ADD_OP = "ADD_ENTRY";
String CHANNEL_TIMEOUT_ADD = "TIMEOUT_ADD_ENTRY";
String CHANNEL_WRITE_LAC_OP = "WRITE_LAC";
+ String CHANNEL_FORCE_OP = "FORCE";
String CHANNEL_TIMEOUT_WRITE_LAC = "TIMEOUT_WRITE_LAC";
+ String CHANNEL_TIMEOUT_FORCE = "TIMEOUT_FORCE";
String CHANNEL_READ_LAC_OP = "READ_LAC";
String CHANNEL_TIMEOUT_READ_LAC = "TIMEOUT_READ_LAC";
String TIMEOUT_GET_BOOKIE_INFO = "TIMEOUT_GET_BOOKIE_INFO";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
index 2bd2a99..d53129d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
@@ -142,12 +142,12 @@ public interface DistributionSchedule {
WriteSet getWriteSet(long entryId);
/**
- * Return the set of bookies indices to send the messages to for longpoll reads.
+ * Return the set of bookies indices to send the messages to the whole ensemble.
*
- * @param entryId expected next entry id to read.
- * @return the set of bookies indices to read from.
+ * @param entryId entry id used to calculate the ensemble.
+ * @return the set of bookies indices to send the request.
*/
- WriteSet getWriteSetForLongPoll(long entryId);
+ WriteSet getEnsembleSet(long entryId);
/**
* An ack set represents the set of bookies from which
@@ -197,6 +197,11 @@ public interface DistributionSchedule {
*/
AckSet getAckSet();
+ /**
+ * Returns an ackset object useful to wait for all bookies in the ensemble,
+ * responses should be checked against this.
+ */
+ AckSet getEnsembleAckSet();
/**
* Interface to keep track of which bookies in an ensemble, an action
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
new file mode 100644
index 0000000..cd60848
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import static com.google.common.base.Preconditions.checkState;
+import java.util.ArrayList;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
+import org.apache.bookkeeper.util.SafeRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This represents a request to sync the ledger on every bookie.
+ */
+class ForceLedgerOp extends SafeRunnable implements ForceLedgerCallback {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ForceLedgerOp.class);
+ final CompletableFuture<Void> cb;
+
+ DistributionSchedule.AckSet ackSet;
+ boolean completed = false;
+ boolean errored = false;
+ int lastSeenError = BKException.Code.WriteException;
+ ArrayList<BookieSocketAddress> currentEnsemble;
+
+ long currentNonDurableLastAddConfirmed = LedgerHandle.INVALID_ENTRY_ID;
+
+ final LedgerHandle lh;
+
+ ForceLedgerOp(LedgerHandle lh, CompletableFuture<Void> cb) {
+ this.lh = lh;
+ this.cb = cb;
+ }
+
+ void sendForceLedgerRequest(int bookieIndex) {
+ lh.bk.getBookieClient().forceLedger(currentEnsemble.get(bookieIndex), lh.ledgerId, this, bookieIndex);
+ }
+
+ @Override
+ public void safeRun() {
+ initiate();
+ }
+
+ void initiate() {
+
+ // capture currentNonDurableLastAddConfirmed
+ // remember that we are inside OrderedExecutor, this induces a strict ordering
+ // on the sequence of events
+ this.currentNonDurableLastAddConfirmed = lh.pendingAddsSequenceHead;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("force {} clientNonDurableLac {}", lh.ledgerId, currentNonDurableLastAddConfirmed);
+ }
+ // we need to send the request to every bookie in the ensamble
+ this.currentEnsemble = lh.metadata.currentEnsemble;
+ this.ackSet = lh.distributionSchedule.getEnsembleAckSet();
+
+ DistributionSchedule.WriteSet writeSet = lh.getDistributionSchedule()
+ .getEnsembleSet(currentNonDurableLastAddConfirmed);
+ try {
+ for (int i = 0; i < writeSet.size(); i++) {
+ sendForceLedgerRequest(writeSet.get(i));
+ }
+ } finally {
+ writeSet.recycle();
+ }
+ }
+
+ @Override
+ public void forceLedgerComplete(int rc, long ledgerId, BookieSocketAddress addr, Object ctx) {
+ int bookieIndex = (Integer) ctx;
+
+ checkState(!completed, "We are waiting for all the bookies, it is not expected an early exit");
+
+ if (errored) {
+ // already failed, do not fire error callbacks twice
+ return;
+ }
+
+ if (BKException.Code.OK != rc) {
+ lastSeenError = rc;
+ }
+
+ if (rc == BKException.Code.OK) {
+ if (ackSet.completeBookieAndCheck(bookieIndex)) {
+ completed = true;
+ // we are able to say that every bookie sync'd its own journal
+ // for every ackknowledged entry before issuing the force() call
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("After force on ledger {} updating LastAddConfirmed to {} ",
+ ledgerId, currentNonDurableLastAddConfirmed);
+ }
+ lh.updateLastConfirmed(currentNonDurableLastAddConfirmed, lh.getLength());
+ FutureUtils.complete(cb, null);
+ }
+ } else {
+ // at least one bookie failed, as we are waiting for all the bookies
+ // we can fail immediately
+ LOG.info("ForceLedger did not succeed: Ledger {} on {}", ledgerId, addr);
+ errored = true;
+
+ // notify the failure
+ FutureUtils.completeExceptionally(cb, BKException.create(lastSeenError));
+ }
+
+ }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index a2a37e9..a79a01e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -1109,6 +1109,68 @@ public class LedgerHandle implements WriteHandle {
}
/**
+ * {@inheritDoc}
+ */
+ @Override
+ public CompletableFuture<Void> force() {
+ CompletableFuture<Void> result = new CompletableFuture<>();
+ ForceLedgerOp op = new ForceLedgerOp(this, result);
+ boolean wasClosed = false;
+ synchronized (this) {
+ // synchronized on this to ensure that
+ // the ledger isn't closed between checking and
+ // updating lastAddPushed
+ if (metadata.isClosed()) {
+ wasClosed = true;
+ }
+ }
+
+ if (wasClosed) {
+ // make sure the callback is triggered in main worker pool
+ try {
+ bk.getMainWorkerPool().executeOrdered(ledgerId, new SafeRunnable() {
+ @Override
+ public void safeRun() {
+ LOG.warn("Force() attempted on a closed ledger: {}", ledgerId);
+ result.completeExceptionally(new BKException.BKLedgerClosedException());
+ }
+
+ @Override
+ public String toString() {
+ return String.format("force(lid=%d)", ledgerId);
+ }
+ });
+ } catch (RejectedExecutionException e) {
+ result.completeExceptionally(new BKException.BKInterruptedException());
+ }
+ return result;
+ }
+
+ // early exit: no write has been issued yet
+ if (pendingAddsSequenceHead == INVALID_ENTRY_ID) {
+ bk.getMainWorkerPool().executeOrdered(ledgerId, new SafeRunnable() {
+ @Override
+ public void safeRun() {
+ FutureUtils.complete(result, null);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("force(lid=%d)", ledgerId);
+ }
+ });
+ return result;
+ }
+
+ try {
+ bk.getMainWorkerPool().executeOrdered(ledgerId, op);
+ } catch (RejectedExecutionException e) {
+ result.completeExceptionally(new BKException.BKInterruptedException());
+ }
+ return result;
+ }
+
+ /**
* Make a recovery add entry request. Recovery adds can add to a ledger even
* if it has been fenced.
*
@@ -1229,7 +1291,7 @@ public class LedgerHandle implements WriteHandle {
if (wasClosed) {
// make sure the callback is triggered in main worker pool
try {
- bk.getMainWorkerPool().submit(new SafeRunnable() {
+ bk.getMainWorkerPool().executeOrdered(ledgerId, new SafeRunnable() {
@Override
public void safeRun() {
LOG.warn("Attempt to add to closed ledger: {}", ledgerId);
@@ -1799,6 +1861,14 @@ public class LedgerHandle implements WriteHandle {
}
return;
}
+ if (writeFlags.contains(WriteFlag.DEFERRED_SYNC)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cannot perform ensemble change with writeflags {}."
+ + "Failed bookies {} for ledger {}.",
+ writeFlags, delayedWriteFailedBookies, ledgerId);
+ }
+ return;
+ }
synchronized (metadata) {
try {
EnsembleInfo ensembleInfo = replaceBookieInMetadata(delayedWriteFailedBookies, curNumEnsembleChanges);
@@ -1830,6 +1900,17 @@ public class LedgerHandle implements WriteHandle {
return;
}
+ if (writeFlags.contains(WriteFlag.DEFERRED_SYNC)) {
+ blockAddCompletions.decrementAndGet();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cannot perform ensemble change with write flags {}. "
+ + "Failed bookies {} for ledger {}.",
+ writeFlags, failedBookies, ledgerId);
+ }
+ handleUnrecoverableErrorDuringAdd(WriteException);
+ return;
+ }
+
int curNumEnsembleChanges = numEnsembleChanges.incrementAndGet();
// when the ensemble changes are too frequent, close handle
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index 0153d44..3c3a1dd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -284,7 +284,8 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
// Got an error after satisfying AQ. This means we are under replicated at the create itself.
// Update the stat to reflect it.
addOpUrCounter.inc();
- if (!lh.bk.getDisableEnsembleChangeFeature().isAvailable() && !lh.bk.delayEnsembleChange) {
+ if (!lh.bk.getDisableEnsembleChangeFeature().isAvailable()
+ && !lh.bk.delayEnsembleChange) {
lh.getDelayedWriteFailedBookies().putIfAbsent(bookieIndex, addr);
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index 1b3fa70..1436990 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -81,7 +81,7 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
ReadLACAndEntryRequest(ArrayList<BookieSocketAddress> ensemble, long lId, long eId) {
this.entryImpl = LedgerEntryImpl.create(lId, eId);
this.ensemble = ensemble;
- this.writeSet = lh.getDistributionSchedule().getWriteSetForLongPoll(eId);
+ this.writeSet = lh.getDistributionSchedule().getEnsembleSet(eId);
if (lh.getBk().shouldReorderReadSequence()) {
this.orderedEnsemble = lh.getBk().getPlacementPolicy().reorderReadLACSequence(ensemble,
lh.getBookiesHealthInfo(), writeSet.copy());
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
index e399b01..d079408 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
@@ -54,8 +54,8 @@ class RoundRobinDistributionSchedule implements DistributionSchedule {
}
@Override
- public WriteSet getWriteSetForLongPoll(long entryId) {
- // for long poll reads, we are trying all the bookies in the ensemble
+ public WriteSet getEnsembleSet(long entryId) {
+ // for long poll reads and force ledger , we are trying all the bookies in the ensemble
// so we create a `WriteSet` with `writeQuorumSize == ensembleSize`.
return WriteSetImpl.create(ensembleSize, ensembleSize /* writeQuorumSize */, entryId);
}
@@ -252,6 +252,11 @@ class RoundRobinDistributionSchedule implements DistributionSchedule {
return AckSetImpl.create(ensembleSize, writeQuorumSize, ackQuorumSize);
}
+ @Override
+ public AckSet getEnsembleAckSet() {
+ return AckSetImpl.create(ensembleSize, ensembleSize, ensembleSize);
+ }
+
private static class AckSetImpl implements AckSet {
private int writeQuorumSize;
private int ackQuorumSize;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java
index b81f33d..09bb8f3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java
@@ -97,7 +97,7 @@ public class BKException extends Exception {
case Code.QuorumException:
return "Invalid quorum size on ensemble size";
case Code.NoBookieAvailableException:
- return "Invalid quorum size on ensemble size";
+ return "No bookie available";
case Code.DigestNotInitializedException:
return "Digest engine not initialized";
case Code.DigestMatchException:
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ForceableHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ForceableHandle.java
new file mode 100644
index 0000000..48d7acb
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ForceableHandle.java
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
+
+/**
+ * Provide the ability to enforce durability guarantees to the writer.
+ *
+ * @see WriteAdvHandle
+ * @see WriteHandle
+ *
+ * @since 4.8
+ */
+@Public
+@Unstable
+public interface ForceableHandle {
+
+ /**
+ * Enforce durability to the entries written by this handle.
+ * <p>This API is useful with {@link WriteFlag#DEFERRED_SYNC}, because with
+ * that flag writes are acknowledged by the bookie without waiting for a
+ * durable write
+ * </p>
+ *
+ * @return an handle to the result
+ */
+ CompletableFuture<Void> force();
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteAdvHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteAdvHandle.java
index 37f45b9..c24c6d0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteAdvHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteAdvHandle.java
@@ -39,7 +39,7 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
*/
@Public
@Unstable
-public interface WriteAdvHandle extends ReadHandle {
+public interface WriteAdvHandle extends ReadHandle, ForceableHandle {
/**
* Add entry asynchronously to an open ledger.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteFlag.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteFlag.java
index 30199c2..6914abe 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteFlag.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteFlag.java
@@ -32,6 +32,8 @@ public enum WriteFlag {
/**
* Writes will be acknowledged after writing to the filesystem
* but not yet been persisted to disks.
+ *
+ * @see ForceableHandle#force()
*/
DEFERRED_SYNC(0x1 << 0);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteHandle.java
index b2c0459..edad5f4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteHandle.java
@@ -38,7 +38,7 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
*/
@Public
@Unstable
-public interface WriteHandle extends ReadHandle {
+public interface WriteHandle extends ReadHandle, ForceableHandle {
/**
* Add entry asynchronously to an open ledger.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 3dd837c..197483d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -55,6 +55,7 @@ import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
@@ -193,6 +194,30 @@ public class BookieClient implements PerChannelBookieClientFactory {
return clientPool;
}
+ public void forceLedger(final BookieSocketAddress addr, final long ledgerId,
+ final ForceLedgerCallback cb, final Object ctx) {
+ final PerChannelBookieClientPool client = lookupClient(addr);
+ if (client == null) {
+ cb.forceLedgerComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
+ ledgerId, addr, ctx);
+ return;
+ }
+
+ client.obtain((rc, pcbc) -> {
+ if (rc != BKException.Code.OK) {
+ try {
+ executor.executeOrdered(ledgerId, safeRun(() -> {
+ cb.forceLedgerComplete(rc, ledgerId, addr, ctx);
+ }));
+ } catch (RejectedExecutionException re) {
+ cb.forceLedgerComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx);
+ }
+ } else {
+ pcbc.forceLedger(ledgerId, cb, ctx);
+ }
+ }, ledgerId);
+ }
+
public void writeLac(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey,
final long lac, final ByteBufList toSend, final WriteLacCallback cb, final Object ctx) {
final PerChannelBookieClientPool client = lookupClient(addr);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
index 04a4546..cd87ff1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
@@ -91,6 +91,13 @@ public class BookkeeperInternalCallbacks {
}
/**
+ * Force callback interface.
+ */
+ public interface ForceLedgerCallback {
+ void forceLedgerComplete(int rc, long ledgerId, BookieSocketAddress addr, Object ctx);
+ }
+
+ /**
* A callback interface for a STARTTLS command.
*/
public interface StartTLSCallback {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 39436d4..51c875b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -92,6 +92,7 @@ import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
@@ -103,6 +104,8 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerResponse;
import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoRequest;
import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoResponse;
import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
@@ -174,9 +177,11 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
private final OpStatsLogger readTimeoutOpLogger;
private final OpStatsLogger addEntryOpLogger;
private final OpStatsLogger writeLacOpLogger;
+ private final OpStatsLogger forceLedgerOpLogger;
private final OpStatsLogger readLacOpLogger;
private final OpStatsLogger addTimeoutOpLogger;
private final OpStatsLogger writeLacTimeoutOpLogger;
+ private final OpStatsLogger forceLedgerTimeoutOpLogger;
private final OpStatsLogger readLacTimeoutOpLogger;
private final OpStatsLogger getBookieInfoOpLogger;
private final OpStatsLogger getBookieInfoTimeoutOpLogger;
@@ -281,11 +286,13 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
readEntryOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_READ_OP);
addEntryOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_ADD_OP);
writeLacOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_WRITE_LAC_OP);
+ forceLedgerOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_FORCE_OP);
readLacOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_READ_LAC_OP);
getBookieInfoOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.GET_BOOKIE_INFO_OP);
readTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ);
addTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_ADD);
writeLacTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_WRITE_LAC);
+ forceLedgerTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_FORCE);
readLacTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ_LAC);
getBookieInfoTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.TIMEOUT_GET_BOOKIE_INFO);
startTLSOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_START_TLS_OP);
@@ -566,6 +573,37 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
writeAndFlush(channel, completionKey, writeLacRequest);
}
+ void forceLedger(final long ledgerId, ForceLedgerCallback cb, Object ctx) {
+ if (useV2WireProtocol) {
+ LOG.error("force is not allowed with v2 protocol");
+ executor.executeOrdered(ledgerId, () -> {
+ cb.forceLedgerComplete(BKException.Code.IllegalOpException, ledgerId, addr, ctx);
+ });
+ return;
+ }
+ final long txnId = getTxnId();
+ final CompletionKey completionKey = new V3CompletionKey(txnId,
+ OperationType.FORCE_LEDGER);
+ // force is mostly like addEntry hence uses addEntryTimeout
+ completionObjects.put(completionKey,
+ new ForceLedgerCompletion(completionKey, cb,
+ ctx, ledgerId));
+
+ // Build the request
+ BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+ .setVersion(ProtocolVersion.VERSION_THREE)
+ .setOperation(OperationType.FORCE_LEDGER)
+ .setTxnId(txnId);
+ ForceLedgerRequest.Builder writeLacBuilder = ForceLedgerRequest.newBuilder()
+ .setLedgerId(ledgerId);
+
+ final Request forceLedgerRequest = Request.newBuilder()
+ .setHeader(headerBuilder)
+ .setForceLedgerRequest(writeLacBuilder)
+ .build();
+ writeAndFlush(channel, completionKey, forceLedgerRequest);
+ }
+
/**
* This method should be called only after connection has been checked for
* {@link #connectIfNeededAndDoOp(GenericCallback)}.
@@ -1553,6 +1591,55 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
}
}
+ /**
+ * Pending-operation record for one FORCE_LEDGER request sent to this bookie.
+ *
+ * <p>The caller's callback is wrapped so that each completion first calls
+ * {@code logOpResult(rc)}. It then forwards the result with the caller's original context
+ * and finally releases the completion key.
+ */
+ class ForceLedgerCompletion extends CompletionValue {
+ final ForceLedgerCallback cb;
+
+ /**
+ * @param key completion key of the request; released after the caller has been notified
+ * @param originalCallback caller's callback
+ * @param originalCtx caller's context, passed back to {@code originalCallback}
+ * @param ledgerId ledger the request was sent for
+ */
+ public ForceLedgerCompletion(final CompletionKey key,
+ final ForceLedgerCallback originalCallback,
+ final Object originalCtx,
+ final long ledgerId) {
+ // NOTE(review): BookieProtocol.LAST_ADD_CONFIRMED appears to fill the entry-id slot of
+ // CompletionValue, since FORCE_LEDGER targets a whole ledger rather than an entry; confirm.
+ super("ForceLedger",
+ originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
+ forceLedgerOpLogger, forceLedgerTimeoutOpLogger);
+ this.cb = new ForceLedgerCallback() {
+ @Override
+ public void forceLedgerComplete(int rc, long ledgerId,
+ BookieSocketAddress addr,
+ Object ctx) {
+ logOpResult(rc);
+ // the ctx received here is not forwarded; the caller always gets originalCtx back
+ originalCallback.forceLedgerComplete(rc, ledgerId,
+ addr, originalCtx);
+ // released only after the caller's callback has returned
+ key.release();
+ }
+ };
+ }
+
+ /** Fails this request with {@code BookieHandleNotAvailableException}. */
+ @Override
+ public void errorOut() {
+ errorOut(BKException.Code.BookieHandleNotAvailableException);
+ }
+
+ /**
+ * Fails this request with {@code rc}; the callback invocation is handed to
+ * {@code errorOutAndRunCallback} (its scheduling semantics are defined elsewhere).
+ */
+ @Override
+ public void errorOut(final int rc) {
+ errorOutAndRunCallback(
+ () -> cb.forceLedgerComplete(rc, ledgerId, addr, ctx));
+ }
+
+ /**
+ * Completes the request from a v3 response. A non-EOK top-level status takes precedence;
+ * only when it is EOK is the status inside {@code ForceLedgerResponse} used. The ledger id
+ * reported to the callback is the one echoed in the response.
+ */
+ @Override
+ public void handleV3Response(BookkeeperProtocol.Response response) {
+ ForceLedgerResponse forceLedgerResponse = response.getForceLedgerResponse();
+ StatusCode status = response.getStatus() == StatusCode.EOK
+ ? forceLedgerResponse.getStatus() : response.getStatus();
+ // this local shadows the ledgerId held by the completion
+ long ledgerId = forceLedgerResponse.getLedgerId();
+
+ if (LOG.isDebugEnabled()) {
+ logResponse(status, "ledger", ledgerId);
+ }
+ // WriteException is passed as the fallback code (presumably for statuses with no
+ // dedicated mapping in convertStatus)
+ int rc = convertStatus(status, BKException.Code.WriteException);
+ cb.forceLedgerComplete(rc, ledgerId, addr, ctx);
+ }
+ }
+
// visible for testing
class ReadLacCompletion extends CompletionValue {
final ReadLacCallback cb;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieDeferredSyncTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieDeferredSyncTest.java
new file mode 100644
index 0000000..cce445e
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieDeferredSyncTest.java
@@ -0,0 +1,191 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.junit.Assert.assertEquals;
+
+import java.util.EnumSet;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.client.api.WriteHandle;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+
+/**
+ * Test the bookie journal without sync, driven by client with
+ * {@link WriteFlag#DEFERRED_SYNC} write flag.
+ */
+public class BookieDeferredSyncTest extends BookKeeperClusterTestCase {
+
+ public BookieDeferredSyncTest() {
+ super(1);
+ }
+
+ @Test
+ public void testWriteAndRecovery() throws Exception {
+ // this WriteHandle will not be closed
+ WriteHandle lh = result(bkc.newCreateLedgerOp()
+ .withEnsembleSize(1)
+ .withWriteQuorumSize(1)
+ .withAckQuorumSize(1)
+ .withWriteFlags(WriteFlag.DEFERRED_SYNC)
+ .withDigestType(DigestType.CRC32C)
+ .withPassword(new byte[0])
+ .execute());
+
+ int n = 10;
+
+ long ledgerId = lh.getId();
+
+ for (int i = 0; i < n; i++) {
+ lh.append(("entry-" + i).getBytes(UTF_8));
+ }
+
+ try (ReadHandle readLh = result(bkc.newOpenLedgerOp()
+ .withLedgerId(ledgerId)
+ .withRecovery(true)
+ .withPassword(new byte[0])
+ .execute());) {
+
+ try (LedgerEntries entries = readLh.read(0, n - 1)) {
+ for (int i = 0; i < n; i++) {
+ org.apache.bookkeeper.client.api.LedgerEntry entry = entries.getEntry(i);
+ assertEquals("entry-" + i, new String(entry.getEntryBytes()));
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testCloseNoForce() throws Exception {
+ testClose(true);
+ }
+
+ @Test
+ public void testCloseWithForce() throws Exception {
+ testClose(false);
+ }
+
+ private void testClose(boolean force) throws Exception {
+ final int n = 10;
+ long ledgerId;
+ try (WriteHandle lh = result(bkc.newCreateLedgerOp()
+ .withEnsembleSize(1)
+ .withWriteQuorumSize(1)
+ .withAckQuorumSize(1)
+ .withWriteFlags(WriteFlag.DEFERRED_SYNC)
+ .withDigestType(DigestType.CRC32C)
+ .withPassword(new byte[0])
+ .execute())) {
+
+ ledgerId = lh.getId();
+ for (int i = 0; i < n; i++) {
+ lh.append(("entry-" + i).getBytes(UTF_8));
+ } if (force) {
+ // with force() LastAddConfirmed is updated
+ result(lh.force());
+ // on close metadata will have LastAddConfirmed = n - 1
+ assertEquals(n - 1, lh.getLastAddConfirmed());
+ } else {
+ // on close metadata will have LastAddConfirmed = -1
+ assertEquals(-1, lh.getLastAddConfirmed());
+ }
+ }
+
+ if (force) {
+ // the reader will be able to read
+ try (ReadHandle readLh = result(bkc.newOpenLedgerOp()
+ .withLedgerId(ledgerId)
+ .withRecovery(true)
+ .withPassword(new byte[0])
+ .execute());) {
+
+ try (LedgerEntries entries = readLh.read(0, n - 1)) {
+ for (int i = 0; i < n; i++) {
+ LedgerEntry entry = entries.getEntry(i);
+ assertEquals("entry-" + i, new String(entry.getEntryBytes()));
+ }
+ }
+
+ try (LedgerEntries entries = readLh.readUnconfirmed(0, n - 1)) {
+ for (int i = 0; i < n; i++) {
+ LedgerEntry entry = entries.getEntry(i);
+ assertEquals("entry-" + i, new String(entry.getEntryBytes()));
+ }
+ }
+ }
+ } else {
+ // reader will see LastAddConfirmed = -1
+ try (ReadHandle readLh = result(bkc.newOpenLedgerOp()
+ .withLedgerId(ledgerId)
+ .withRecovery(true)
+ .withPassword(new byte[0])
+ .execute());) {
+ assertEquals(-1, readLh.getLastAddConfirmed());
+
+ // entry will be readable with readUnconfirmed
+ try (LedgerEntries entries = readLh.readUnconfirmed(0, n - 1)) {
+ for (int i = 0; i < n; i++) {
+ LedgerEntry entry = entries.getEntry(i);
+ assertEquals("entry-" + i, new String(entry.getEntryBytes()));
+ }
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testForceWithDeferredSyncWriteFlags() throws Exception {
+ testForce(EnumSet.of(WriteFlag.DEFERRED_SYNC));
+ }
+
+ @Test
+ public void testForceNoWriteFlag() throws Exception {
+ // force API will work even without DEFERRED_SYNC flag
+ testForce(WriteFlag.NONE);
+ }
+
+ private void testForce(EnumSet<WriteFlag> writeFlags) throws Exception {
+ try (WriteHandle lh = result(bkc.newCreateLedgerOp()
+ .withEnsembleSize(1)
+ .withWriteQuorumSize(1)
+ .withAckQuorumSize(1)
+ .withWriteFlags(writeFlags)
+ .withDigestType(DigestType.CRC32C)
+ .withPassword(new byte[0])
+ .execute());) {
+ int n = 10;
+ for (int i = 0; i < n; i++) {
+ lh.append(("entry-" + i).getBytes(UTF_8));
+ }
+ result(lh.force());
+ assertEquals(n - 1, lh.getLastAddConfirmed());
+
+ lh.close();
+ }
+ }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
index f1521f4..1a22e61 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
@@ -801,4 +801,21 @@ public class BookKeeperTest extends BookKeeperClusterTestCase {
}
}
+ @Test(expected = BKIllegalOpException.class)
+ public void testCannotUseForceOnV2Protocol() throws Exception {
+ ClientConfiguration conf = new ClientConfiguration(baseClientConf);
+ conf.setUseV2WireProtocol(true);
+ try (BookKeeperTestClient bkc = new BookKeeperTestClient(conf);) {
+ try (WriteHandle wh = result(bkc.newCreateLedgerOp()
+ .withEnsembleSize(3)
+ .withWriteQuorumSize(3)
+ .withAckQuorumSize(2)
+ .withPassword("".getBytes())
+ .withWriteFlags(WriteFlag.NONE)
+ .execute())) {
+ result(wh.appendAsync("".getBytes()));
+ result(wh.force());
+ }
+ }
+ }
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java
index 4bd5a8c..aaa7645 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java
@@ -19,11 +19,16 @@ package org.apache.bookkeeper.client;
import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.client.api.WriteAdvHandle;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.client.api.WriteHandle;
+import org.apache.bookkeeper.net.BookieSocketAddress;
import org.junit.Test;
/**
@@ -49,10 +54,180 @@ public class DeferredSyncTest extends MockBookKeeperTestCase {
}
long lastEntryID = result(wh.appendAsync(DATA));
assertEquals(NUM_ENTRIES - 1, lastEntryID);
+ assertEquals(NUM_ENTRIES - 1, wh.getLastAddPushed());
+ assertEquals(-1, wh.getLastAddConfirmed());
+ }
+ }
+
+    /**
+     * With DEFERRED_SYNC, LastAddConfirmed stays at -1 while entries are appended and only
+     * advances to the last pushed entry once force() completes.
+     */
+    @Test
+    public void testAddEntryLastAddConfirmedAdvanceWithForce() throws Exception {
+        try (WriteHandle handle = result(newCreateLedgerOp()
+                .withEnsembleSize(3)
+                .withWriteQuorumSize(3)
+                .withAckQuorumSize(2)
+                .withPassword(PASSWORD)
+                .withWriteFlags(WriteFlag.DEFERRED_SYNC)
+                .execute())) {
+            int remaining = NUM_ENTRIES - 1;
+            while (remaining-- > 0) {
+                result(handle.appendAsync(DATA));
+            }
+            long lastEntryId = result(handle.appendAsync(DATA));
+            assertEquals(NUM_ENTRIES - 1, lastEntryId);
+            assertEquals(NUM_ENTRIES - 1, handle.getLastAddPushed());
+            // nothing is confirmed before force()
+            assertEquals(-1, handle.getLastAddConfirmed());
+
+            result(handle.force());
+            assertEquals(NUM_ENTRIES - 1, handle.getLastAddConfirmed());
+        }
+    }
+
+ /**
+ * force() on a WriteAdvHandle written out of order: LastAddConfirmed only advances over the
+ * contiguous prefix of completed entries, never past a gap in entry ids.
+ */
+ @Test
+ public void testForceOnWriteAdvHandle() throws Exception {
+ try (WriteAdvHandle wh = result(newCreateLedgerOp()
+ .withEnsembleSize(3)
+ .withWriteQuorumSize(3)
+ .withAckQuorumSize(2)
+ .withPassword(PASSWORD)
+ .withWriteFlags(WriteFlag.DEFERRED_SYNC)
+ .makeAdv()
+ .execute())) {
+ // entry 1 is intentionally left out for now
+ CompletableFuture<Long> w0 = wh.writeAsync(0, DATA);
+ CompletableFuture<Long> w2 = wh.writeAsync(2, DATA);
+ CompletableFuture<Long> w3 = wh.writeAsync(3, DATA);
+ result(w0);
+ // with entry 1 missing, force() can only confirm entry 0
+ result(wh.force());
+ assertEquals(0, wh.getLastAddConfirmed());
+ // filling the gap lets w3 complete; by then w1 and w2 must be done as well
+ CompletableFuture<Long> w1 = wh.writeAsync(1, DATA);
+ result(w3);
+ assertTrue(w1.isDone());
+ assertTrue(w2.isDone());
+ // entry 4 is left out, so force() stops at 3 even though 5 was written
+ CompletableFuture<Long> w5 = wh.writeAsync(5, DATA);
+ result(wh.force());
+ assertEquals(3, wh.getLastAddConfirmed());
+ // fill the gap at 4; once w5 completes, force() confirms up to 5
+ wh.writeAsync(4, DATA);
+ result(w5);
+ result(wh.force());
+ assertEquals(5, wh.getLastAddConfirmed());
+ }
+ }
+
+    /**
+     * force() needs an acknowledgement from every bookie of the ensemble. With one bookie down
+     * it must fail, even though plain writes still go through; once the bookie is back it must
+     * succeed.
+     */
+    @Test
+    public void testForceRequiresFullEnsemble() throws Exception {
+        try (WriteHandle wh = result(newCreateLedgerOp()
+                .withEnsembleSize(3)
+                .withWriteQuorumSize(2)
+                .withAckQuorumSize(2)
+                .withPassword(PASSWORD)
+                .withWriteFlags(WriteFlag.DEFERRED_SYNC)
+                .execute())) {
+            for (int i = 0; i < NUM_ENTRIES - 1; i++) {
+                result(wh.appendAsync(DATA));
+            }
+            long lastEntryID = result(wh.appendAsync(DATA));
+            assertEquals(NUM_ENTRIES - 1, lastEntryID);
+            assertEquals(NUM_ENTRIES - 1, wh.getLastAddPushed());
+            assertEquals(-1, wh.getLastAddConfirmed());
+
+            BookieSocketAddress bookieAddress = wh.getLedgerMetadata().getEnsembleAt(wh.getLastAddPushed()).get(0);
+            killBookie(bookieAddress);
+
+            // write should succeed (we still have 2 bookies out of 3)
+            result(wh.appendAsync(DATA));
+
+            // force cannot go, it must be acknowledged by all of the bookies in the ensemble
+            try {
+                result(wh.force());
+                fail("force() must fail while a bookie of the ensemble is down");
+            } catch (BKException.BKBookieException expected) {
+                // expected: the killed bookie cannot acknowledge the forceLedger request
+            }
+            // bookie comes up again, force must succeed
+            startKilledBookie(bookieAddress);
+            result(wh.force());
+        }
+    }
+
+ /**
+ * A force() only confirms entries that were acknowledged before it started: an entry appended
+ * while the force is still pending needs a second force() to be confirmed.
+ */
+ @Test
+ public void testForceWillAdvanceLacOnlyUpToLastAcknoledgedWrite() throws Exception {
+ try (WriteHandle wh = result(newCreateLedgerOp()
+ .withEnsembleSize(3)
+ .withWriteQuorumSize(3)
+ .withAckQuorumSize(3)
+ .withPassword(PASSWORD)
+ .withWriteFlags(WriteFlag.DEFERRED_SYNC)
+ .execute())) {
+ for (int i = 0; i < NUM_ENTRIES - 1; i++) {
+ result(wh.appendAsync(DATA));
+ }
+ long lastEntryIdBeforeSuspend = result(wh.appendAsync(DATA));
+ assertEquals(NUM_ENTRIES - 1, lastEntryIdBeforeSuspend);
+ assertEquals(-1, wh.getLastAddConfirmed());
+
+ // one bookie will hold back its forceLedger acks (add-entry acks are not affected)
+ BookieSocketAddress bookieAddress = wh.getLedgerMetadata().getEnsembleAt(wh.getLastAddPushed()).get(0);
+ suspendBookieForceLedgerAcks(bookieAddress);
+
+ // start a force(); it cannot complete while that bookie withholds its ack,
+ // so LastAddConfirmed is still -1
+ CompletableFuture<?> forceResult = wh.force();
+ assertEquals(-1, wh.getLastAddConfirmed());
+
+ // append one more entry while the force is pending; the append itself is acknowledged
+ long lastEntry = wh.append(DATA);
+
+ // release the deferred forceLedger ack (resumeBookieWriteAcks only affects
+ // force-ledger acks) and let the pending force complete
+ resumeBookieWriteAcks(bookieAddress);
+ result(forceResult);
+
+ // the first force only covers entries acknowledged before it started,
+ // so LastAddConfirmed stops at lastEntryIdBeforeSuspend rather than lastEntry
+ assertEquals(lastEntryIdBeforeSuspend, wh.getLastAddConfirmed());
+
+ // a second force covers the entry appended in the meantime
+ result(wh.force());
+ assertEquals(lastEntry, wh.getLastAddConfirmed());
+ }
+ }
+
+ /**
+ * DEFERRED_SYNC writers must not change ensemble: when the only bookie dies, a later append
+ * fails with BKWriteException instead of switching to a freshly started bookie.
+ */
+ @Test
+ public void testForbiddenEnsembleChange() throws Exception {
+ try (WriteHandle wh = result(newCreateLedgerOp()
+ .withEnsembleSize(1)
+ .withWriteQuorumSize(1)
+ .withAckQuorumSize(1)
+ .withPassword(PASSWORD)
+ .withWriteFlags(WriteFlag.DEFERRED_SYNC)
+ .execute())) {
+ for (int i = 0; i < NUM_ENTRIES - 1; i++) {
+ wh.append(DATA);
+ }
+
+ assertEquals(1, availableBookies.size());
+ // kill the only bookie in the ensemble
+ killBookie(wh.getLedgerMetadata().getEnsembleAt(wh.getLastAddPushed()).get(0));
+ assertEquals(0, availableBookies.size());
+ // a replacement bookie is available, but the handle must not switch to it
+ startNewBookie();
+ assertEquals(1, availableBookies.size());
+
+ try {
+ // we cannot switch to the new bookie with DEFERRED_SYNC
+ wh.append(DATA);
+ fail("since ensemble change is disable we cannot be able to write any more");
+ } catch (BKException.BKWriteException ex) {
+ // expected
+ }
LedgerHandle lh = (LedgerHandle) wh;
- assertEquals(NUM_ENTRIES - 1, lh.getLastAddPushed());
- assertEquals(-1, lh.getLastAddConfirmed());
+ // the failed bookie must not have been recorded as a delayed write failure
+ // (presumably because no ensemble change is ever scheduled for DEFERRED_SYNC handles)
+ assertTrue(lh.getDelayedWriteFailedBookies().isEmpty());
}
}
-}
\ No newline at end of file
+ @Test(expected = BKException.BKLedgerClosedException.class)
+ public void testCannotIssueForceOnClosedLedgerHandle() throws Exception {
+ WriteHandle wh = result(newCreateLedgerOp()
+ .withEnsembleSize(1)
+ .withWriteQuorumSize(1)
+ .withAckQuorumSize(1)
+ .withPassword(PASSWORD)
+ .withWriteFlags(WriteFlag.DEFERRED_SYNC)
+ .execute());
+ wh.close();
+ result(wh.force());
+ }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ExplicitLacTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ExplicitLacTest.java
index fd661c2..256f051 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ExplicitLacTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ExplicitLacTest.java
@@ -29,6 +29,7 @@ import org.apache.bookkeeper.bookie.LedgerStorage;
import org.apache.bookkeeper.bookie.SortedLedgerStorage;
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.TestUtils;
@@ -190,5 +191,85 @@ public class ExplicitLacTest extends BookKeeperClusterTestCase {
bkcWithExplicitLAC.close();
}
+ /**
+ * Variant of testReadHandleWithExplicitLAC for a DEFERRED_SYNC writer. The writer's
+ * LastAddConfirmed only advances through force(). It then reaches the reader either
+ * piggy-backed on a later addEntry or through the explicit LAC timer (interval 1000 ms).
+ */
+ @Test
+ public void testReadHandleWithExplicitLACAndDeferredSync() throws Exception {
+ ClientConfiguration confWithExplicitLAC = new ClientConfiguration();
+ confWithExplicitLAC.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+ int explicitLacIntervalMillis = 1000;
+ confWithExplicitLAC.setExplictLacInterval(explicitLacIntervalMillis);
+
+ BookKeeper bkcWithExplicitLAC = new BookKeeper(confWithExplicitLAC);
+
+ LedgerHandle wlh = (LedgerHandle) bkcWithExplicitLAC.newCreateLedgerOp()
+ .withEnsembleSize(1)
+ .withWriteQuorumSize(1)
+ .withAckQuorumSize(1)
+ .withWriteFlags(WriteFlag.DEFERRED_SYNC)
+ .withDigestType(digestType.toApiDigestType())
+ .withPassword("testPasswd".getBytes())
+ .execute()
+ .get();
+ long ledgerId = wlh.getId();
+
+ // start like testReadHandleWithExplicitLAC
+ int numOfEntries = 5;
+ for (int i = 0; i < numOfEntries; i++) {
+ // force() confirms the entries written so far, so the following addEntry()
+ // piggy-backs that LAC as usual
+ wlh.force().get();
+ wlh.addEntry(("foobar" + i).getBytes());
+ }
+
+ LedgerHandle rlh = bkcWithExplicitLAC.openLedgerNoRecovery(ledgerId, digestType, "testPasswd".getBytes());
+
+ // the last force() ran before entry numOfEntries - 1 was added, so the LAC piggy-backed
+ // on that entry, and (presumably) the one the reader picks up, is numOfEntries - 2
+ assertTrue(
+ "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
+ (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
+
+ // written without force(), so these adds do not advance the LAC
+ for (int i = numOfEntries; i < 2 * numOfEntries; i++) {
+ wlh.addEntry(("foobar" + i).getBytes());
+ }
+
+ // running a force() will update local LAC on the writer
+ // ExplicitLAC timer will send the value even without writes
+ wlh.force().get();
+
+ // wait for explicit lac to be sent to bookies
+ TestUtils.waitUntilExplicitLacUpdated(rlh, 2 * numOfEntries - 2);
+
+ // we need to wait for atleast 2 explicitlacintervals,
+ // since in writehandle for the first call
+ // lh.getExplicitLastAddConfirmed() will be <
+ // lh.getPiggyBackedLastAddConfirmed(),
+ // so it wont make explicit writelac in the first run
+ // NOTE(review): this explanation was carried over from the non-DEFERRED_SYNC variant;
+ // confirm it still applies when the LAC only moves through force().
+ TestUtils.waitUntilLacUpdated(rlh, 2 * numOfEntries - 2);
+
+ assertTrue(
+ "Expected LAC of wlh: " + (2 * numOfEntries - 1) + " actual LAC of wlh: " + wlh.getLastAddConfirmed(),
+ (wlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
+
+ long explicitlac = TestUtils.waitUntilExplicitLacUpdated(rlh, 2 * numOfEntries - 1);
+ assertTrue("Expected Explicit LAC of rlh: " + (2 * numOfEntries - 1)
+ + " actual ExplicitLAC of rlh: " + explicitlac,
+ (explicitlac == (2 * numOfEntries - 1)));
+ // reading the explicit LAC (presumably done inside waitUntilExplicitLacUpdated) also
+ // updates the LAC of rlh
+ assertTrue(
+ "Expected LAC of rlh: " + (2 * numOfEntries - 1) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
+ (rlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
+
+ Enumeration<LedgerEntry> entries = rlh.readEntries(numOfEntries, 2 * numOfEntries - 1);
+ int entryId = numOfEntries;
+ while (entries.hasMoreElements()) {
+ LedgerEntry entry = entries.nextElement();
+ String entryString = new String(entry.getEntry());
+ assertTrue("Expected entry String: " + ("foobar" + entryId) + " actual entry String: " + entryString,
+ entryString.equals("foobar" + entryId));
+ entryId++;
+ }
+
+ // NOTE(review): these are not closed if an assertion above fails; consider try/finally
+ rlh.close();
+ wlh.close();
+ bkcWithExplicitLAC.close();
+ }
+
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index b853614..6e1557a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -17,6 +17,7 @@
*/
package org.apache.bookkeeper.client;
+import static com.google.common.base.Preconditions.checkState;
import static org.apache.bookkeeper.client.api.BKException.Code.NoBookieAvailableException;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -36,6 +37,7 @@ import io.netty.buffer.Unpooled;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
@@ -44,7 +46,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -95,6 +97,9 @@ public abstract class MockBookKeeperTestCase {
protected ConcurrentSkipListSet<Long> fencedLedgers;
protected ConcurrentMap<Long, Map<BookieSocketAddress, Map<Long, MockEntry>>> mockLedgerData;
+ private Map<BookieSocketAddress, List<Runnable>> deferredBookieForceLedgerResponses;
+ private Set<BookieSocketAddress> suspendedBookiesForForceLedgerAcks;
+
List<BookieSocketAddress> failedBookies;
Set<BookieSocketAddress> availableBookies;
private int lastIndexForBK;
@@ -129,6 +134,8 @@ public abstract class MockBookKeeperTestCase {
@Before
public void setup() throws Exception {
+ deferredBookieForceLedgerResponses = new ConcurrentHashMap<>();
+ suspendedBookiesForForceLedgerAcks = Collections.synchronizedSet(new HashSet<>());
mockLedgerMetadataRegistry = new ConcurrentHashMap<>();
mockLedgerData = new ConcurrentHashMap<>();
mockNextLedgerId = new AtomicLong(1);
@@ -174,6 +181,7 @@ public abstract class MockBookKeeperTestCase {
setupBookieWatcherForEnsembleChange();
setupBookieClientReadEntry();
setupBookieClientAddEntry();
+ setupBookieClientForceLedger();
}
protected void mockBookKeeperGetConf(ClientConfiguration conf) {
@@ -235,6 +243,25 @@ public abstract class MockBookKeeperTestCase {
availableBookies.remove(killedBookieSocketAddress);
}
+    /**
+     * Brings a previously killed bookie back: it is removed from {@code failedBookies} and added
+     * back to {@code availableBookies}.
+     *
+     * @throws IllegalStateException if the bookie is not currently failed, or is already available
+     */
+    protected void startKilledBookie(BookieSocketAddress killedBookieSocketAddress) {
+        boolean isFailed = failedBookies.contains(killedBookieSocketAddress);
+        checkState(isFailed);
+        boolean isAvailable = availableBookies.contains(killedBookieSocketAddress);
+        checkState(!isAvailable);
+        failedBookies.remove(killedBookieSocketAddress);
+        availableBookies.add(killedBookieSocketAddress);
+    }
+
+    /**
+     * Makes the mocked bookie client hold back forceLedger completions for {@code address}
+     * until {@link #resumeBookieWriteAcks} is called for the same bookie.
+     */
+    protected void suspendBookieForceLedgerAcks(BookieSocketAddress address) {
+        Set<BookieSocketAddress> suspended = suspendedBookiesForForceLedgerAcks;
+        suspended.add(address);
+    }
+
+    /**
+     * Stops holding back forceLedger completions for {@code address}, then runs every
+     * completion deferred while it was suspended, in the order they were queued.
+     *
+     * <p>Despite its name, this only concerns forceLedger acknowledgements; add-entry
+     * acknowledgements are never deferred by this mock.
+     */
+    protected void resumeBookieWriteAcks(BookieSocketAddress address) {
+        suspendedBookiesForForceLedgerAcks.remove(address);
+        List<Runnable> deferred = deferredBookieForceLedgerResponses.remove(address);
+        if (deferred == null) {
+            return;
+        }
+        for (Runnable response : deferred) {
+            response.run();
+        }
+    }
+
protected BookieSocketAddress startNewBookie() {
BookieSocketAddress address = generateBookieSocketAddress(lastIndexForBK++);
availableBookies.add(address);
@@ -287,13 +314,6 @@ public abstract class MockBookKeeperTestCase {
}
});
}
- private void submit(Runnable operation) {
- try {
- scheduler.submit(operation);
- } catch (RejectedExecutionException rejected) {
- operation.run();
- }
- }
protected void registerMockEntryForRead(long ledgerId, long entryId, BookieSocketAddress bookieSocketAddress,
byte[] entryData, long lastAddConfirmed) {
@@ -496,7 +516,8 @@ public abstract class MockBookKeeperTestCase {
try {
entry = extractEntryPayload(ledgerId, entryId, toSend);
} catch (BKDigestMatchException e) {
- callback.writeComplete(Code.DigestMatchException, ledgerId, entryId, bookieSocketAddress, ctx);
+ callback.writeComplete(Code.DigestMatchException,
+ ledgerId, entryId, bookieSocketAddress, ctx);
return;
}
boolean fenced = fencedLedgers.contains(ledgerId);
@@ -505,17 +526,20 @@ public abstract class MockBookKeeperTestCase {
ledgerId, entryId, bookieSocketAddress, ctx);
} else {
if (failedBookies.contains(bookieSocketAddress)) {
- callback.writeComplete(NoBookieAvailableException, ledgerId, entryId, bookieSocketAddress, ctx);
+ callback.writeComplete(NoBookieAvailableException,
+ ledgerId, entryId, bookieSocketAddress, ctx);
return;
}
if (getMockLedgerContentsInBookie(ledgerId, bookieSocketAddress).isEmpty()) {
- registerMockEntryForRead(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, bookieSocketAddress,
- new byte[0], BookieProtocol.INVALID_ENTRY_ID);
+ registerMockEntryForRead(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
+ bookieSocketAddress, new byte[0], BookieProtocol.INVALID_ENTRY_ID);
}
registerMockEntryForRead(ledgerId, entryId, bookieSocketAddress, entry, ledgerId);
- callback.writeComplete(BKException.Code.OK, ledgerId, entryId, bookieSocketAddress, ctx);
+ callback.writeComplete(BKException.Code.OK,
+ ledgerId, entryId, bookieSocketAddress, ctx);
}
});
+
return null;
});
@@ -526,4 +550,39 @@ public abstract class MockBookKeeperTestCase {
any(), anyInt(), anyBoolean(), any(EnumSet.class));
}
+    /**
+     * Stubs {@code bookieClient.forceLedger(...)}. Each call completes its callback on the ordered
+     * executor for the ledger: with {@code NoBookieAvailableException} when the target bookie is
+     * in {@code failedBookies} at that moment, and with {@code OK} otherwise. While the bookie is
+     * suspended via {@link #suspendBookieForceLedgerAcks}, the completion is queued and only
+     * scheduled later by {@link #resumeBookieWriteAcks}.
+     */
+    @SuppressWarnings("unchecked")
+    protected void setupBookieClientForceLedger() {
+        final Stubber stub = doAnswer(invocation -> {
+            Object[] args = invocation.getArguments();
+            BookieSocketAddress bookieAddress = (BookieSocketAddress) args[0];
+            long ledgerId = (Long) args[1];
+            BookkeeperInternalCallbacks.ForceLedgerCallback callback =
+                    (BookkeeperInternalCallbacks.ForceLedgerCallback) args[2];
+            Object ctx = args[3];
+
+            Runnable completeForce = () -> executor.executeOrdered(ledgerId, () -> {
+                // failure state is sampled when the completion actually runs
+                int rc = failedBookies.contains(bookieAddress)
+                        ? NoBookieAvailableException
+                        : BKException.Code.OK;
+                callback.forceLedgerComplete(rc, ledgerId, bookieAddress, ctx);
+            });
+
+            if (!suspendedBookiesForForceLedgerAcks.contains(bookieAddress)) {
+                completeForce.run();
+                return null;
+            }
+            deferredBookieForceLedgerResponses
+                    .computeIfAbsent(bookieAddress, k -> new CopyOnWriteArrayList<>())
+                    .add(completeForce);
+            return null;
+        });
+
+        stub.when(bookieClient).forceLedger(any(BookieSocketAddress.class),
+                anyLong(),
+                any(BookkeeperInternalCallbacks.ForceLedgerCallback.class),
+                any());
+    }
+
}
--
To stop receiving notification emails like this one, please contact
eolivelli@apache.org.