You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2018/06/18 07:05:12 UTC

[bookkeeper] branch master updated: BP-14 force() API - client side implementation

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new f7dce11  BP-14 force() API - client side implementation
f7dce11 is described below

commit f7dce110fdca47c93664d1d9c71cda8cb08c9cc2
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Mon Jun 18 09:05:11 2018 +0200

    BP-14 force() API - client side implementation
    
    - Introduce the client side force() API
    - Implementation of the client-side wire protocol for the FORCE_LEDGER RPC
    - Disable ensemble changes for DEFERRED_SYNC writers
    - Prevent v2 client from using force() API.
    
    The force() API enables the client (usually with DEFERRED_SYNC write flags) to require a point of synchronization with all the bookies in the ensemble, guaranteeing the durability of previously written (and acknowledged) entries; this way LastAddConfirmed is able to advance.
    
    For DEFERRED_SYNC writers, LastAddConfirmed advances only through this API.
    
    Author: Enrico Olivelli <eo...@apache.org>
    
    Reviewers: Jia Zhai <None>, Sijie Guo <si...@apache.org>, Venkateswararao Jujjuri (JV) <None>
    
    This closes #1436 from eolivelli/bp14-force-client-api
---
 .../org/apache/bookkeeper/client/BookKeeper.java   |  19 ++
 .../bookkeeper/client/BookKeeperClientStats.java   |   3 +
 .../bookkeeper/client/DistributionSchedule.java    |  13 +-
 .../apache/bookkeeper/client/ForceLedgerOp.java    | 124 +++++++++++++
 .../org/apache/bookkeeper/client/LedgerHandle.java |  83 ++++++++-
 .../org/apache/bookkeeper/client/PendingAddOp.java |   3 +-
 .../client/ReadLastConfirmedAndEntryOp.java        |   2 +-
 .../client/RoundRobinDistributionSchedule.java     |   9 +-
 .../apache/bookkeeper/client/api/BKException.java  |   2 +-
 .../bookkeeper/client/api/ForceableHandle.java     |  50 ++++++
 .../bookkeeper/client/api/WriteAdvHandle.java      |   2 +-
 .../apache/bookkeeper/client/api/WriteFlag.java    |   2 +
 .../apache/bookkeeper/client/api/WriteHandle.java  |   2 +-
 .../org/apache/bookkeeper/proto/BookieClient.java  |  25 +++
 .../proto/BookkeeperInternalCallbacks.java         |   7 +
 .../bookkeeper/proto/PerChannelBookieClient.java   |  87 ++++++++++
 .../bookkeeper/bookie/BookieDeferredSyncTest.java  | 191 +++++++++++++++++++++
 .../apache/bookkeeper/client/BookKeeperTest.java   |  17 ++
 .../apache/bookkeeper/client/DeferredSyncTest.java | 181 ++++++++++++++++++-
 .../apache/bookkeeper/client/ExplicitLacTest.java  |  81 +++++++++
 .../bookkeeper/client/MockBookKeeperTestCase.java  |  85 +++++++--
 21 files changed, 960 insertions(+), 28 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index c3a5728..44f970f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -117,6 +117,7 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
     private OpStatsLogger readLacAndEntryOpLogger;
     private OpStatsLogger readLacAndEntryRespLogger;
     private OpStatsLogger addOpLogger;
+    private OpStatsLogger forceOpLogger;
     private OpStatsLogger writeLacOpLogger;
     private OpStatsLogger readLacOpLogger;
     private OpStatsLogger recoverAddEntriesStats;
@@ -736,6 +737,20 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
                     throw new IllegalArgumentException("Unable to convert digest type " + digestType);
             }
         }
+        public org.apache.bookkeeper.client.api.DigestType toApiDigestType() {
+            switch (this) { // map each constant onto the api DigestType of the same name
+                case MAC:
+                    return org.apache.bookkeeper.client.api.DigestType.MAC;
+                case CRC32:
+                    return org.apache.bookkeeper.client.api.DigestType.CRC32;
+                case CRC32C:
+                    return org.apache.bookkeeper.client.api.DigestType.CRC32C;
+                case DUMMY:
+                    return org.apache.bookkeeper.client.api.DigestType.DUMMY;
+                default:
+                    throw new IllegalArgumentException("Unable to convert digest type " + this);
+            }
+        }
     }
 
     boolean shouldReorderReadSequence() {
@@ -1493,6 +1508,7 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
         readLacAndEntryRespLogger = stats.getOpStatsLogger(
                 BookKeeperClientStats.READ_LAST_CONFIRMED_AND_ENTRY_RESPONSE);
         addOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.ADD_OP);
+        forceOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.FORCE_OP);
         addOpUrCounter = stats.getCounter(BookKeeperClientStats.ADD_OP_UR);
         writeLacOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.WRITE_LAC_OP);
         readLacOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.READ_LAC_OP);
@@ -1526,6 +1542,9 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
     OpStatsLogger getAddOpLogger() {
         return addOpLogger;
     }
+    OpStatsLogger getForceOpLogger() { // stats registered under BookKeeperClientStats.FORCE_OP
+        return forceOpLogger;
+    }
     OpStatsLogger getWriteLacOpLogger() {
         return writeLacOpLogger;
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index 749ac9c..30d14f0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -41,6 +41,7 @@ public interface BookKeeperClientStats {
 
     String ADD_OP = "ADD_ENTRY";
     String ADD_OP_UR = "ADD_ENTRY_UR"; // Under Replicated during AddEntry.
+    String FORCE_OP = "FORCE"; // Number of force ledger operations
     String READ_OP = "READ_ENTRY";
     // Corrupted entry (Digest Mismatch/ Under Replication) detected during ReadEntry
     String READ_OP_DM = "READ_ENTRY_DM";
@@ -64,7 +65,9 @@ public interface BookKeeperClientStats {
     String CHANNEL_ADD_OP = "ADD_ENTRY";
     String CHANNEL_TIMEOUT_ADD = "TIMEOUT_ADD_ENTRY";
     String CHANNEL_WRITE_LAC_OP = "WRITE_LAC";
+    String CHANNEL_FORCE_OP = "FORCE";
     String CHANNEL_TIMEOUT_WRITE_LAC = "TIMEOUT_WRITE_LAC";
+    String CHANNEL_TIMEOUT_FORCE = "TIMEOUT_FORCE";
     String CHANNEL_READ_LAC_OP = "READ_LAC";
     String CHANNEL_TIMEOUT_READ_LAC = "TIMEOUT_READ_LAC";
     String TIMEOUT_GET_BOOKIE_INFO = "TIMEOUT_GET_BOOKIE_INFO";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
index 2bd2a99..d53129d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
@@ -142,12 +142,12 @@ public interface DistributionSchedule {
     WriteSet getWriteSet(long entryId);
 
     /**
-     * Return the set of bookies indices to send the messages to for longpoll reads.
+     * Return the set of bookie indices covering the whole ensemble (long poll reads, force ledger).
      *
-     * @param entryId expected next entry id to read.
-     * @return the set of bookies indices to read from.
+     * @param entryId entry id the returned set is computed for.
+     * @return the set of bookie indices to send the request to.
      */
-    WriteSet getWriteSetForLongPoll(long entryId);
+    WriteSet getEnsembleSet(long entryId);
 
     /**
      * An ack set represents the set of bookies from which
@@ -197,6 +197,11 @@ public interface DistributionSchedule {
      */
     AckSet getAckSet();
 
+    /**
+     * Returns an ack set that waits for acknowledgements from every bookie in the ensemble;
+     * responses should be checked against it.
+     */
+    AckSet getEnsembleAckSet();
 
     /**
      * Interface to keep track of which bookies in an ensemble, an action
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
new file mode 100644
index 0000000..cd60848
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import static com.google.common.base.Preconditions.checkState;
+import java.util.ArrayList;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
+import org.apache.bookkeeper.util.SafeRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A request to sync the ledger on every bookie of the current ensemble.
+ */
+class ForceLedgerOp extends SafeRunnable implements ForceLedgerCallback {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ForceLedgerOp.class);
+    final CompletableFuture<Void> cb;
+
+    DistributionSchedule.AckSet ackSet; // acks from the whole ensemble, created in initiate()
+    boolean completed = false; // set once every bookie has acknowledged
+    boolean errored = false; // set on the first failure; later responses are ignored
+    int lastSeenError = BKException.Code.WriteException; // rc used to fail the future
+    ArrayList<BookieSocketAddress> currentEnsemble;
+
+    long currentNonDurableLastAddConfirmed = LedgerHandle.INVALID_ENTRY_ID; // LAC applied on success
+
+    final LedgerHandle lh;
+
+    ForceLedgerOp(LedgerHandle lh, CompletableFuture<Void> cb) {
+        this.lh = lh;
+        this.cb = cb;
+    }
+
+    void sendForceLedgerRequest(int bookieIndex) {
+        lh.bk.getBookieClient().forceLedger(currentEnsemble.get(bookieIndex), lh.ledgerId, this, bookieIndex);
+    }
+
+    @Override
+    public void safeRun() {
+        initiate();
+    }
+
+    void initiate() {
+
+        // capture pendingAddsSequenceHead: it becomes the new LastAddConfirmed on success;
+        // this runs inside the OrderedExecutor, which imposes a strict ordering
+        // on the sequence of events
+        this.currentNonDurableLastAddConfirmed = lh.pendingAddsSequenceHead;
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("force {} clientNonDurableLac {}", lh.ledgerId, currentNonDurableLastAddConfirmed);
+        }
+        // we need to send the request to every bookie in the ensemble
+        this.currentEnsemble = lh.metadata.currentEnsemble;
+        this.ackSet = lh.distributionSchedule.getEnsembleAckSet();
+
+        DistributionSchedule.WriteSet writeSet = lh.getDistributionSchedule()
+                                                   .getEnsembleSet(currentNonDurableLastAddConfirmed);
+        try {
+            for (int i = 0; i < writeSet.size(); i++) {
+                sendForceLedgerRequest(writeSet.get(i));
+            }
+        } finally {
+            writeSet.recycle();
+        }
+    }
+
+    @Override
+    public void forceLedgerComplete(int rc, long ledgerId, BookieSocketAddress addr, Object ctx) {
+        int bookieIndex = (Integer) ctx;
+
+        checkState(!completed, "We are waiting for all the bookies, it is not expected an early exit");
+
+        if (errored) {
+            // already failed, do not fire error callbacks twice
+            return;
+        }
+
+        if (BKException.Code.OK != rc) {
+            lastSeenError = rc;
+        }
+
+        if (rc == BKException.Code.OK) {
+            if (ackSet.completeBookieAndCheck(bookieIndex)) {
+                completed = true;
+                // every bookie has now reported syncing its own journal
+                // for every entry acknowledged before force() was issued
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("After force on ledger {} updating LastAddConfirmed to {} ",
+                              ledgerId, currentNonDurableLastAddConfirmed);
+                }
+                lh.updateLastConfirmed(currentNonDurableLastAddConfirmed, lh.getLength());
+                FutureUtils.complete(cb, null);
+            }
+        } else {
+            // at least one bookie failed; since success requires every bookie,
+            // the whole operation can fail immediately
+            LOG.info("ForceLedger did not succeed: Ledger {} on {}", ledgerId, addr);
+            errored = true;
+
+            // notify the failure
+            FutureUtils.completeExceptionally(cb, BKException.create(lastSeenError));
+        }
+
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index a2a37e9..a79a01e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -1109,6 +1109,68 @@ public class LedgerHandle implements WriteHandle {
     }
 
     /**
+     * {@inheritDoc}
+     *
+     * <p>Fails with {@link BKException.BKLedgerClosedException} if the ledger is closed, or
+     * with {@link BKException.BKInterruptedException} if the main worker pool rejects the task.
+     * Completes without contacting any bookie when no write has been issued yet.
+     */
+    @Override
+    public CompletableFuture<Void> force() {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        boolean wasClosed;
+        synchronized (this) {
+            // read the closed flag while holding the handle's monitor
+            wasClosed = metadata.isClosed();
+        }
+
+        if (wasClosed) {
+            // make sure the callback is triggered in main worker pool
+            try {
+                bk.getMainWorkerPool().executeOrdered(ledgerId, new SafeRunnable() {
+                    @Override
+                    public void safeRun() {
+                        LOG.warn("Force() attempted on a closed ledger: {}", ledgerId);
+                        result.completeExceptionally(new BKException.BKLedgerClosedException());
+                    }
+
+                    @Override
+                    public String toString() {
+                        return String.format("force(lid=%d)", ledgerId);
+                    }
+                });
+            } catch (RejectedExecutionException e) {
+                result.completeExceptionally(new BKException.BKInterruptedException());
+            }
+            return result;
+        }
+
+        try {
+            if (pendingAddsSequenceHead == INVALID_ENTRY_ID) {
+                // early exit: no write has been issued yet, nothing to sync
+                bk.getMainWorkerPool().executeOrdered(ledgerId, new SafeRunnable() {
+                    @Override
+                    public void safeRun() {
+                        FutureUtils.complete(result, null);
+                    }
+
+                    @Override
+                    public String toString() {
+                        return String.format("force(lid=%d)", ledgerId);
+                    }
+                });
+            } else {
+                // ForceLedgerOp syncs every bookie of the current ensemble
+                bk.getMainWorkerPool().executeOrdered(ledgerId, new ForceLedgerOp(this, result));
+            }
+        } catch (RejectedExecutionException e) {
+            // the pool rejected the task: fail the future instead of throwing to the caller
+            result.completeExceptionally(new BKException.BKInterruptedException());
+        }
+        return result;
+    }
+
+    /**
      * Make a recovery add entry request. Recovery adds can add to a ledger even
      * if it has been fenced.
      *
@@ -1229,7 +1291,7 @@ public class LedgerHandle implements WriteHandle {
         if (wasClosed) {
             // make sure the callback is triggered in main worker pool
             try {
-                bk.getMainWorkerPool().submit(new SafeRunnable() {
+                bk.getMainWorkerPool().executeOrdered(ledgerId, new SafeRunnable() {
                     @Override
                     public void safeRun() {
                         LOG.warn("Attempt to add to closed ledger: {}", ledgerId);
@@ -1799,6 +1861,14 @@ public class LedgerHandle implements WriteHandle {
             }
             return;
         }
+        if (writeFlags.contains(WriteFlag.DEFERRED_SYNC)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Cannot perform ensemble change with writeflags {}."
+                        + "Failed bookies {} for ledger {}.",
+                        writeFlags, delayedWriteFailedBookies, ledgerId);
+            }
+            return;
+        }
         synchronized (metadata) {
             try {
                 EnsembleInfo ensembleInfo = replaceBookieInMetadata(delayedWriteFailedBookies, curNumEnsembleChanges);
@@ -1830,6 +1900,17 @@ public class LedgerHandle implements WriteHandle {
             return;
         }
 
+        if (writeFlags.contains(WriteFlag.DEFERRED_SYNC)) {
+            blockAddCompletions.decrementAndGet();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Cannot perform ensemble change with write flags {}. "
+                        + "Failed bookies {} for ledger {}.",
+                    writeFlags, failedBookies, ledgerId);
+            }
+            handleUnrecoverableErrorDuringAdd(WriteException);
+            return;
+        }
+
         int curNumEnsembleChanges = numEnsembleChanges.incrementAndGet();
 
         // when the ensemble changes are too frequent, close handle
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index 0153d44..3c3a1dd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -284,7 +284,8 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
                 // Got an error after satisfying AQ. This means we are under replicated at the create itself.
                 // Update the stat to reflect it.
                 addOpUrCounter.inc();
-                if (!lh.bk.getDisableEnsembleChangeFeature().isAvailable() && !lh.bk.delayEnsembleChange) {
+                if (!lh.bk.getDisableEnsembleChangeFeature().isAvailable()
+                        && !lh.bk.delayEnsembleChange) {
                     lh.getDelayedWriteFailedBookies().putIfAbsent(bookieIndex, addr);
                 }
             }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index 1b3fa70..1436990 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -81,7 +81,7 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
         ReadLACAndEntryRequest(ArrayList<BookieSocketAddress> ensemble, long lId, long eId) {
             this.entryImpl = LedgerEntryImpl.create(lId, eId);
             this.ensemble = ensemble;
-            this.writeSet = lh.getDistributionSchedule().getWriteSetForLongPoll(eId);
+            this.writeSet = lh.getDistributionSchedule().getEnsembleSet(eId);
             if (lh.getBk().shouldReorderReadSequence()) {
                 this.orderedEnsemble = lh.getBk().getPlacementPolicy().reorderReadLACSequence(ensemble,
                         lh.getBookiesHealthInfo(), writeSet.copy());
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
index e399b01..d079408 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
@@ -54,8 +54,8 @@ class RoundRobinDistributionSchedule implements DistributionSchedule {
     }
 
     @Override
-    public WriteSet getWriteSetForLongPoll(long entryId) {
-        // for long poll reads, we are trying all the bookies in the ensemble
+    public WriteSet getEnsembleSet(long entryId) {
+        // for long poll reads and force ledger requests, we are trying all the bookies in the ensemble
         // so we create a `WriteSet` with `writeQuorumSize == ensembleSize`.
         return WriteSetImpl.create(ensembleSize, ensembleSize /* writeQuorumSize */, entryId);
     }
@@ -252,6 +252,11 @@ class RoundRobinDistributionSchedule implements DistributionSchedule {
         return AckSetImpl.create(ensembleSize, writeQuorumSize, ackQuorumSize);
     }
 
+    @Override
+    public AckSet getEnsembleAckSet() {
+        return AckSetImpl.create(ensembleSize, ensembleSize /* writeQuorumSize */, ensembleSize /* ackQuorumSize */);
+    }
+
     private static class AckSetImpl implements AckSet {
         private int writeQuorumSize;
         private int ackQuorumSize;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java
index b81f33d..09bb8f3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java
@@ -97,7 +97,7 @@ public class BKException extends Exception {
         case Code.QuorumException:
             return "Invalid quorum size on ensemble size";
         case Code.NoBookieAvailableException:
-            return "Invalid quorum size on ensemble size";
+            return "No bookie available";
         case Code.DigestNotInitializedException:
             return "Digest engine not initialized";
         case Code.DigestMatchException:
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ForceableHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ForceableHandle.java
new file mode 100644
index 0000000..48d7acb
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ForceableHandle.java
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
+
+/**
+ * Provides the writer with the ability to enforce durability guarantees.
+ *
+ * @see WriteAdvHandle
+ * @see WriteHandle
+ *
+ * @since 4.8
+ */
+@Public
+@Unstable
+public interface ForceableHandle {
+
+    /**
+     * Enforce durability of the entries written by this handle.
+     * <p>This API is useful with {@link WriteFlag#DEFERRED_SYNC}, because with
+     * that flag writes are acknowledged by the bookie without waiting for a
+     * durable write.
+     * </p>
+     *
+     * @return a future that completes when the force operation finishes
+     */
+    CompletableFuture<Void> force();
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteAdvHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteAdvHandle.java
index 37f45b9..c24c6d0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteAdvHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteAdvHandle.java
@@ -39,7 +39,7 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
  */
 @Public
 @Unstable
-public interface WriteAdvHandle extends ReadHandle {
+public interface WriteAdvHandle extends ReadHandle, ForceableHandle {
 
     /**
      * Add entry asynchronously to an open ledger.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteFlag.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteFlag.java
index 30199c2..6914abe 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteFlag.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteFlag.java
@@ -32,6 +32,8 @@ public enum WriteFlag {
     /**
      * Writes will be acknowledged after writing to the filesystem
      * but not yet been persisted to disks.
+     *
+     * @see ForceableHandle#force()
      */
     DEFERRED_SYNC(0x1 << 0);
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteHandle.java
index b2c0459..edad5f4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteHandle.java
@@ -38,7 +38,7 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
  */
 @Public
 @Unstable
-public interface WriteHandle extends ReadHandle {
+public interface WriteHandle extends ReadHandle, ForceableHandle {
 
     /**
      * Add entry asynchronously to an open ledger.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 3dd837c..197483d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -55,6 +55,7 @@ import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.SafeRunnable;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
@@ -193,6 +194,30 @@ public class BookieClient implements PerChannelBookieClientFactory {
         return clientPool;
     }
 
+    public void forceLedger(final BookieSocketAddress addr, final long ledgerId,
+            final ForceLedgerCallback cb, final Object ctx) {
+        final PerChannelBookieClientPool pool = lookupClient(addr);
+        if (pool == null) {
+            final int noHandleRc = getRc(BKException.Code.BookieHandleNotAvailableException);
+            cb.forceLedgerComplete(noHandleRc, ledgerId, addr, ctx);
+            return;
+        }
+
+        pool.obtain((obtainRc, channel) -> {
+            if (obtainRc == BKException.Code.OK) {
+                channel.forceLedger(ledgerId, cb, ctx);
+                return;
+            }
+            // obtaining a channel failed: report it through the ordered executor
+            try {
+                executor.executeOrdered(ledgerId,
+                        safeRun(() -> cb.forceLedgerComplete(obtainRc, ledgerId, addr, ctx)));
+            } catch (RejectedExecutionException re) {
+                cb.forceLedgerComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx);
+            }
+        }, ledgerId);
+    }
+
     public void writeLac(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey,
             final long lac, final ByteBufList toSend, final WriteLacCallback cb, final Object ctx) {
         final PerChannelBookieClientPool client = lookupClient(addr);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
index 04a4546..cd87ff1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
@@ -91,6 +91,13 @@ public class BookkeeperInternalCallbacks {
     }
 
     /**
+     * Callback for the completion of a force-ledger request sent to a single bookie.
+     */
+    public interface ForceLedgerCallback {
+        void forceLedgerComplete(int rc, long ledgerId, BookieSocketAddress addr, Object ctx);
+    }
+
+    /**
      * A callback interface for a STARTTLS command.
      */
     public interface StartTLSCallback {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 39436d4..51c875b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -92,6 +92,7 @@ import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
@@ -103,6 +104,8 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoRequest;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
@@ -174,9 +177,11 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
     private final OpStatsLogger readTimeoutOpLogger;
     private final OpStatsLogger addEntryOpLogger;
     private final OpStatsLogger writeLacOpLogger;
+    private final OpStatsLogger forceLedgerOpLogger;
     private final OpStatsLogger readLacOpLogger;
     private final OpStatsLogger addTimeoutOpLogger;
     private final OpStatsLogger writeLacTimeoutOpLogger;
+    private final OpStatsLogger forceLedgerTimeoutOpLogger;
     private final OpStatsLogger readLacTimeoutOpLogger;
     private final OpStatsLogger getBookieInfoOpLogger;
     private final OpStatsLogger getBookieInfoTimeoutOpLogger;
@@ -281,11 +286,13 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         readEntryOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_READ_OP);
         addEntryOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_ADD_OP);
         writeLacOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_WRITE_LAC_OP);
+        forceLedgerOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_FORCE_OP);
         readLacOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_READ_LAC_OP);
         getBookieInfoOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.GET_BOOKIE_INFO_OP);
         readTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ);
         addTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_ADD);
         writeLacTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_WRITE_LAC);
+        forceLedgerTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_FORCE);
         readLacTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ_LAC);
         getBookieInfoTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.TIMEOUT_GET_BOOKIE_INFO);
         startTLSOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_START_TLS_OP);
@@ -566,6 +573,37 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         writeAndFlush(channel, completionKey, writeLacRequest);
     }
 
+    /**
+     * Sends a FORCE_LEDGER request for {@code ledgerId} to this bookie using the v3 protocol.
+     *
+     * <p>When the client is configured for the v2 wire protocol the request is not sent:
+     * an error is logged and {@code cb} is completed with
+     * {@link BKException.Code#IllegalOpException} on the executor ordered by {@code ledgerId}.
+     *
+     * @param ledgerId ledger the force request refers to
+     * @param cb callback notified with the result code, ledger id, bookie address and {@code ctx}
+     * @param ctx opaque context handed back to {@code cb}
+     */
+    void forceLedger(final long ledgerId, ForceLedgerCallback cb, Object ctx) {
+        if (useV2WireProtocol) {
+            LOG.error("force is not allowed with v2 protocol");
+            executor.executeOrdered(ledgerId, () -> {
+                cb.forceLedgerComplete(BKException.Code.IllegalOpException, ledgerId, addr, ctx);
+            });
+            return;
+        }
+        final long txnId = getTxnId();
+        final CompletionKey completionKey = new V3CompletionKey(txnId,
+                                                                OperationType.FORCE_LEDGER);
+        // Register the pending completion under (txnId, FORCE_LEDGER) before the request is
+        // written; the same txnId is carried in the request header below.
+        // NOTE(review): an earlier comment claimed force reuses the addEntry timeout; the
+        // timeout actually applied is decided by CompletionValue, not visible here — confirm.
+        completionObjects.put(completionKey,
+                              new ForceLedgerCompletion(completionKey, cb,
+                                                        ctx, ledgerId));
+
+        // Build the request
+        BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+                .setVersion(ProtocolVersion.VERSION_THREE)
+                .setOperation(OperationType.FORCE_LEDGER)
+                .setTxnId(txnId);
+        ForceLedgerRequest.Builder forceLedgerBuilder = ForceLedgerRequest.newBuilder()
+                .setLedgerId(ledgerId);
+
+        final Request forceLedgerRequest = Request.newBuilder()
+                .setHeader(headerBuilder)
+                .setForceLedgerRequest(forceLedgerBuilder)
+                .build();
+        writeAndFlush(channel, completionKey, forceLedgerRequest);
+    }
+
     /**
      * This method should be called only after connection has been checked for
      * {@link #connectIfNeededAndDoOp(GenericCallback)}.
@@ -1553,6 +1591,55 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         }
     }
 
+    /**
+     * Pending FORCE_LEDGER request. It completes the caller's {@link ForceLedgerCallback}
+     * either from the bookie's v3 response or when the request is errored out.
+     */
+    class ForceLedgerCompletion extends CompletionValue {
+        final ForceLedgerCallback cb;
+
+        public ForceLedgerCompletion(final CompletionKey key,
+                                     final ForceLedgerCallback originalCallback,
+                                     final Object originalCtx,
+                                     final long ledgerId) {
+            super("ForceLedger",
+                  originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
+                  forceLedgerOpLogger, forceLedgerTimeoutOpLogger);
+            // The wrapper records the op result and reports back with the caller's original
+            // context (the ctx it receives is ignored). It then releases the completion key.
+            this.cb = (rc, completedLedgerId, bookieAddress, unusedCtx) -> {
+                logOpResult(rc);
+                originalCallback.forceLedgerComplete(rc, completedLedgerId,
+                                                     bookieAddress, originalCtx);
+                key.release();
+            };
+        }
+
+        @Override
+        public void errorOut() {
+            errorOut(BKException.Code.BookieHandleNotAvailableException);
+        }
+
+        @Override
+        public void errorOut(final int rc) {
+            errorOutAndRunCallback(
+                    () -> cb.forceLedgerComplete(rc, ledgerId, addr, ctx));
+        }
+
+        @Override
+        public void handleV3Response(BookkeeperProtocol.Response response) {
+            final ForceLedgerResponse forceLedgerResponse = response.getForceLedgerResponse();
+            // A non-OK status on the response envelope takes precedence over the payload status.
+            final StatusCode status;
+            if (response.getStatus() != StatusCode.EOK) {
+                status = response.getStatus();
+            } else {
+                status = forceLedgerResponse.getStatus();
+            }
+            final long respondedLedgerId = forceLedgerResponse.getLedgerId();
+
+            if (LOG.isDebugEnabled()) {
+                logResponse(status, "ledger", respondedLedgerId);
+            }
+            final int rc = convertStatus(status, BKException.Code.WriteException);
+            cb.forceLedgerComplete(rc, respondedLedgerId, addr, ctx);
+        }
+    }
+
     // visible for testing
     class ReadLacCompletion extends CompletionValue {
         final ReadLacCallback cb;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieDeferredSyncTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieDeferredSyncTest.java
new file mode 100644
index 0000000..cce445e
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieDeferredSyncTest.java
@@ -0,0 +1,191 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.junit.Assert.assertEquals;
+
+import java.util.EnumSet;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.client.api.WriteHandle;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+
+/**
+ * Test the bookie journal without sync, driven by client with
+ * {@link WriteFlag#DEFERRED_SYNC} write flag.
+ */
+public class BookieDeferredSyncTest extends BookKeeperClusterTestCase {
+
+    public BookieDeferredSyncTest() {
+        // presumably starts a single bookie; every ledger below uses ensemble size 1
+        super(1);
+    }
+
+    /**
+     * Entries appended with DEFERRED_SYNC by a writer that is never closed are readable
+     * after the ledger is opened with recovery.
+     */
+    @Test
+    public void testWriteAndRecovery() throws Exception {
+        // this WriteHandle will not be closed
+        WriteHandle lh = result(bkc.newCreateLedgerOp()
+                .withEnsembleSize(1)
+                .withWriteQuorumSize(1)
+                .withAckQuorumSize(1)
+                .withWriteFlags(WriteFlag.DEFERRED_SYNC)
+                .withDigestType(DigestType.CRC32C)
+                .withPassword(new byte[0])
+                .execute());
+
+        int n = 10;
+
+        long ledgerId = lh.getId();
+
+        for (int i = 0; i < n; i++) {
+            lh.append(("entry-" + i).getBytes(UTF_8));
+        }
+
+        try (ReadHandle readLh = result(bkc.newOpenLedgerOp()
+                .withLedgerId(ledgerId)
+                .withRecovery(true)
+                .withPassword(new byte[0])
+                .execute());
+             LedgerEntries entries = readLh.read(0, n - 1)) {
+            assertEntries(entries, n);
+        }
+    }
+
+    @Test
+    public void testCloseNoForce() throws Exception {
+        testClose(false);
+    }
+
+    @Test
+    public void testCloseWithForce() throws Exception {
+        testClose(true);
+    }
+
+    /**
+     * Appends entries with DEFERRED_SYNC, optionally calls force(), closes the writer and
+     * checks what a recovering reader observes.
+     *
+     * @param force whether force() is called before the writer is closed
+     */
+    private void testClose(boolean force) throws Exception {
+        final int n = 10;
+        long ledgerId;
+        try (WriteHandle lh = result(bkc.newCreateLedgerOp()
+                .withEnsembleSize(1)
+                .withWriteQuorumSize(1)
+                .withAckQuorumSize(1)
+                .withWriteFlags(WriteFlag.DEFERRED_SYNC)
+                .withDigestType(DigestType.CRC32C)
+                .withPassword(new byte[0])
+                .execute())) {
+
+            ledgerId = lh.getId();
+            for (int i = 0; i < n; i++) {
+                lh.append(("entry-" + i).getBytes(UTF_8));
+            }
+            if (force) {
+                // with force() LastAddConfirmed is updated
+                result(lh.force());
+                // on close metadata will have LastAddConfirmed = n - 1
+                assertEquals(n - 1, lh.getLastAddConfirmed());
+            } else {
+                // on close metadata will have LastAddConfirmed = -1
+                assertEquals(-1, lh.getLastAddConfirmed());
+            }
+        }
+
+        try (ReadHandle readLh = result(bkc.newOpenLedgerOp()
+                .withLedgerId(ledgerId)
+                .withRecovery(true)
+                .withPassword(new byte[0])
+                .execute())) {
+            if (force) {
+                // the reader will be able to read the confirmed entries
+                try (LedgerEntries entries = readLh.read(0, n - 1)) {
+                    assertEntries(entries, n);
+                }
+            } else {
+                // reader will see LastAddConfirmed = -1
+                assertEquals(-1, readLh.getLastAddConfirmed());
+            }
+            // in both cases every entry is readable with readUnconfirmed
+            try (LedgerEntries entries = readLh.readUnconfirmed(0, n - 1)) {
+                assertEntries(entries, n);
+            }
+        }
+    }
+
+    @Test
+    public void testForceWithDeferredSyncWriteFlags() throws Exception {
+        testForce(EnumSet.of(WriteFlag.DEFERRED_SYNC));
+    }
+
+    @Test
+    public void testForceNoWriteFlag() throws Exception {
+        // force API will work even without DEFERRED_SYNC flag
+        testForce(WriteFlag.NONE);
+    }
+
+    /**
+     * Appends entries with the given write flags and checks that force() advances
+     * LastAddConfirmed to the last appended entry.
+     */
+    private void testForce(EnumSet<WriteFlag> writeFlags) throws Exception {
+        try (WriteHandle lh = result(bkc.newCreateLedgerOp()
+                .withEnsembleSize(1)
+                .withWriteQuorumSize(1)
+                .withAckQuorumSize(1)
+                .withWriteFlags(writeFlags)
+                .withDigestType(DigestType.CRC32C)
+                .withPassword(new byte[0])
+                .execute())) {
+            int n = 10;
+            for (int i = 0; i < n; i++) {
+                lh.append(("entry-" + i).getBytes(UTF_8));
+            }
+            result(lh.force());
+            assertEquals(n - 1, lh.getLastAddConfirmed());
+
+            // explicit close; try-with-resources calls close() again on exit
+            lh.close();
+        }
+    }
+
+    /**
+     * Asserts that {@code entries} contains entries 0..n-1 whose payload is "entry-" + id.
+     */
+    private static void assertEntries(LedgerEntries entries, int n) {
+        for (int i = 0; i < n; i++) {
+            LedgerEntry entry = entries.getEntry(i);
+            assertEquals("entry-" + i, new String(entry.getEntryBytes(), UTF_8));
+        }
+    }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
index f1521f4..1a22e61 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
@@ -801,4 +801,21 @@ public class BookKeeperTest extends BookKeeperClusterTestCase {
         }
     }
 
+    @Test(expected = BKIllegalOpException.class)
+    public void testCannotUseForceOnV2Protocol() throws Exception {
+        // a client restricted to the v2 wire protocol must reject force()
+        ClientConfiguration v2Conf = new ClientConfiguration(baseClientConf);
+        v2Conf.setUseV2WireProtocol(true);
+        try (BookKeeperTestClient v2Client = new BookKeeperTestClient(v2Conf)) {
+            try (WriteHandle writer = result(v2Client.newCreateLedgerOp()
+                    .withEnsembleSize(3)
+                    .withWriteQuorumSize(3)
+                    .withAckQuorumSize(2)
+                    .withPassword(new byte[0])
+                    .withWriteFlags(WriteFlag.NONE)
+                    .execute())) {
+                // write one (empty) entry, then request force()
+                result(writer.appendAsync(new byte[0]));
+                result(writer.force());
+            }
+        }
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java
index 4bd5a8c..aaa7645 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java
@@ -19,11 +19,16 @@ package org.apache.bookkeeper.client;
 
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.client.api.WriteAdvHandle;
 import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.client.api.WriteHandle;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.junit.Test;
 
 /**
@@ -49,10 +54,180 @@ public class DeferredSyncTest extends MockBookKeeperTestCase {
             }
             long lastEntryID = result(wh.appendAsync(DATA));
             assertEquals(NUM_ENTRIES - 1, lastEntryID);
+            assertEquals(NUM_ENTRIES - 1, wh.getLastAddPushed());
+            assertEquals(-1, wh.getLastAddConfirmed());
+        }
+    }
+
+    /**
+     * With DEFERRED_SYNC, appends alone leave LastAddConfirmed at -1. A successful force()
+     * moves it up to the last pushed entry.
+     */
+    @Test
+    public void testAddEntryLastAddConfirmedAdvanceWithForce() throws Exception {
+        try (WriteHandle writer = result(newCreateLedgerOp()
+                .withEnsembleSize(3)
+                .withWriteQuorumSize(3)
+                .withAckQuorumSize(2)
+                .withPassword(PASSWORD)
+                .withWriteFlags(WriteFlag.DEFERRED_SYNC)
+                .execute())) {
+            int appended = 0;
+            while (appended < NUM_ENTRIES - 1) {
+                result(writer.appendAsync(DATA));
+                appended++;
+            }
+            final long lastEntryId = result(writer.appendAsync(DATA));
+            assertEquals(NUM_ENTRIES - 1, lastEntryId);
+            assertEquals(NUM_ENTRIES - 1, writer.getLastAddPushed());
+
+            // nothing has been forced yet
+            assertEquals(-1, writer.getLastAddConfirmed());
+            result(writer.force());
+            assertEquals(NUM_ENTRIES - 1, writer.getLastAddConfirmed());
+        }
+    }
+
+    /**
+     * force() on a WriteAdvHandle with out-of-order writes. As asserted below,
+     * LastAddConfirmed only advances up to the highest entry whose lower entry ids
+     * have all completed.
+     */
+    @Test
+    public void testForceOnWriteAdvHandle() throws Exception {
+        try (WriteAdvHandle wh = result(newCreateLedgerOp()
+                .withEnsembleSize(3)
+                .withWriteQuorumSize(3)
+                .withAckQuorumSize(2)
+                .withPassword(PASSWORD)
+                .withWriteFlags(WriteFlag.DEFERRED_SYNC)
+                .makeAdv()
+                .execute())) {
+            // entry 1 is deliberately missing, leaving a gap after entry 0
+            CompletableFuture<Long> w0 = wh.writeAsync(0, DATA);
+            CompletableFuture<Long> w2 = wh.writeAsync(2, DATA);
+            CompletableFuture<Long> w3 = wh.writeAsync(3, DATA);
+            result(w0);
+            // only entry 0 is known complete, so force() can confirm up to 0
+            result(wh.force());
+            assertEquals(0, wh.getLastAddConfirmed());
+            // filling the gap; the test expects 1 and 2 to be done once 3 has completed
+            CompletableFuture<Long> w1 = wh.writeAsync(1, DATA);
+            result(w3);
+            assertTrue(w1.isDone());
+            assertTrue(w2.isDone());
+            // entry 4 is missing, so force() confirms up to 3 rather than 5
+            CompletableFuture<Long> w5 = wh.writeAsync(5, DATA);
+            result(wh.force());
+            assertEquals(3, wh.getLastAddConfirmed());
+            // writing 4 closes the last gap; once 5 completes force() confirms up to 5
+            wh.writeAsync(4, DATA);
+            result(w5);
+            result(wh.force());
+            assertEquals(5, wh.getLastAddConfirmed());
+        }
+    }
+
+    /**
+     * force() must be acknowledged by every bookie of the current ensemble. Plain writes
+     * keep succeeding while one bookie is down (write quorum 2 out of 3), but force() fails
+     * until that bookie is back.
+     */
+    @Test
+    public void testForceRequiresFullEnsemble() throws Exception {
+        try (WriteHandle wh = result(newCreateLedgerOp()
+                .withEnsembleSize(3)
+                .withWriteQuorumSize(2)
+                .withAckQuorumSize(2)
+                .withPassword(PASSWORD)
+                .withWriteFlags(WriteFlag.DEFERRED_SYNC)
+                .execute())) {
+            for (int i = 0; i < NUM_ENTRIES - 1; i++) {
+                result(wh.appendAsync(DATA));
+            }
+            long lastEntryID = result(wh.appendAsync(DATA));
+            assertEquals(NUM_ENTRIES - 1, lastEntryID);
+            assertEquals(NUM_ENTRIES - 1, wh.getLastAddPushed());
+            assertEquals(-1, wh.getLastAddConfirmed());
+
+            BookieSocketAddress bookieAddress = wh.getLedgerMetadata().getEnsembleAt(wh.getLastAddPushed()).get(0);
+            killBookie(bookieAddress);
+
+            // write should succeed (we still have 2 bookies out of 3)
+            result(wh.appendAsync(DATA));
+
+            // force cannot complete: it must be acknowledged by all of the bookies in the ensemble
+            try {
+                result(wh.force());
+                fail("force() must not succeed while a bookie of the ensemble is down");
+            } catch (BKException.BKBookieException expected) {
+                // expected: the killed bookie cannot acknowledge the force request
+            }
+            // bookie comes up again, force must succeed
+            startKilledBookie(bookieAddress);
+            result(wh.force());
+        }
+    }
+
+    /**
+     * A force() only confirms entries that were acknowledged when it was issued. Entries
+     * appended while it is pending are confirmed only by a later force().
+     */
+    @Test
+    public void testForceWillAdvanceLacOnlyUpToLastAcknoledgedWrite() throws Exception {
+        try (WriteHandle wh = result(newCreateLedgerOp()
+                .withEnsembleSize(3)
+                .withWriteQuorumSize(3)
+                .withAckQuorumSize(3)
+                .withPassword(PASSWORD)
+                .withWriteFlags(WriteFlag.DEFERRED_SYNC)
+                .execute())) {
+            for (int i = 0; i < NUM_ENTRIES - 1; i++) {
+                result(wh.appendAsync(DATA));
+            }
+            long lastEntryIdBeforeSuspend = result(wh.appendAsync(DATA));
+            assertEquals(NUM_ENTRIES - 1, lastEntryIdBeforeSuspend);
+            assertEquals(-1, wh.getLastAddConfirmed());
+
+            // one bookie will stop sending acks for forceLedger
+            BookieSocketAddress bookieAddress = wh.getLedgerMetadata().getEnsembleAt(wh.getLastAddPushed()).get(0);
+            suspendBookieForceLedgerAcks(bookieAddress);
+
+            // start a force(); it cannot complete while one bookie holds back its
+            // forceLedger ack, so lastAddConfirmed is still -1
+            CompletableFuture<?> forceResult = wh.force();
+            assertEquals(-1, wh.getLastAddConfirmed());
+
+            // send an entry and receive ack (only forceLedger acks are suspended)
+            long lastEntry = wh.append(DATA);
+
+            // release the held-back forceLedger ack (resumeBookieWriteAcks resumes force acks)
+            resumeBookieWriteAcks(bookieAddress);
+            result(forceResult);
+
+            // now LastAddConfirmed will be equals to the last confirmed entry
+            // before force() started
+            assertEquals(lastEntryIdBeforeSuspend, wh.getLastAddConfirmed());
+
+            // a new force() also covers the entry appended while the first one was pending
+            result(wh.force());
+            assertEquals(lastEntry, wh.getLastAddConfirmed());
+        }
+    }
+
+    /**
+     * With DEFERRED_SYNC the writer must not replace a failed bookie. Once the only bookie
+     * of the ensemble is killed, appends fail even though another bookie is available.
+     */
+    @Test
+    public void testForbiddenEnsembleChange() throws Exception {
+        try (WriteHandle wh = result(newCreateLedgerOp()
+                .withEnsembleSize(1)
+                .withWriteQuorumSize(1)
+                .withAckQuorumSize(1)
+                .withPassword(PASSWORD)
+                .withWriteFlags(WriteFlag.DEFERRED_SYNC)
+                .execute())) {
+            for (int i = 0; i < NUM_ENTRIES - 1; i++) {
+                wh.append(DATA);
+            }
+
+            assertEquals(1, availableBookies.size());
+            // kill the only bookie in the ensemble
+            killBookie(wh.getLedgerMetadata().getEnsembleAt(wh.getLastAddPushed()).get(0));
+            assertEquals(0, availableBookies.size());
+            // a replacement bookie is available, so only DEFERRED_SYNC prevents an ensemble change
+            startNewBookie();
+            assertEquals(1, availableBookies.size());
+
+            try {
+                // we cannot switch to the new bookie with DEFERRED_SYNC
+                wh.append(DATA);
+                fail("since ensemble change is disable we cannot be able to write any more");
+            } catch (BKException.BKWriteException ex) {
+                // expected
+            }
             LedgerHandle lh = (LedgerHandle) wh;
-            assertEquals(NUM_ENTRIES - 1, lh.getLastAddPushed());
-            assertEquals(-1, lh.getLastAddConfirmed());
+            // NOTE(review): presumably verifies that no bookie was recorded for a delayed
+            // ensemble change after the failed write — confirm getDelayedWriteFailedBookies()
+            assertTrue(lh.getDelayedWriteFailedBookies().isEmpty());
         }
     }
 
-}
\ No newline at end of file
+    @Test(expected = BKException.BKLedgerClosedException.class)
+    public void testCannotIssueForceOnClosedLedgerHandle() throws Exception {
+        final WriteHandle closedHandle = result(newCreateLedgerOp()
+                .withEnsembleSize(1)
+                .withWriteQuorumSize(1)
+                .withAckQuorumSize(1)
+                .withPassword(PASSWORD)
+                .withWriteFlags(WriteFlag.DEFERRED_SYNC)
+                .execute());
+        closedHandle.close();
+        // force() on a handle that is already closed must be rejected
+        result(closedHandle.force());
+    }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ExplicitLacTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ExplicitLacTest.java
index fd661c2..256f051 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ExplicitLacTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ExplicitLacTest.java
@@ -29,6 +29,7 @@ import org.apache.bookkeeper.bookie.LedgerStorage;
 import org.apache.bookkeeper.bookie.SortedLedgerStorage;
 import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.TestUtils;
@@ -190,5 +191,85 @@ public class ExplicitLacTest extends BookKeeperClusterTestCase {
         bkcWithExplicitLAC.close();
     }
 
+    /**
+     * Explicit LAC combined with DEFERRED_SYNC. The writer's LAC is advanced by force(), and
+     * the reader observes it through piggy-backed LAC and the explicit-LAC timer.
+     */
+    @Test
+    public void testReadHandleWithExplicitLACAndDeferredSync() throws Exception {
+        ClientConfiguration confWithExplicitLAC = new ClientConfiguration();
+        confWithExplicitLAC.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+        int explicitLacIntervalMillis = 1000;
+        confWithExplicitLAC.setExplictLacInterval(explicitLacIntervalMillis);
+
+        // try-with-resources releases the client and the handles even if an assertion fails;
+        // they are closed in the order rlh, wlh, client
+        try (BookKeeper bkcWithExplicitLAC = new BookKeeper(confWithExplicitLAC);
+             LedgerHandle wlh = (LedgerHandle) bkcWithExplicitLAC.newCreateLedgerOp()
+                     .withEnsembleSize(1)
+                     .withWriteQuorumSize(1)
+                     .withAckQuorumSize(1)
+                     .withWriteFlags(WriteFlag.DEFERRED_SYNC)
+                     .withDigestType(digestType.toApiDigestType())
+                     .withPassword("testPasswd".getBytes())
+                     .execute()
+                     .get()) {
+            long ledgerId = wlh.getId();
+
+            // start like testReadHandleWithExplicitLAC
+            int numOfEntries = 5;
+            for (int i = 0; i < numOfEntries; i++) {
+                // if you perform force() + addEntry() you will piggy back LAC as usual
+                wlh.force().get();
+                wlh.addEntry(("foobar" + i).getBytes());
+            }
+
+            try (LedgerHandle rlh = bkcWithExplicitLAC.openLedgerNoRecovery(
+                    ledgerId, digestType, "testPasswd".getBytes())) {
+
+                // the last addEntry() carried the LAC forced just before it: numOfEntries - 2
+                assertTrue(
+                        "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: "
+                                + rlh.getLastAddConfirmed(),
+                        (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
+
+                for (int i = numOfEntries; i < 2 * numOfEntries; i++) {
+                    wlh.addEntry(("foobar" + i).getBytes());
+                }
+
+                // running a force() will update local LAC on the writer
+                // ExplicitLAC timer will send the value even without writes
+                wlh.force().get();
+
+                // wait for explicit lac to be sent to bookies
+                TestUtils.waitUntilExplicitLacUpdated(rlh, 2 * numOfEntries - 2);
+
+                // we need to wait for at least 2 explicit LAC intervals,
+                // since in writehandle for the first call
+                // lh.getExplicitLastAddConfirmed() will be <
+                // lh.getPiggyBackedLastAddConfirmed(),
+                // so it wont make explicit writelac in the first run
+                TestUtils.waitUntilLacUpdated(rlh, 2 * numOfEntries - 2);
+
+                assertTrue(
+                        "Expected LAC of wlh: " + (2 * numOfEntries - 1) + " actual LAC of wlh: "
+                                + wlh.getLastAddConfirmed(),
+                        (wlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
+
+                long explicitlac = TestUtils.waitUntilExplicitLacUpdated(rlh, 2 * numOfEntries - 1);
+                assertTrue("Expected Explicit LAC of rlh: " + (2 * numOfEntries - 1)
+                        + " actual ExplicitLAC of rlh: " + explicitlac,
+                        (explicitlac == (2 * numOfEntries - 1)));
+                // readExplicitLastConfirmed updates the lac of rlh.
+                assertTrue(
+                        "Expected LAC of rlh: " + (2 * numOfEntries - 1) + " actual LAC of rlh: "
+                                + rlh.getLastAddConfirmed(),
+                        (rlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
+
+                Enumeration<LedgerEntry> entries = rlh.readEntries(numOfEntries, 2 * numOfEntries - 1);
+                int entryId = numOfEntries;
+                while (entries.hasMoreElements()) {
+                    LedgerEntry entry = entries.nextElement();
+                    String entryString = new String(entry.getEntry());
+                    assertTrue("Expected entry String: " + ("foobar" + entryId)
+                            + " actual entry String: " + entryString,
+                            entryString.equals("foobar" + entryId));
+                    entryId++;
+                }
+            }
+        }
+    }
+
 
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index b853614..6e1557a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -17,6 +17,7 @@
  */
 package org.apache.bookkeeper.client;
 
+import static com.google.common.base.Preconditions.checkState;
 import static org.apache.bookkeeper.client.api.BKException.Code.NoBookieAvailableException;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -36,6 +37,7 @@ import io.netty.buffer.Unpooled;
 
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
@@ -44,7 +46,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -95,6 +97,9 @@ public abstract class MockBookKeeperTestCase {
     protected ConcurrentSkipListSet<Long> fencedLedgers;
     protected ConcurrentMap<Long, Map<BookieSocketAddress, Map<Long, MockEntry>>> mockLedgerData;
 
+    private Map<BookieSocketAddress, List<Runnable>> deferredBookieForceLedgerResponses;
+    private Set<BookieSocketAddress> suspendedBookiesForForceLedgerAcks;
+
     List<BookieSocketAddress> failedBookies;
     Set<BookieSocketAddress> availableBookies;
     private int lastIndexForBK;
@@ -129,6 +134,8 @@ public abstract class MockBookKeeperTestCase {
 
     @Before
     public void setup() throws Exception {
+        deferredBookieForceLedgerResponses = new ConcurrentHashMap<>();
+        suspendedBookiesForForceLedgerAcks = Collections.synchronizedSet(new HashSet<>());
         mockLedgerMetadataRegistry = new ConcurrentHashMap<>();
         mockLedgerData = new ConcurrentHashMap<>();
         mockNextLedgerId = new AtomicLong(1);
@@ -174,6 +181,7 @@ public abstract class MockBookKeeperTestCase {
         setupBookieWatcherForEnsembleChange();
         setupBookieClientReadEntry();
         setupBookieClientAddEntry();
+        setupBookieClientForceLedger();
     }
 
     protected void mockBookKeeperGetConf(ClientConfiguration conf) {
@@ -235,6 +243,25 @@ public abstract class MockBookKeeperTestCase {
         availableBookies.remove(killedBookieSocketAddress);
     }
 
+    /**
+     * Marks a bookie that is currently in {@code failedBookies} as available again.
+     *
+     * @param killedBookieSocketAddress address of the bookie to bring back
+     * @throws IllegalStateException if the bookie is not in the failed list or is already
+     *     in the available set
+     */
+    protected void startKilledBookie(BookieSocketAddress killedBookieSocketAddress) {
+        checkState(failedBookies.contains(killedBookieSocketAddress),
+                "bookie %s is not in the failed bookies list", killedBookieSocketAddress);
+        checkState(!availableBookies.contains(killedBookieSocketAddress),
+                "bookie %s is already available", killedBookieSocketAddress);
+        failedBookies.remove(killedBookieSocketAddress);
+        availableBookies.add(killedBookieSocketAddress);
+    }
+
+    /**
+     * Marks {@code address} as having its FORCE_LEDGER acknowledgements suspended.
+     * NOTE(review): the mock forceLedger answer is expected to queue responses in
+     * {@code deferredBookieForceLedgerResponses} while suspended — confirm in its setup.
+     */
+    protected void suspendBookieForceLedgerAcks(BookieSocketAddress address) {
+        this.suspendedBookiesForForceLedgerAcks.add(address);
+    }
+
+    /**
+     * Lifts the FORCE_LEDGER ack suspension for {@code address} and runs every deferred
+     * response queued for it. Despite the name, only force-ledger acks are touched.
+     */
+    protected void resumeBookieWriteAcks(BookieSocketAddress address) {
+        this.suspendedBookiesForForceLedgerAcks.remove(address);
+        final List<Runnable> deferred = this.deferredBookieForceLedgerResponses.remove(address);
+        if (deferred == null) {
+            return;
+        }
+        deferred.forEach(Runnable::run);
+    }
+
     protected BookieSocketAddress startNewBookie() {
         BookieSocketAddress address = generateBookieSocketAddress(lastIndexForBK++);
         availableBookies.add(address);
@@ -287,13 +314,6 @@ public abstract class MockBookKeeperTestCase {
                     }
                 });
     }
-    private void submit(Runnable operation) {
-        try {
-            scheduler.submit(operation);
-        } catch (RejectedExecutionException rejected) {
-            operation.run();
-        }
-    }
 
     protected void registerMockEntryForRead(long ledgerId, long entryId, BookieSocketAddress bookieSocketAddress,
         byte[] entryData, long lastAddConfirmed) {
@@ -496,7 +516,8 @@ public abstract class MockBookKeeperTestCase {
                 try {
                     entry = extractEntryPayload(ledgerId, entryId, toSend);
                 } catch (BKDigestMatchException e) {
-                    callback.writeComplete(Code.DigestMatchException, ledgerId, entryId, bookieSocketAddress, ctx);
+                    callback.writeComplete(Code.DigestMatchException,
+                            ledgerId, entryId, bookieSocketAddress, ctx);
                     return;
                 }
                 boolean fenced = fencedLedgers.contains(ledgerId);
@@ -505,17 +526,20 @@ public abstract class MockBookKeeperTestCase {
                         ledgerId, entryId, bookieSocketAddress, ctx);
                 } else {
                     if (failedBookies.contains(bookieSocketAddress)) {
-                        callback.writeComplete(NoBookieAvailableException, ledgerId, entryId, bookieSocketAddress, ctx);
+                        callback.writeComplete(NoBookieAvailableException,
+                                ledgerId, entryId, bookieSocketAddress, ctx);
                         return;
                     }
                     if (getMockLedgerContentsInBookie(ledgerId, bookieSocketAddress).isEmpty()) {
-                            registerMockEntryForRead(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, bookieSocketAddress,
-                                    new byte[0], BookieProtocol.INVALID_ENTRY_ID);
+                            registerMockEntryForRead(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
+                                    bookieSocketAddress, new byte[0], BookieProtocol.INVALID_ENTRY_ID);
                     }
                     registerMockEntryForRead(ledgerId, entryId, bookieSocketAddress, entry, ledgerId);
-                    callback.writeComplete(BKException.Code.OK, ledgerId, entryId, bookieSocketAddress, ctx);
+                    callback.writeComplete(BKException.Code.OK,
+                            ledgerId, entryId, bookieSocketAddress, ctx);
                 }
             });
+
             return null;
         });
 
@@ -526,4 +550,39 @@ public abstract class MockBookKeeperTestCase {
                 any(), anyInt(), anyBoolean(), any(EnumSet.class));
     }
 
+    /**
+     * Stubs {@code bookieClient.forceLedger(...)} on the mock.
+     *
+     * <p>Every call is answered on the ordered executor, keyed by ledger id. It completes with
+     * {@code NoBookieAvailableException} when the target bookie is in {@code failedBookies} at
+     * completion time, and with {@code BKException.Code.OK} otherwise. If the target bookie is
+     * currently suspended for force-ledger acks, the answer is queued in
+     * {@code deferredBookieForceLedgerResponses} rather than scheduled immediately.
+     */
+    @SuppressWarnings("unchecked")
+    protected void setupBookieClientForceLedger() {
+        final Stubber stub = doAnswer(invocation -> {
+            Object[] args = invocation.getArguments();
+            BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) args[0];
+            long ledgerId = (Long) args[1];
+            BookkeeperInternalCallbacks.ForceLedgerCallback callback =
+                    (BookkeeperInternalCallbacks.ForceLedgerCallback) args[2];
+            Object ctx = args[3];
+
+            // The failed-bookie check runs when the reply executes, so a bookie killed while
+            // its ack was deferred is reported as unavailable once it is resumed.
+            Runnable reply = () -> executor.executeOrdered(ledgerId, () -> {
+                int rc = failedBookies.contains(bookieSocketAddress)
+                        ? NoBookieAvailableException
+                        : BKException.Code.OK;
+                callback.forceLedgerComplete(rc, ledgerId, bookieSocketAddress, ctx);
+            });
+
+            if (!suspendedBookiesForForceLedgerAcks.contains(bookieSocketAddress)) {
+                reply.run();
+            } else {
+                deferredBookieForceLedgerResponses
+                        .computeIfAbsent(bookieSocketAddress, key -> new CopyOnWriteArrayList<>())
+                        .add(reply);
+            }
+            return null;
+        });
+
+        stub.when(bookieClient).forceLedger(any(BookieSocketAddress.class),
+                anyLong(),
+                any(BookkeeperInternalCallbacks.ForceLedgerCallback.class),
+                any());
+    }
+
 }

-- 
To stop receiving notification emails like this one, please contact
eolivelli@apache.org.