You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2017/11/16 16:15:34 UTC

[GitHub] eolivelli closed pull request #643: BP-14 part 2 - client side changes - OLD PATCH

eolivelli closed pull request #643: BP-14 part 2 - client side changes - OLD PATCH
URL: https://github.com/apache/bookkeeper/pull/643
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
index 9c52788eb..4358c5fc9 100644
--- a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
+++ b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
@@ -28,6 +28,7 @@
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.LedgerType;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
@@ -54,7 +55,7 @@
     static class LatencyCallback implements WriteCallback {
         boolean complete;
         @Override
-        public synchronized void writeComplete(int rc, long ledgerId, long entryId,
+        public synchronized void writeComplete(int rc, long ledgerId, long entryId, long lastAddsynced,
                 BookieSocketAddress addr, Object ctx) {
             if (rc != 0) {
                 LOG.error("Got error " + rc);
@@ -75,7 +76,7 @@ public synchronized void waitForComplete() throws InterruptedException {
     static class ThroughputCallback implements WriteCallback {
         int count;
         int waitingCount = Integer.MAX_VALUE;
-        public synchronized void writeComplete(int rc, long ledgerId, long entryId,
+        public synchronized void writeComplete(int rc, long ledgerId, long entryId, long lastAddsynced,
                 BookieSocketAddress addr, Object ctx) {
             if (rc != 0) {
                 LOG.error("Got error " + rc);
@@ -175,7 +176,7 @@ public static void main(String[] args)
             toSend.writeLong(entry);
             toSend.writerIndex(toSend.capacity());
             bc.addEntry(new BookieSocketAddress(addr, port), ledger, new byte[20],
-                        entry, toSend, tc, null, BookieProtocol.FLAG_NONE);
+                        entry, toSend, tc, null, BookieProtocol.FLAG_NONE, LedgerType.PD_JOURNAL);
         }
         LOG.info("Waiting for warmup");
         tc.waitFor(warmUpCount);
@@ -193,7 +194,7 @@ public static void main(String[] args)
             toSend.writerIndex(toSend.capacity());
             lc.resetComplete();
             bc.addEntry(new BookieSocketAddress(addr, port), ledger, new byte[20],
-                        entry, toSend, lc, null, BookieProtocol.FLAG_NONE);
+                        entry, toSend, lc, null, BookieProtocol.FLAG_NONE, LedgerType.PD_JOURNAL);
             lc.waitForComplete();
         }
         long endTime = System.nanoTime();
@@ -213,7 +214,7 @@ public static void main(String[] args)
             toSend.writeLong(entry);
             toSend.writerIndex(toSend.capacity());
             bc.addEntry(new BookieSocketAddress(addr, port), ledger, new byte[20],
-                        entry, toSend, tc, null, BookieProtocol.FLAG_NONE);
+                        entry, toSend, tc, null, BookieProtocol.FLAG_NONE, LedgerType.PD_JOURNAL);
         }
         tc.waitFor(entryCount);
         endTime = System.currentTimeMillis();
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index a7e70e9e8..004f83e31 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -378,6 +378,7 @@
         <version>0.5.0</version>
         <configuration>
           <protocArtifact>com.google.protobuf:protoc:3.4.0:exe:${os.detected.classifier}</protocArtifact>
+          <checkStaleness>true</checkStaleness>
         </configuration>
         <executions>
           <execution>
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 220aa4cb0..f694b4523 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -216,7 +216,7 @@ public long getEntry() {
     // Write Callback do nothing
     static class NopWriteCallback implements WriteCallback {
         @Override
-        public void writeComplete(int rc, long ledgerId, long entryId,
+        public void writeComplete(int rc, long ledgerId, long entryId, long lastAddSyncedEntry,
                                   BookieSocketAddress addr, Object ctx) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Finished writing entry {} @ ledger {} for {} : {}",
@@ -1491,7 +1491,7 @@ public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterK
         SettableFuture<Boolean> result = SettableFuture.create();
 
         @Override
-        public void writeComplete(int rc, long ledgerId, long entryId,
+        public void writeComplete(int rc, long ledgerId, long entryId, long lastAddSyncedEntry,
                                   BookieSocketAddress addr, Object ctx) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Finished writing entry {} @ ledger {} for {} : {}",
@@ -1560,7 +1560,8 @@ public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC,
         int count;
 
         @Override
-        public synchronized void writeComplete(int rc, long l, long e, BookieSocketAddress addr, Object ctx) {
+        public synchronized void writeComplete(int rc, long l, long e, long lastAddSyncedEntry,
+                                               BookieSocketAddress addr, Object ctx) {
             count--;
             if (count == 0) {
                 notifyAll();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 1f8aef400..074c5ab33 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -39,6 +39,7 @@
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -293,7 +294,9 @@ public void run() {
                 LOG.debug("Acknowledge Ledger: {}, Entry: {}", ledgerId, entryId);
             }
             journalAddEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime), TimeUnit.NANOSECONDS);
-            cb.writeComplete(0, ledgerId, entryId, null, ctx);
+            // we are using lastAddSyncedEntry = -1 as mock implementation, next commits will provide an implementation
+            final long lastSyncedEntryId = BookieProtocol.INVALID_ENTRY_ID;
+            cb.writeComplete(0, ledgerId, entryId, lastSyncedEntryId, null, ctx);
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
index af55f6a73..1e568459d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
@@ -122,7 +122,7 @@ ByteBuf getExplicitLac() {
             result = logFenceResult = SettableFuture.create();
         }
         ByteBuf entry = createLedgerFenceEntry(ledgerId);
-        journal.logAddEntry(entry, (rc, ledgerId, entryId, addr, ctx) -> {
+        journal.logAddEntry(entry, (rc, ledgerId, entryId, lastAddSyncedEntry, addr, ctx) -> {
             LOG.debug("Record fenced state for ledger {} in journal with rc {}", ledgerId, rc);
             if (rc == 0) {
                 fenceEntryPersisted.compareAndSet(false, true);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 8716c3a2f..d64a44836 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -50,6 +50,7 @@
 import org.apache.bookkeeper.client.api.BookKeeperBuilder;
 import org.apache.bookkeeper.client.api.CreateBuilder;
 import org.apache.bookkeeper.client.api.DeleteBuilder;
+import org.apache.bookkeeper.client.api.LedgerType;
 import org.apache.bookkeeper.client.api.OpenBuilder;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.feature.Feature;
@@ -62,6 +63,7 @@
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -101,6 +103,7 @@
     private final StatsLogger statsLogger;
     private OpStatsLogger createOpLogger;
     private OpStatsLogger openOpLogger;
+    private OpStatsLogger syncOpLogger;
     private OpStatsLogger deleteOpLogger;
     private OpStatsLogger recoverOpLogger;
     private OpStatsLogger readOpLogger;
@@ -622,6 +625,11 @@ boolean isReorderReadSequence() {
         return reorderReadSequence;
     }
 
+    @VisibleForTesting
+    boolean isDelayEnsembleChange() {
+        return delayEnsembleChange;
+    }
+
     /**
      * There are 2 digest types that can be used for verification. The CRC32 is
      * cheap to compute but does not protect against byzantine bookies (i.e., a
@@ -757,7 +765,7 @@ public void asyncCreateLedger(final int ensSize, final int writeQuorumSize, fina
                 return;
             }
             new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
-                               ackQuorumSize, digestType, passwd, cb, ctx, customMetadata)
+                               ackQuorumSize, digestType, passwd, cb, ctx, customMetadata, LedgerType.PD_JOURNAL)
                 .initiate();
         } finally {
             closeLock.readLock().unlock();
@@ -960,7 +968,8 @@ public void asyncCreateLedgerAdv(final int ensSize, final int writeQuorumSize, f
                 return;
             }
             new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
-                               ackQuorumSize, digestType, passwd, cb, ctx, customMetadata).initiateAdv((long)(-1));
+                               ackQuorumSize, digestType, passwd, cb, ctx, customMetadata, LedgerType.PD_JOURNAL)
+                .initiateAdv(BookieProtocol.INVALID_LEDGER_ID);
         } finally {
             closeLock.readLock().unlock();
         }
@@ -1068,7 +1077,8 @@ public void asyncCreateLedgerAdv(final long ledgerId,
                 return;
             }
             new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
-                               ackQuorumSize, digestType, passwd, cb, ctx, customMetadata).initiateAdv(ledgerId);
+                               ackQuorumSize, digestType, passwd, cb, ctx, customMetadata, LedgerType.PD_JOURNAL)
+                .initiateAdv(ledgerId);
         } finally {
             closeLock.readLock().unlock();
         }
@@ -1380,6 +1390,7 @@ private final void initOpLoggers(StatsLogger stats) {
         createOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.CREATE_OP);
         deleteOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.DELETE_OP);
         openOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.OPEN_OP);
+        syncOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.SYNC_OP);
         recoverOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.RECOVER_OP);
         readOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.READ_OP);
         readLacAndEntryOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.READ_LAST_CONFIRMED_AND_ENTRY);
@@ -1393,6 +1404,7 @@ private final void initOpLoggers(StatsLogger stats) {
 
     OpStatsLogger getCreateOpLogger() { return createOpLogger; }
     OpStatsLogger getOpenOpLogger() { return openOpLogger; }
+    OpStatsLogger getSyncOpLogger() { return syncOpLogger; }
     OpStatsLogger getDeleteOpLogger() { return deleteOpLogger; }
     OpStatsLogger getRecoverOpLogger() { return recoverOpLogger; }
     OpStatsLogger getReadOpLogger() { return readOpLogger; }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index 091b920b1..cb2abbed2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -42,6 +42,7 @@
     public final static String READ_OP = "READ_ENTRY";
     public final static String WRITE_LAC_OP = "WRITE_LAC";
     public final static String READ_LAC_OP = "READ_LAC";
+    public final static String SYNC_OP = "SYNC";
     public final static String READ_LAST_CONFIRMED_AND_ENTRY = "READ_LAST_CONFIRMED_AND_ENTRY";
     public final static String READ_LAST_CONFIRMED_AND_ENTRY_RESPONSE = "READ_LAST_CONFIRMED_AND_ENTRY_RESPONSE";
     public final static String PENDING_ADDS = "NUM_PENDING_ADD";
@@ -55,6 +56,7 @@
 
     public final static String CHANNEL_READ_OP = "READ_ENTRY";
     public final static String CHANNEL_TIMEOUT_READ = "TIMEOUT_READ_ENTRY";
+    public final static String CHANNEL_SYNC = "SYNC";
     public final static String CHANNEL_ADD_OP = "ADD_ENTRY";
     public final static String CHANNEL_TIMEOUT_ADD = "TIMEOUT_ADD_ENTRY";
     public final static String CHANNEL_WRITE_LAC_OP = "WRITE_LAC";
@@ -62,6 +64,7 @@
     public final static String CHANNEL_READ_LAC_OP = "READ_LAC";
     public final static String CHANNEL_TIMEOUT_READ_LAC = "TIMEOUT_READ_LAC";
     public final static String TIMEOUT_GET_BOOKIE_INFO = "TIMEOUT_GET_BOOKIE_INFO";
+    public final static String TIMEOUT_SYNC = "TIMEOUT_SYNC";
     public final static String CHANNEL_START_TLS_OP = "START_TLS";
     public final static String CHANNEL_TIMEOUT_START_TLS_OP = "TIMEOUT_START_TLS";
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LastAddSyncedManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LastAddSyncedManager.java
new file mode 100644
index 000000000..1dfdd16b9
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LastAddSyncedManager.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Handles LastAddSynced on the client side
+ */
+class LastAddSyncedManager {
+
+    private final Map<Integer, Long> lastAddSyncedMap = new HashMap<>();
+    private final int writeQuorumSize;
+    private final int ackQuorumSize;
+
+    public LastAddSyncedManager(int writeQuorumSize, int ackQuorumSize) {
+        this.writeQuorumSize = writeQuorumSize;
+        this.ackQuorumSize = ackQuorumSize;
+    }
+
+    /**
+     * Save the lastAddSynced value for a given bookie
+     *
+     */
+    public void updateBookie(int bookieIndexHeardFrom, long lastAddSynced) {
+        lastAddSyncedMap.put(bookieIndexHeardFrom, lastAddSynced);
+    }
+
+    /**
+     * Estimates the LastAddConfirmed entry
+     *
+     * @return the estimated value, considering the lastAddSynced piggybacked value from each bookie
+     */
+    public long calculateCurrentLastAddSynced() {
+        /*
+                Sort the ensemble by its `LastAddSynced` in ascending order
+                LastAddConfirmed = max(ensemble[0..(write_quorum_size - ack_quorum_size)])
+         */
+        List<Long> sorted = new ArrayList<>(lastAddSyncedMap.values());
+        sorted.sort(Comparator.naturalOrder());
+        int maxIndex = writeQuorumSize - ackQuorumSize;
+        if (sorted.size() < maxIndex || sorted.size() < ackQuorumSize) {
+            return -1;
+        }
+        long max = -1;
+        for (int i = 0; i <= maxIndex; i++) {
+            long value = sorted.get(i);
+            if (max < value) {
+                max = value;
+            }
+        }
+        return max;
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
index 4cc62a06e..8536436fe 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
@@ -36,11 +36,13 @@
 import org.apache.bookkeeper.client.SyncCallbackUtils.SyncCreateCallback;
 import org.apache.bookkeeper.client.api.CreateAdvBuilder;
 import org.apache.bookkeeper.client.api.CreateBuilder;
+import org.apache.bookkeeper.client.api.LedgerType;
 import org.apache.bookkeeper.client.api.WriteAdvHandle;
 import org.apache.bookkeeper.client.api.WriteHandle;
 import org.apache.bookkeeper.meta.LedgerIdGenerator;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.DataFormats;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
 import org.slf4j.Logger;
@@ -62,6 +64,7 @@
     final byte[] passwd;
     final BookKeeper bk;
     final DigestType digestType;
+    final LedgerType ledgerType;
     final long startTime;
     final OpStatsLogger createOpLogger;
     boolean adv = false;
@@ -80,6 +83,8 @@
      *       ack quorum size
      * @param digestType
      *       digest type, either MAC or CRC32
+     * @param ledgerType
+     *       ledger type
      * @param passwd
      *       password
      * @param cb
@@ -91,13 +96,15 @@
      *       preserve the order(e.g. sortedMap) upon later retireval.
      */
     LedgerCreateOp(BookKeeper bk, int ensembleSize, int writeQuorumSize, int ackQuorumSize, DigestType digestType,
-            byte[] passwd, CreateCallback cb, Object ctx, final Map<String, byte[]> customMetadata) {
+            byte[] passwd, CreateCallback cb, Object ctx, final Map<String, byte[]> customMetadata, LedgerType ledgerType) {
         this.bk = bk;
-        this.metadata = new LedgerMetadata(ensembleSize, writeQuorumSize, ackQuorumSize, digestType, passwd, customMetadata);
+        this.metadata = new LedgerMetadata(ensembleSize, writeQuorumSize, ackQuorumSize, digestType, passwd,
+            customMetadata, ledgerType);
         this.digestType = digestType;
         this.passwd = passwd;
         this.cb = cb;
         this.ctx = ctx;
+        this.ledgerType = ledgerType;
         this.startTime = MathUtils.nowInNano();
         this.createOpLogger = bk.getCreateOpLogger();
     }
@@ -218,6 +225,7 @@ private void createComplete(int rc, LedgerHandle lh) {
         private byte[] builderPassword;
         private org.apache.bookkeeper.client.api.DigestType builderDigestType
             = org.apache.bookkeeper.client.api.DigestType.CRC32;
+        private LedgerType builderLedgerType = LedgerType.PD_JOURNAL;
         private Map<String, byte[]> builderCustomMetadata = Collections.emptyMap();
 
         CreateBuilderImpl(BookKeeper bk) {
@@ -261,15 +269,31 @@ public CreateBuilder withDigestType(org.apache.bookkeeper.client.api.DigestType
         }
 
         @Override
+        public CreateBuilder withLedgerType(LedgerType ledgerType) {
+            this.builderLedgerType = ledgerType;
+            return this;
+        }
+
+        @Override
         public CreateAdvBuilder makeAdv() {
             return new CreateAdvBuilderImpl(this);
         }
 
         private boolean validate() {
+            if (builderLedgerType == null) {
+                LOG.error("invalid null builderLedgerType");
+                return false;
+            }
+
             if (builderWriteQuorumSize > builderEnsembleSize) {
                 LOG.error("invalid writeQuorumSize {} > ensembleSize {}", builderWriteQuorumSize, builderEnsembleSize);
                 return false;
             }
+            if (builderLedgerType.equals(LedgerType.VD_JOURNAL) && builderWriteQuorumSize != builderEnsembleSize) {
+                LOG.error("invalid writeQuorumSize {} != ensembleSize {} for VD_JOURNAL ledger. Striping is not allowed",
+                    builderWriteQuorumSize, builderEnsembleSize);
+                return false;
+            }
 
             if (builderAckQuorumSize > builderWriteQuorumSize) {
                 LOG.error("invalid ackQuorumSize {} > writeQuorumSize {}", builderAckQuorumSize, builderWriteQuorumSize);
@@ -314,7 +338,7 @@ private void create(CreateCallback cb) {
             }
             LedgerCreateOp op = new LedgerCreateOp(bk, builderEnsembleSize,
                 builderWriteQuorumSize, builderAckQuorumSize, DigestType.fromApiDigestType(builderDigestType),
-                builderPassword, cb, null, builderCustomMetadata);
+                builderPassword, cb, null, builderCustomMetadata, builderLedgerType);
             ReentrantReadWriteLock closeLock = bk.getCloseLock();
             closeLock.readLock().lock();
             try {
@@ -371,7 +395,7 @@ private void create(CreateCallback cb) {
             LedgerCreateOp op = new LedgerCreateOp(parent.bk, parent.builderEnsembleSize,
                     parent.builderWriteQuorumSize, parent.builderAckQuorumSize,
                     DigestType.fromApiDigestType(parent.builderDigestType),
-                    parent.builderPassword, cb, null, parent.builderCustomMetadata);
+                    parent.builderPassword, cb, null, parent.builderCustomMetadata, parent.builderLedgerType);
             ReentrantReadWriteLock closeLock = parent.bk.getCloseLock();
             closeLock.readLock().lock();
             try {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index 172f9ec5c..09525e1be 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -291,7 +291,7 @@ public void readComplete(int rc, LedgerHandle lh,
                         new WriteCallback() {
                             @Override
                             public void writeComplete(int rc, long ledgerId,
-                                    long entryId, BookieSocketAddress addr,
+                                    long entryId, long lastAddSyncedEntry, BookieSocketAddress addr,
                                     Object ctx) {
                                 if (rc != BKException.Code.OK) {
                                     LOG.error(
@@ -317,7 +317,7 @@ public void writeComplete(int rc, long ledgerId,
                                 ledgerFragmentEntryMcb.processResult(rc, null,
                                         null);
                             }
-                        }, null, BookieProtocol.FLAG_RECOVERY_ADD);
+                        }, null, BookieProtocol.FLAG_RECOVERY_ADD, lh.getLedgerType());
             }
         }, null);
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index bb7c9549f..416522941 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -22,6 +22,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -56,9 +57,8 @@
 import org.apache.bookkeeper.client.SyncCallbackUtils.SyncCloseCallback;
 import org.apache.bookkeeper.client.SyncCallbackUtils.SyncReadCallback;
 import org.apache.bookkeeper.client.SyncCallbackUtils.SyncReadLastConfirmedCallback;
-import org.apache.bookkeeper.client.api.WriteAdvHandle;
+import org.apache.bookkeeper.client.api.LedgerType;
 import org.apache.bookkeeper.client.api.WriteHandle;
-import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
@@ -84,15 +84,27 @@
     final BookKeeper bk;
     final long ledgerId;
     long lastAddPushed;
+    /**
+      * Last entryId which has been confirmed to be written durably to the bookies.
+      * This value is used by readers, the the LAC protocol
+      */
     volatile long lastAddConfirmed;
 
+     /**
+      * Next entryId which is expected to move forward during {@link #sendAddSuccessCallbacks() }. This is important
+      * in order to have an ordered sequence of addEntry ackknowledged to the writer
+      */
+     volatile long pendingAddsSequenceHead;
+
     long length;
     final DigestManager macManager;
     final DistributionSchedule distributionSchedule;
     final RateLimiter throttler;
+    final LastAddSyncedManager lastAddSyncedManager;
     final LoadingCache<BookieSocketAddress, Long> bookieFailureHistory;
     final boolean enableParallelRecoveryRead;
     final int recoveryReadBatchSize;
+    final LedgerType ledgerType;
 
     /**
      * Invalid entry id. This value is returned from methods which
@@ -124,6 +136,7 @@
             throws GeneralSecurityException, NumberFormatException {
         this.bk = bk;
         this.metadata = metadata;
+        this.ledgerType = metadata.getLedgerType();
         this.pendingAddOps = new ConcurrentLinkedQueue<PendingAddOp>();
         this.enableParallelRecoveryRead = bk.getConf().getEnableParallelRecoveryRead();
         this.recoveryReadBatchSize = bk.getConf().getRecoveryReadBatchSize();
@@ -136,6 +149,8 @@
             length = 0;
         }
 
+        this.pendingAddsSequenceHead = lastAddConfirmed + 1;
+
         this.ledgerId = ledgerId;
 
         if (bk.getConf().getThrottleValue() > 0) {
@@ -151,6 +166,7 @@
         this.ledgerKey = password.length > 0 ? MacDigestManager.genDigest("ledger", password) : emptyLedgerKey;
         distributionSchedule = new RoundRobinDistributionSchedule(
                 metadata.getWriteQuorumSize(), metadata.getAckQuorumSize(), metadata.getEnsembleSize());
+        this.lastAddSyncedManager = new LastAddSyncedManager(metadata.getWriteQuorumSize(), metadata.getAckQuorumSize());
         this.bookieFailureHistory = CacheBuilder.newBuilder()
             .expireAfterWrite(bk.getConf().getBookieFailureHistoryExpirationMSec(), TimeUnit.MILLISECONDS)
             .build(new CacheLoader<BookieSocketAddress, Long>() {
@@ -192,6 +208,15 @@ public long getId() {
     }
 
     /**
+     * The the type of the current ledger
+     *
+     * @return the type of ledger
+     */
+    public LedgerType getLedgerType() {
+        return ledgerType;
+    }
+
+    /**
      * Get the last confirmed entry id on this ledger. It reads
      * the local state of the ledger handle, which is different
      * from the readLastConfirmed call. In the case the ledger
@@ -939,6 +964,27 @@ public String toString() {
         }
     }
 
+    /**
+     * {@inheritDoc }
+     */
+    @Override
+    public CompletableFuture<Long> sync() {
+        CompletableFuture<Long> result = new CompletableFuture<>();
+        final PendingSyncOp op = new PendingSyncOp(this, result);
+        try {
+            bk.getMainWorkerPool().submit(new SafeRunnable() {
+                @Override
+                public void safeRun() {
+                    op.initiate();
+                }
+            });
+        } catch (RejectedExecutionException e) {
+            result.completeExceptionally(BKException.create(BKException.Code.InterruptedException));
+        }
+        return result;
+    }
+
+
     synchronized void updateLastConfirmed(long lac, long len) {
         if (lac > lastAddConfirmed) {
             lastAddConfirmed = lac;
@@ -1329,6 +1375,12 @@ void errorOutPendingAdds(int rc, List<PendingAddOp> ops) {
         }
     }
 
+    synchronized long syncCompleted() {
+        lastAddConfirmed = lastAddSyncedManager.calculateCurrentLastAddSynced();
+        LOG.info("syncCompleted lastAddConfirmed {}", lastAddConfirmed);
+        return lastAddConfirmed;
+    }
+
     void sendAddSuccessCallbacks() {
         // Start from the head of the queue and proceed while there are
         // entries that have had all their responses come back
@@ -1343,17 +1395,27 @@ void sendAddSuccessCallbacks() {
                 return;
             }
             // Check if it is the next entry in the sequence.
-            if (pendingAddOp.entryId != 0 && pendingAddOp.entryId != lastAddConfirmed + 1) {
+            if (pendingAddOp.entryId != 0 && pendingAddOp.entryId != pendingAddsSequenceHead) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Head of the queue entryId: {} is not lac: {} + 1", pendingAddOp.entryId,
-                            lastAddConfirmed);
+                    LOG.debug("Head of the queue entryId: {} is not the expected value: {}", pendingAddOp.entryId,
+                               pendingAddsSequenceHead);
                 }
                 return;
             }
 
-            pendingAddOps.remove();
+
+            PendingAddOp removed = pendingAddOps.remove();
+            Preconditions.checkState(removed == pendingAddOp, "removed unexpected entry %s, expected %s",
+            removed.entryId, pendingAddOp.entryId);
+
             explicitLacFlushPolicy.updatePiggyBackedLac(lastAddConfirmed);
-            lastAddConfirmed = pendingAddOp.entryId;
+            if (ledgerType.equals(LedgerType.VD_JOURNAL)) {
+                this.lastAddConfirmed = lastAddSyncedManager.calculateCurrentLastAddSynced();
+            } else {
+                this.lastAddConfirmed = Math.max(lastAddConfirmed, pendingAddOp.entryId);
+            }
+
+            pendingAddsSequenceHead = pendingAddOp.entryId + 1;
 
             pendingAddOp.submitCallback(BKException.Code.OK);
         }
@@ -1417,7 +1479,7 @@ EnsembleInfo replaceBookieInMetadata(final Map<Integer, BookieSocketAddress> fai
     void handleBookieFailure(final Map<Integer, BookieSocketAddress> failedBookies) {
         int curBlockAddCompletions = blockAddCompletions.incrementAndGet();
 
-        if (bk.disableEnsembleChangeFeature.isAvailable()) {
+        if (bk.disableEnsembleChangeFeature.isAvailable() || ledgerType == LedgerType.VD_JOURNAL) {
             blockAddCompletions.decrementAndGet();
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Ensemble change is disabled. Retry sending to failed bookies {} for ledger {}.",
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
index 5683105de..161cbb31e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
@@ -44,6 +44,8 @@
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
+import org.apache.bookkeeper.client.api.LedgerType;
+import org.apache.bookkeeper.proto.DataFormats;
 
 /**
  * This class encapsulates all the ledger metadata that is persistently stored
@@ -84,12 +86,14 @@
 
     private boolean hasPassword = false;
     private LedgerMetadataFormat.DigestType digestType;
+    private DataFormats.LedgerType ledgerType;
     private byte[] password;
 
     private Map<String, byte[]> customMetadata = Maps.newHashMap();
 
     public LedgerMetadata(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
-                          BookKeeper.DigestType digestType, byte[] password, Map<String, byte[]> customMetadata) {
+                          BookKeeper.DigestType digestType, byte[] password, Map<String, byte[]> customMetadata,
+                          LedgerType ledgerType) {
         this.ensembleSize = ensembleSize;
         this.writeQuorumSize = writeQuorumSize;
         this.ackQuorumSize = ackQuorumSize;
@@ -106,6 +110,8 @@ public LedgerMetadata(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
 
         this.digestType = digestType.equals(BookKeeper.DigestType.MAC) ?
             LedgerMetadataFormat.DigestType.HMAC : LedgerMetadataFormat.DigestType.CRC32;
+        this.ledgerType = ledgerType.equals(LedgerType.VD_JOURNAL) ?
+            DataFormats.LedgerType.VD_JOURNAL : DataFormats.LedgerType.PD_JOURNAL;
         this.password = Arrays.copyOf(password, password.length);
         this.hasPassword = true;
         if (customMetadata != null) {
@@ -115,7 +121,7 @@ public LedgerMetadata(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
 
     public LedgerMetadata(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
             BookKeeper.DigestType digestType, byte[] password) {
-        this(ensembleSize, writeQuorumSize, ackQuorumSize, digestType, password, null);
+        this(ensembleSize, writeQuorumSize, ackQuorumSize, digestType, password, null, LedgerType.PD_JOURNAL);
     }
 
     /**
@@ -133,6 +139,7 @@ public LedgerMetadata(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
         this.version = other.version;
         this.hasPassword = other.hasPassword;
         this.digestType = other.digestType;
+        this.ledgerType = other.ledgerType;
         this.password = new byte[other.password.length];
         System.arraycopy(other.password, 0, this.password, 0, other.password.length);
         // copy the ensembles
@@ -207,6 +214,15 @@ boolean hasPassword() {
         }
     }
 
+    @VisibleForTesting
+    public LedgerType getLedgerType() {
+        if (ledgerType.equals(DataFormats.LedgerType.VD_JOURNAL)) {
+            return LedgerType.VD_JOURNAL;
+        } else {
+            return LedgerType.PD_JOURNAL;
+        }
+    }
+
     public long getLastEntryId() {
         return lastEntryId;
     }
@@ -300,6 +316,10 @@ void setCustomMetadata(Map<String, byte[]> customMetadata) {
             builder.setDigestType(digestType).setPassword(ByteString.copyFrom(password));
         }
 
+        if (!ledgerType.equals(DataFormats.LedgerType.PD_JOURNAL)) {
+            builder.setLedgerType(ledgerType);
+        }
+
         if (customMetadata != null) {
             LedgerMetadataFormat.cMetadataMapEntry.Builder cMetadataBuilder = LedgerMetadataFormat.cMetadataMapEntry.newBuilder();
             for (Map.Entry<String,byte[]> entry : customMetadata.entrySet()) {
@@ -436,6 +456,12 @@ public static LedgerMetadata parseConfig(byte[] bytes, Version version, Optional
             lc.hasPassword = true;
         }
 
+        if (data.hasLedgerType()) {
+            lc.ledgerType = data.getLedgerType();
+        } else {
+            lc.ledgerType = DataFormats.LedgerType.PD_JOURNAL;
+        }
+
         for (LedgerMetadataFormat.Segment s : data.getSegmentList()) {
             ArrayList<BookieSocketAddress> addrs = new ArrayList<BookieSocketAddress>();
             for (String member : s.getEnsembleMemberList()) {
@@ -460,6 +486,7 @@ static LedgerMetadata parseVersion1Config(LedgerMetadata lc,
             lc.writeQuorumSize = lc.ackQuorumSize = Integer.parseInt(reader.readLine());
             lc.ensembleSize = Integer.parseInt(reader.readLine());
             lc.length = Long.parseLong(reader.readLine());
+            lc.ledgerType = DataFormats.LedgerType.PD_JOURNAL;
 
             String line = reader.readLine();
             while (line != null) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
index df7c84ed4..e582041de 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
@@ -114,6 +114,7 @@ public void readLastConfirmedDataComplete(int rc, RecoveryData data) {
                             synchronized (lh) {
                                 lh.lastAddPushed = lh.lastAddConfirmed = data.lastAddConfirmed;
                                 lh.length = data.length;
+                                lh.pendingAddsSequenceHead = lh.lastAddConfirmed + 1;
                                 startEntryToRead = endEntryToRead = lh.lastAddConfirmed;
                             }
                             // keep a copy of ledger metadata before proceeding
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index af4f35e31..a7a38e924 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -103,7 +103,7 @@ void sendWriteRequest(int bookieIndex) {
         int flags = isRecoveryAdd ? BookieProtocol.FLAG_RECOVERY_ADD : BookieProtocol.FLAG_NONE;
 
         lh.bk.getBookieClient().addEntry(lh.metadata.currentEnsemble.get(bookieIndex), lh.ledgerId, lh.ledgerKey, entryId, toSend,
-                this, bookieIndex, flags);
+                this, bookieIndex, flags, lh.ledgerType);
     }
 
     @Override
@@ -198,7 +198,8 @@ void initiate(ByteBuf toSend, int entryLength) {
     }
 
     @Override
-    public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
+    public void writeComplete(int rc, long ledgerId, long entryId, long lastAddSyncedEntry,
+                              BookieSocketAddress addr, Object ctx) {        
         int bookieIndex = (Integer) ctx;
 
         if (!lh.metadata.currentEnsemble.get(bookieIndex).equals(addr)) {
@@ -213,6 +214,10 @@ public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddre
         boolean ackQuorum = false;
         if (BKException.Code.OK == rc) {
             ackQuorum = ackSet.completeBookieAndCheck(bookieIndex);
+            lh.lastAddSyncedManager.updateBookie(bookieIndex, lastAddSyncedEntry);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("writeComplete lastAddSyncedEntry {}", lastAddSyncedEntry);
+            }
         }
 
         if (completed) {
@@ -255,7 +260,7 @@ public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddre
             lh.handleUnrecoverableErrorDuringAdd(rc);
             return;
         default:
-            if (lh.bk.delayEnsembleChange) {
+            if (lh.bk.isDelayEnsembleChange()) {
                 if (ackSet.failBookieAndCheck(bookieIndex, addr) || rc == BKException.Code.WriteOnReadOnlyBookieException) {
                     Map<Integer, BookieSocketAddress> failedBookies = ackSet.getFailedBookies();
                     LOG.warn("Failed to write entry ({}, {}) to bookies {}, handling failures.",
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingSyncOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingSyncOp.java
new file mode 100644
index 000000000..404ce8243
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingSyncOp.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This represents a pending Sync operation. When it has got
+ * success from Ack Quorum bookies, sends success back to the application,
+ * otherwise failure is sent back to the caller.
+ *
+ */
+class PendingSyncOp implements BookkeeperInternalCallbacks.SyncCallback {
+    private final static Logger LOG = LoggerFactory.getLogger(PendingSyncOp.class);
+    final CompletableFuture<Long> cb;
+    final Set<Integer> writeSet;
+    final Set<Integer> receivedResponseSet;
+
+    final DistributionSchedule.AckSet ackSet;
+    boolean completed = false;
+    int lastSeenError = BKException.Code.WriteException;
+    final long lastAddPushed;
+
+    final LedgerHandle lh;
+    final OpStatsLogger syncOpLogger;
+
+    PendingSyncOp(LedgerHandle lh, CompletableFuture<Long> cb) {
+        this.lh = lh;
+        this.cb = cb;
+        this.lastAddPushed = lh.getLastAddPushed();
+        ackSet = lh.distributionSchedule.getAckSet();
+        syncOpLogger = lh.bk.getSyncOpLogger();
+        this.writeSet = new HashSet<>(lh.distributionSchedule.getWriteSet(lastAddPushed));
+        this.receivedResponseSet = new HashSet<>(writeSet);
+    }
+
+    void sendSyncRequest(int bookieIndex) {
+        lh.bk.getBookieClient().sync(lh.metadata.currentEnsemble.get(bookieIndex), lh.ledgerId,
+                                     this, bookieIndex);
+    }
+
+    void initiate() {
+        if (lastAddPushed == -1) {
+            cb.complete(-1L);
+            return;
+        }
+        for (int bookieIndex: writeSet) {
+            sendSyncRequest(bookieIndex);
+        }
+    }
+
+    @Override
+    public void syncComplete(int rc, long ledgerId, long lastSyncedEntryId, BookieSocketAddress addr, Object ctx) {
+        int bookieIndex = (Integer) ctx;
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("syncComplete {} {} {} {}", rc, ledgerId, lastSyncedEntryId, addr);
+        }
+
+        if (completed) {
+            return;
+        }
+
+        if (BKException.Code.OK != rc) {
+            lastSeenError = rc;
+        }
+
+        // We got response.
+        receivedResponseSet.remove(bookieIndex);
+
+        if (rc == BKException.Code.OK) {
+            if (ackSet.completeBookieAndCheck(bookieIndex) && !completed) {
+                lh.lastAddSyncedManager.updateBookie(bookieIndex, lastSyncedEntryId);
+                completed = true;
+                long actualLastAddConfirmed = lh.syncCompleted();
+                cb.complete(actualLastAddConfirmed);
+                return;
+            }
+        } else {
+            LOG.warn("Sync did not succeed: Ledger {} on {} code {}", new Object[] { ledgerId, addr, rc});
+        }
+
+        if (receivedResponseSet.isEmpty()){
+            completed = true;
+            cb.completeExceptionally(BKException.create(lastSeenError));
+        }
+    }
+}
\ No newline at end of file
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
index 755f93dbf..0be510a2c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
@@ -29,6 +29,7 @@
 import org.slf4j.LoggerFactory;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.bookkeeper.proto.BookieProtocol;
 
 /**
  * This represents a pending WriteLac operation. When it has got
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/CreateBuilder.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/CreateBuilder.java
index 6d7d1ee4e..9f9c037e1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/CreateBuilder.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/CreateBuilder.java
@@ -88,6 +88,15 @@
     CreateBuilder withDigestType(DigestType digestType);
 
     /**
+     * Set the Type of ledger. It defaults to {@link LedgerType#PD_JOURNAL}
+     *
+     * @param ledgerType the type of ledger
+     *
+     * @return the builder itself
+     */
+    CreateBuilder withLedgerType(LedgerType ledgerType);
+
+    /**
      * Switch the ledger into 'Advanced' mode. A ledger used in Advanced mode will explicitly generate the sequence of
      * entry identifiers. Advanced ledgers can be created with a client side defined ledgerId
      *
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerType.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerType.java
new file mode 100644
index 000000000..51e232f87
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerType.java
@@ -0,0 +1,46 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+/**
+ * Describes the type of ledger.
+ * LedgerTypes describes the behaviour of the ledger in respect to durability and provides
+ * hints to the storage of data on Bookies
+ *
+ * @since 4.6
+ */
+public enum LedgerType {
+    /**
+     * Persistent Durability, using Journal.<br>
+     * Each entry is persisted to the journal and every writes receives and acknowledgement only with the guarantee that
+     * it has been persisted durabily to it (data is fsync'd to the disk)
+     */
+    PD_JOURNAL,
+    /**
+     * Volatile Durability, using Journal.<br>
+     * Each entry is persisted to the journal and writes receive acknowledgement without guarantees of persistence (data
+     * is eventually fsync'd to disk).<br>
+     * For this kind of ledgers the client MUST explicitly call {@link LedgerHandle#asyncSync(long, org.apache.bookkeeper.client.AsyncCallback.SyncCallback, java.lang.Object)
+     * }
+     * in order to have guarantees of the durability of writes and in order to advance the LastAddConfirmed entry id
+     */
+    VD_JOURNAL
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/SyncSupported.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/SyncSupported.java
new file mode 100644
index 000000000..934b52115
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/SyncSupported.java
@@ -0,0 +1,54 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Marks Handles which support the 'sync' primitive
+ *
+ * @see WriteHandle
+ * @see WriteAdvHandle
+ *
+ * @since 4.6
+ */
+public interface SyncSupported {
+
+    /**
+     * Waits for all data written by this Handle to have been persisted durably on a quorum of bookies and advances
+     * the LastAddConfirmed client side pointer.
+     *
+     * In case of volatile durability ledgers, for instance {@link LedgerType#VD_JOURNAL}, this operation is
+     * required in order to let the LastAddConfirmed pointer to advance.
+     * <p>
+     * <b>Beware that closing a volatile durability ledger does not imply a sync operation</b>
+     * <p>
+     * Even without calling this primitive entry could be readable using {@link ReadHandle#readUnconfirmed(long, long) }
+     * function
+     *     
+     * @return an handle to the result, in case of success it will return the id of last persisted entry id
+     */
+    CompletableFuture<Long> sync();
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteAdvHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteAdvHandle.java
index 87ba49817..bed01c8b9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteAdvHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteAdvHandle.java
@@ -34,7 +34,7 @@
  *
  * @since 4.6
  */
-public interface WriteAdvHandle extends ReadHandle {
+public interface WriteAdvHandle extends ReadHandle, SyncSupported {
 
     /**
      * Add entry asynchronously to an open ledger.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteHandle.java
index 47e1f9cd1..aba513887 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteHandle.java
@@ -32,7 +32,7 @@
  *
  * @since 4.6
  */
-public interface WriteHandle extends ReadHandle {
+public interface WriteHandle extends ReadHandle, SyncSupported {
 
     /**
      * Add entry asynchronously to an open ledger.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index d763f5768..19c85545a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -62,6 +62,7 @@
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
+import org.apache.bookkeeper.client.api.LedgerType;
 
 /**
  * Implements the client-side part of the BookKeeper protocol.
@@ -224,7 +225,9 @@ private void completeAdd(final int rc,
             executor.submitOrdered(ledgerId, new SafeRunnable() {
                 @Override
                 public void safeRun() {
-                    cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
+                    // we are using lastAddSyncedEntry = -1 as mock implementation, next commits will provide an implementation
+                    long lastAddSyncedEntry = BookieProtocol.INVALID_ENTRY_ID;
+                    cb.writeComplete(rc, ledgerId, entryId, lastAddSyncedEntry, addr, ctx);
                 }
                 @Override
                 public String toString() {
@@ -232,7 +235,35 @@ public String toString() {
                 }
             });
         } catch (RejectedExecutionException ree) {
-            cb.writeComplete(getRc(BKException.Code.InterruptedException), ledgerId, entryId, addr, ctx);
+            cb.writeComplete(getRc(BKException.Code.InterruptedException), ledgerId, entryId,
+                             BookieProtocol.INVALID_ENTRY_ID, addr, ctx);
+        }
+    }
+
+    public void sync(final BookieSocketAddress addr,
+                         final long ledgerId,
+                         final BookkeeperInternalCallbacks.SyncCallback cb,
+                         final Object ctx) {
+        closeLock.readLock().lock();
+        try {
+            final PerChannelBookieClientPool client = lookupClient(addr, null);
+            if (client == null) {
+                cb.syncComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
+                                ledgerId, BookieProtocol.INVALID_ENTRY_ID, addr, ctx);
+                return;
+            }
+            client.obtain(new GenericCallback<PerChannelBookieClient>() {
+                @Override
+                public void operationComplete(final int rc, PerChannelBookieClient pcbc) {
+                    if (rc != BKException.Code.OK) {
+                        cb.syncComplete(rc, ledgerId, BookieProtocol.INVALID_ENTRY_ID, addr, ctx);
+                    } else {
+                        pcbc.sync(ledgerId,  cb, ctx);
+                    }                    
+                }
+            }, ledgerId);
+        } finally {
+            closeLock.readLock().unlock();
         }
     }
 
@@ -243,7 +274,8 @@ public void addEntry(final BookieSocketAddress addr,
                          final ByteBuf toSend,
                          final WriteCallback cb,
                          final Object ctx,
-                         final int options) {
+                         final int options,
+                         final LedgerType ledgerType) {
         closeLock.readLock().lock();
         try {
             final PerChannelBookieClientPool client = lookupClient(addr, entryId);
@@ -263,7 +295,7 @@ public void operationComplete(final int rc, PerChannelBookieClient pcbc) {
                     if (rc != BKException.Code.OK) {
                         completeAdd(rc, ledgerId, entryId, addr, cb, ctx);
                     } else {
-                        pcbc.addEntry(ledgerId, masterKey, entryId, toSend, cb, ctx, options);
+                        pcbc.addEntry(ledgerId, masterKey, entryId, toSend, cb, ctx, options, ledgerType);
                     }
                     toSend.release();
                 }
@@ -510,7 +542,8 @@ public static void main(String[] args) throws NumberFormatException, IOException
         }
         WriteCallback cb = new WriteCallback() {
 
-            public void writeComplete(int rc, long ledger, long entry, BookieSocketAddress addr, Object ctx) {
+            public void writeComplete(int rc, long ledger, long entry, long lastAddSyncedEntry,
+                                      BookieSocketAddress addr, Object ctx) {
                 Counter counter = (Counter) ctx;
                 counter.dec();
                 if (rc != 0) {
@@ -531,7 +564,7 @@ public void writeComplete(int rc, long ledger, long entry, BookieSocketAddress a
 
         for (int i = 0; i < 100000; i++) {
             counter.inc();
-            bc.addEntry(addr, ledger, new byte[0], i, Unpooled.wrappedBuffer(hello), cb, counter, 0);
+            bc.addEntry(addr, ledger, new byte[0], i, Unpooled.wrappedBuffer(hello), cb, counter, 0, LedgerType.PD_JOURNAL);
         }
         counter.wait(0);
         System.out.println("Total = " + counter.total());
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 99c33fbeb..dd0a7e192 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -45,11 +45,16 @@
     public static final byte CURRENT_PROTOCOL_VERSION = 2;
 
     /**
-     * Entry Entry ID. To be used when no valid entry id can be assigned.
+     * Invalid Entry ID. To be used when no valid entry id can be assigned.
      */
     public static final long INVALID_ENTRY_ID = -1;
 
     /**
+     * Invalid Ledger ID. To be used when no valid ledger id can be assigned.
+     */
+    public static final long INVALID_LEDGER_ID = -1;
+
+    /**
      * Entry identifier representing a request to obtain the last add entry confirmed
      */
     public static final long LAST_ADD_CONFIRMED = -1;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
index ab160ce81..f9eea5f59 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
@@ -68,7 +68,8 @@
     }
 
     public interface WriteCallback {
-        void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx);
+        void writeComplete(int rc, long ledgerId, long entryId, long lastAddSyncedEntry,
+                           BookieSocketAddress addr, Object ctx);
     }
 
     public interface ReadLacCallback {
@@ -128,6 +129,14 @@ public void operationComplete(int rc, T result) {
     }
 
     /**
+     * Declaration of a callback implementation for calls from BookieClient objects.
+     * Such calls are for replies of sync operations
+     */
+    public interface SyncCallback {
+        void syncComplete(int rc, long ledgerId, long lastSyncedEntryId, BookieSocketAddress addr, Object ctx);
+    }
+
+    /**
      * Listener on entries responded.
      */
     public interface ReadEntryListener {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index dcc58f274..93f9c9df7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -19,16 +19,12 @@
 
 import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;
 
-import com.google.common.collect.Sets;
 import com.google.common.base.Joiner;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.ExtensionRegistry;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
-import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
@@ -57,7 +53,6 @@
 import io.netty.util.concurrent.GenericFutureListener;
 
 import java.io.IOException;
-import java.net.SocketAddress;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayDeque;
 import java.util.Collections;
@@ -69,7 +64,6 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.bookkeeper.auth.BookKeeperPrincipal;
 import org.apache.bookkeeper.auth.ClientAuthProvider;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeperClientStats;
@@ -116,7 +110,6 @@
 import com.google.protobuf.ExtensionRegistry;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.UnpooledByteBufAllocator;
-import java.net.SocketAddress;
 
 import java.net.SocketAddress;
 import java.security.cert.Certificate;
@@ -125,6 +118,10 @@
 import java.util.List;
 import javax.net.ssl.SSLPeerUnverifiedException;
 import org.apache.bookkeeper.auth.BookKeeperPrincipal;
+import org.apache.bookkeeper.client.api.LedgerType;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.SyncCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.SyncRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.SyncResponse;
 
 /**
  * This class manages all details of connection to a particular bookie. It also
@@ -170,6 +167,8 @@
     private final OpStatsLogger readLacTimeoutOpLogger;
     private final OpStatsLogger getBookieInfoOpLogger;
     private final OpStatsLogger getBookieInfoTimeoutOpLogger;
+    private final OpStatsLogger syncOpLogger;
+    private final OpStatsLogger syncTimeoutOpLogger;
     private final OpStatsLogger startTLSOpLogger;
     private final OpStatsLogger startTLSTimeoutOpLogger;
 
@@ -266,10 +265,12 @@ public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor exec
         readLacOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_READ_LAC_OP);
         getBookieInfoOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.GET_BOOKIE_INFO_OP);
         readTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ);
+        syncOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_SYNC);
         addTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_ADD);
         writeLacTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_WRITE_LAC);
         readLacTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ_LAC);
         getBookieInfoTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.TIMEOUT_GET_BOOKIE_INFO);
+        syncTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.TIMEOUT_SYNC);
         startTLSOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_START_TLS_OP);
         startTLSTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_START_TLS_OP);
 
@@ -477,6 +478,29 @@ void connectIfNeededAndDoOp(GenericCallback<PerChannelBookieClient> op) {
 
     }
 
+    void sync(final long ledgerId,
+              BookkeeperInternalCallbacks.SyncCallback cb, final Object ctx) {
+        final long txnId = getTxnId();
+        final CompletionKey completionKey = new V3CompletionKey(txnId,
+                                                                OperationType.SYNC);
+        completionObjects.put(completionKey,
+                              new SyncCompletion(completionKey, cb, ledgerId, ctx));
+
+        // Build the request
+        BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+                .setVersion(ProtocolVersion.VERSION_THREE)
+                .setOperation(OperationType.WRITE_LAC)
+                .setTxnId(txnId);
+        SyncRequest.Builder syncBuilder = SyncRequest.newBuilder()
+                .setLedgerId(ledgerId);
+
+        final Request syncRequest = Request.newBuilder()
+                .setHeader(headerBuilder)
+                .setSyncRequest(syncBuilder)
+                .build();
+        writeAndFlush(channel, completionKey, syncRequest);
+    }
+
     void writeLac(final long ledgerId, final byte[] masterKey, final long lac, ByteBuf toSend, WriteLacCallback cb,
             Object ctx) {
         final long txnId = getTxnId();
@@ -525,10 +549,18 @@ void writeLac(final long ledgerId, final byte[] masterKey, final long lac, ByteB
      *          Add options
      */
     void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf toSend, WriteCallback cb,
-                  Object ctx, final int options) {
+                  Object ctx, final int options, final LedgerType ledgerType) {
         Object request = null;
         CompletionKey completionKey = null;
         if (useV2WireProtocol) {
+            if (!ledgerType.equals(LedgerType.PD_JOURNAL)) {
+                // V2 protocol supports only PD_JOURNAL ledgers
+                LOG.error("invalid ledger type {} for v2 protocol", ledgerType);
+                toSend.release();
+                cb.writeComplete(BKException.Code.IllegalOpException, ledgerId, entryId, BookieProtocol.INVALID_ENTRY_ID,
+                    addr, ctx);
+                return;
+            }
             completionKey = new V2CompletionKey(ledgerId, entryId, OperationType.ADD_ENTRY);
             request = new BookieProtocol.AddRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
                     (short) options, masterKey, toSend);
@@ -553,6 +585,18 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf
                 addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD);
             }
 
+            // do not write PD_JOURNAL in order to support writes to old bookies for PD_JOURNAL ledger type
+            if (!ledgerType.equals(LedgerType.PD_JOURNAL)) {
+                if (ledgerType.equals(LedgerType.VD_JOURNAL)) {
+                    addBuilder.setLedgerType(DataFormats.LedgerType.VD_JOURNAL);
+                } else {
+                    LOG.error("invalid ledger type {}", ledgerType);
+                    cb.writeComplete(BKException.Code.IllegalOpException, ledgerId, entryId, BookieProtocol.INVALID_ENTRY_ID,
+                    addr, ctx);
+                    return;
+                }
+            }
+
             request = Request.newBuilder()
                     .setHeader(headerBuilder)
                     .setAddRequest(addBuilder)
@@ -1324,6 +1368,52 @@ public void handleV3Response(BookkeeperProtocol.Response response) {
         }
     }
 
+    private class SyncCompletion extends CompletionValue {
+        final SyncCallback cb;
+
+        public SyncCompletion(CompletionKey key,
+                                  final SyncCallback originalCallback,
+                                  final long ledgerId,
+                                  final Object originalCtx) {
+            super("Sync",
+                  originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
+                  syncOpLogger, syncTimeoutOpLogger,
+                  scheduleTimeout(key, addEntryTimeout));
+            this.cb = new SyncCallback() {
+                @Override
+                public void syncComplete(int rc, long ledgerId, long lastSyncedEntryId, BookieSocketAddress addr, Object ctx) {
+                      cancelTimeoutAndLogOp(rc);
+                      originalCallback.syncComplete(rc, ledgerId, lastSyncedEntryId, addr, originalCtx);
+                };
+            };
+        }
+
+        @Override
+        public void errorOut() {
+            errorOut(BKException.Code.BookieHandleNotAvailableException);
+        }
+
+        @Override
+        public void errorOut(final int rc) {
+            errorOutAndRunCallback(
+                    () -> cb.syncComplete(rc, ledgerId, BookieProtocol.INVALID_ENTRY_ID, addr, ctx));
+        }
+
+        @Override
+        public void handleV3Response(BookkeeperProtocol.Response response) {
+            SyncResponse syncResponse = response.getSyncResponse();
+            StatusCode status = response.getStatus() == StatusCode.EOK ?
+                syncResponse.getStatus() : response.getStatus();
+            long ledgerId = syncResponse.getLedgerId();
+            long lastSynchedEntryId = syncResponse.getLastAddSynced();
+
+            int rc = logAndConvertStatus(status,
+                                         BKException.Code.WriteException,
+                                         "ledger", ledgerId);
+            cb.syncComplete(rc, ledgerId, lastSynchedEntryId, addr, ctx);
+        }
+    }
+
     // visible for testing
     class ReadLacCompletion extends CompletionValue {
         final ReadLacCallback cb;
@@ -1597,10 +1687,10 @@ public AddCompletion(CompletionKey key,
             this.cb = new WriteCallback() {
                 @Override
                 public void writeComplete(int rc, long ledgerId, long entryId,
-                                          BookieSocketAddress addr,
+                                          long lastAddSyncedEntry, BookieSocketAddress addr,
                                           Object ctx) {
                     cancelTimeoutAndLogOp(rc);
-                    originalCallback.writeComplete(rc, ledgerId, entryId,
+                    originalCallback.writeComplete(rc, ledgerId, entryId, lastAddSyncedEntry,
                                                    addr, originalCtx);
                 }
             };
@@ -1614,14 +1704,15 @@ public void errorOut() {
         @Override
         public void errorOut(final int rc) {
             errorOutAndRunCallback(
-                    () -> cb.writeComplete(rc, ledgerId, entryId, addr, ctx));
+                    () -> cb.writeComplete(rc, ledgerId, entryId, BookieProtocol.INVALID_ENTRY_ID,
+                                           addr, ctx));
         }
 
         @Override
         public void handleV2Response(
                 long ledgerId, long entryId, StatusCode status,
                 BookieProtocol.Response response) {
-            handleResponse(ledgerId, entryId, status);
+            handleResponse(ledgerId, entryId, BookieProtocol.INVALID_ENTRY_ID, status);
         }
 
         @Override
@@ -1630,17 +1721,19 @@ public void handleV3Response(
             AddResponse addResponse = response.getAddResponse();
             StatusCode status = response.getStatus() == StatusCode.EOK
                 ? addResponse.getStatus() : response.getStatus();
+            long lastAddSyncedEntry = addResponse.hasLastAddSynced() ?
+                                        addResponse.getLastAddSynced() : BookieProtocol.INVALID_ENTRY_ID;
             handleResponse(addResponse.getLedgerId(), addResponse.getEntryId(),
-                           status);
+                           lastAddSyncedEntry, status);
         }
 
-        private void handleResponse(long ledgerId, long entryId,
+        private void handleResponse(long ledgerId, long entryId, long lastAddSyncedEntry,
                                     StatusCode status) {
             int rc = logAndConvertStatus(status,
                                          BKException.Code.WriteException,
                                          "ledger", ledgerId,
                                          "entry", entryId);
-            cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
+            cb.writeComplete(rc, ledgerId, entryId, lastAddSyncedEntry, addr, ctx);
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index 827aed986..5a3e12270 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -89,7 +89,7 @@ protected void processPacket() {
     }
 
     @Override
-    public void writeComplete(int rc, long ledgerId, long entryId,
+    public void writeComplete(int rc, long ledgerId, long entryId, long lastAddSyncedEntry,
                               BookieSocketAddress addr, Object ctx) {
         if (BookieProtocol.EOK == rc) {
             requestProcessor.addEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index b4e89f8c5..3a0f80dfe 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -70,7 +70,7 @@ private AddResponse getAddResponse() {
 
         BookkeeperInternalCallbacks.WriteCallback wcb = new BookkeeperInternalCallbacks.WriteCallback() {
             @Override
-            public void writeComplete(int rc, long ledgerId, long entryId,
+            public void writeComplete(int rc, long ledgerId, long entryId, long lastAddSyncedEntry,
                                       BookieSocketAddress addr, Object ctx) {
                 if (BookieProtocol.EOK == rc) {
                     requestProcessor.addEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
@@ -80,6 +80,10 @@ public void writeComplete(int rc, long ledgerId, long entryId,
                             TimeUnit.NANOSECONDS);
                 }
 
+                if (lastAddSyncedEntry!= BookieProtocol.INVALID_ENTRY_ID) {
+                    addResponse.setLastAddSynced(lastAddSyncedEntry);
+                }
+
                 StatusCode status;
                 switch (rc) {
                     case BookieProtocol.EOK:
diff --git a/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto b/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
index b43e69148..69143949a 100644
--- a/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
+++ b/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 syntax = "proto2";
+import "DataFormats.proto";
 
 option java_package = "org.apache.bookkeeper.proto";
 option optimize_for = SPEED;
@@ -62,6 +63,7 @@ enum OperationType {
     READ_LAC = 7;
     GET_BOOKIE_INFO = 8;
     START_TLS = 9;
+    SYNC = 10;
 }
 
 /**
@@ -83,6 +85,7 @@ message Request {
     optional ReadLacRequest readLacRequest = 104;
     optional GetBookieInfoRequest getBookieInfoRequest = 105;
     optional StartTLSRequest startTLSRequest = 106;
+    optional SyncRequest syncRequest = 107;
 }
 
 message ReadRequest {
@@ -111,6 +114,7 @@ message AddRequest {
     required int64 entryId = 2;
     required bytes masterKey = 3;
     required bytes body = 4;
+    optional LedgerType ledgerType = 5;
 }
 
 message StartTLSRequest {
@@ -136,6 +140,11 @@ message GetBookieInfoRequest {
     optional int64 requested = 1;
 }
 
+message SyncRequest {
+   required int64 ledgerId = 1;
+   required bytes masterKey = 2;
+}
+
 message Response {
 
     required BKPacketHeader header = 1;
@@ -150,6 +159,7 @@ message Response {
     optional ReadLacResponse readLacResponse = 104;
     optional GetBookieInfoResponse getBookieInfoResponse = 105;
     optional StartTLSResponse startTLSResponse = 106;
+    optional SyncResponse syncResponse = 107;
 }
 
 message ReadResponse {
@@ -160,12 +170,16 @@ message ReadResponse {
     // Piggyback LAC
     optional int64 maxLAC = 5;
     optional int64 lacUpdateTimestamp = 6;
+    // Piggyback LastAddSynced
+    optional int64 lastAddSynced = 7;
 }
 
 message AddResponse {
     required StatusCode status = 1;
     required int64 ledgerId = 2;
     required int64 entryId = 3;
+    // Piggyback LastAddSynced
+    optional int64 lastAddSynced = 4;
 }
 
 message AuthMessage {
@@ -193,3 +207,9 @@ message GetBookieInfoResponse {
 
 message StartTLSResponse {
 }
+
+message SyncResponse {
+    required StatusCode status = 1;
+    required int64 ledgerId = 2;
+    required int64 lastAddSynced = 3;
+}
diff --git a/bookkeeper-server/src/main/proto/DataFormats.proto b/bookkeeper-server/src/main/proto/DataFormats.proto
index cdade9563..75db6cf0e 100644
--- a/bookkeeper-server/src/main/proto/DataFormats.proto
+++ b/bookkeeper-server/src/main/proto/DataFormats.proto
@@ -21,6 +21,14 @@ option java_package = "org.apache.bookkeeper.proto";
 option optimize_for = SPEED;
 
 /**
+ * Ledger Type
+ */
+enum LedgerType {
+    PD_JOURNAL = 0;
+    VD_JOURNAL = 1;
+}
+
+/**
 * Metadata format for storing ledger information
 */
 message LedgerMetadataFormat {
@@ -58,6 +66,8 @@ message LedgerMetadataFormat {
         optional bytes value = 2;
     }
     repeated cMetadataMapEntry customMetadata = 11;
+
+    optional LedgerType ledgerType = 12 [default = PD_JOURNAL];
 }
 
 message LedgerRereplicationLayoutFormat {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperVolatileDurabilityTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperVolatileDurabilityTest.java
new file mode 100644
index 000000000..63a593bad
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperVolatileDurabilityTest.java
@@ -0,0 +1,194 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.junit.Assert.assertEquals;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.bookkeeper.client.BKException.BKBookieHandleNotAvailableException;
+import org.apache.bookkeeper.client.api.LedgerType;
+import org.apache.bookkeeper.client.api.WriteHandle;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import static org.junit.Assert.fail;
+import org.junit.Test;
+
+/**
+ * Client side tests on volatile durability ledgers
+ */
+public class BookKeeperVolatileDurabilityTest extends MockBookKeeperTestCase {
+
+    final static byte[] password = "password".getBytes();
+    final static ByteBuf data = Unpooled.wrappedBuffer("foobar".getBytes());
+    final static int numEntries = 100;
+
+    @Test
+    public void testAddEntryLastAddConfirmedDoesNotAdvance() throws Exception {
+        try (WriteHandle wh = result(
+            newCreateLedgerOp()
+                .withEnsembleSize(3)
+                .withWriteQuorumSize(3)
+                .withAckQuorumSize(2)
+                .withPassword(password)
+                .withLedgerType(LedgerType.VD_JOURNAL)
+                .execute())) {
+            for (int i = 0; i < numEntries - 1; i++) {
+                result(wh.append(data));
+            }
+            long lastEntryID = result(wh.append(data));
+            assertEquals(numEntries - 1, lastEntryID);
+            LedgerHandle lh = (LedgerHandle) wh;
+            assertEquals(numEntries - 1, lh.getLastAddPushed());
+            assertEquals(-1, lh.getLastAddConfirmed());
+        }
+    }
+
+    @Test
+    public void testSyncAndAddConfirmedAdvances() throws Exception {
+        try (WriteHandle wh = result(
+            newCreateLedgerOp()
+                .withEnsembleSize(3)
+                .withWriteQuorumSize(3)
+                .withAckQuorumSize(2)
+                .withPassword(password)
+                .withLedgerType(LedgerType.VD_JOURNAL)
+                .execute())) {
+            for (int i = 0; i < numEntries - 1; i++) {
+                result(wh.append(data));
+            }
+            long lastEntryID = result(wh.append(data));
+            assertEquals(numEntries - 1, lastEntryID);
+            LedgerHandle lh = (LedgerHandle) wh;
+            assertEquals(numEntries - 1, lh.getLastAddPushed());
+            assertEquals(-1, lh.getLastAddConfirmed());
+
+            long lastSynced = result(wh.sync());
+            assertEquals(lastSynced, lh.getLastAddConfirmed());
+
+        }
+    }
+
+    @Test
+    public void testSyncNoEntries() throws Exception {
+        try (WriteHandle wh = result(
+            newCreateLedgerOp()
+                .withEnsembleSize(3)
+                .withWriteQuorumSize(3)
+                .withAckQuorumSize(2)
+                .withPassword(password)
+                .withLedgerType(LedgerType.VD_JOURNAL)
+                .execute())) {
+            assertEquals(Long.valueOf(-1), result(wh.sync()));
+            LedgerHandle lh = (LedgerHandle) wh;
+            assertEquals(-1, lh.getLastAddConfirmed());
+            assertEquals(-1, lh.getLastAddPushed());
+        }
+    }
+
+    @Test(expected = BKBookieHandleNotAvailableException.class)
+    public void testSyncAllPausedBookies() throws Exception {
+        try (WriteHandle wh = result(
+            newCreateLedgerOp()
+                .withEnsembleSize(3)
+                .withWriteQuorumSize(3)
+                .withAckQuorumSize(2)
+                .withPassword(password)
+                .withLedgerType(LedgerType.VD_JOURNAL)
+                .execute())) {
+            for (int i = 0; i < numEntries - 1; i++) {
+                result(wh.append(data));
+            }
+            long lastEntryID = result(wh.append(data));
+            assertEquals(numEntries - 1, lastEntryID);
+            LedgerHandle lh = (LedgerHandle) wh;
+            assertEquals(numEntries - 1, lh.getLastAddPushed());
+            assertEquals(-1, lh.getLastAddConfirmed());
+            assertEquals(3, lh.getLedgerMetadata().currentEnsemble.size());
+            for (BookieSocketAddress addr : lh.getLedgerMetadata().currentEnsemble) {
+                pauseBookie(addr);
+            }
+            result(wh.sync());
+        }
+    }
+
+    @Test
+    public void testSyncBookiesResumed() throws Exception {
+        try (WriteHandle wh = result(
+            newCreateLedgerOp()
+                .withEnsembleSize(3)
+                .withWriteQuorumSize(3)
+                .withAckQuorumSize(2)
+                .withPassword(password)
+                .withLedgerType(LedgerType.VD_JOURNAL)
+                .execute())) {
+            for (int i = 0; i < numEntries - 1; i++) {
+                result(wh.append(data));
+            }
+            long lastEntryID = result(wh.append(data));
+            assertEquals(numEntries - 1, lastEntryID);
+            LedgerHandle lh = (LedgerHandle) wh;
+            assertEquals(numEntries - 1, lh.getLastAddPushed());
+            assertEquals(-1, lh.getLastAddConfirmed());
+            assertEquals(3, lh.getLedgerMetadata().currentEnsemble.size());
+            for (BookieSocketAddress addr : lh.getLedgerMetadata().currentEnsemble) {
+                pauseBookie(addr);
+            }
+            try {
+                result(wh.sync());
+                fail("cannot sync");
+            } catch (BKBookieHandleNotAvailableException expected){
+            }
+            for (BookieSocketAddress addr : lh.getLedgerMetadata().currentEnsemble) {
+                resumeBookie(addr);
+            }
+            result(wh.sync());
+            result(wh.append(data));
+        }
+    }
+
+    @Test
+    public void testSyncSomePausedBookies() throws Exception {
+        try (WriteHandle wh = result(
+            newCreateLedgerOp()
+                .withEnsembleSize(3)
+                .withWriteQuorumSize(3)
+                .withAckQuorumSize(2)
+                .withPassword(password)
+                .withLedgerType(LedgerType.VD_JOURNAL)
+                .execute())) {
+            for (int i = 0; i < numEntries - 1; i++) {
+                result(wh.append(data));
+            }
+            long lastEntryID = result(wh.append(data));
+            assertEquals(numEntries - 1, lastEntryID);
+            LedgerHandle lh = (LedgerHandle) wh;
+            assertEquals(numEntries - 1, lh.getLastAddPushed());
+            assertEquals(-1, lh.getLastAddConfirmed());
+            assertEquals(3, lh.getLedgerMetadata().currentEnsemble.size());
+
+            BookieSocketAddress addr = lh.getLedgerMetadata().currentEnsemble.get(0);
+            pauseBookie(addr);
+
+            result(wh.sync());
+        }
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LastAddSyncedManagerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LastAddSyncedManagerTest.java
new file mode 100644
index 000000000..708fa02c6
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LastAddSyncedManagerTest.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import java.util.Arrays;
+import java.util.List;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+/**
+ * Tests on LastAddSyncedManager
+ */
+public class LastAddSyncedManagerTest {
+
+    @Test
+    public void testCalculateLastAddSynced() throws Exception {
+        assertCalculateLastAddSynced(3, 1, 5, Arrays.asList(1L, 2L, 3L), 3);
+        assertCalculateLastAddSynced(3, 2, 5, Arrays.asList(1L, 2L, 3L), 2);
+        assertCalculateLastAddSynced(3, 3, 5, Arrays.asList(1L, 2L, 3L), 1);
+    }
+
+    @Test
+    public void testCalculateLastAddSyncedNoEnoughData() throws Exception {
+        assertCalculateLastAddSynced(3, 3, 3, Arrays.asList(), -1);
+        assertCalculateLastAddSynced(3, 3, 3, Arrays.asList(1L), -1);
+        assertCalculateLastAddSynced(3, 3, 3, Arrays.asList(1L, 2L), -1);
+    }
+
+    private void assertCalculateLastAddSynced(int writeQuorumSize, int ackQuorumSize, int ensembleSize, List<Long> lastAddSynced, long expectedLastAddSynced) {
+        LastAddSyncedManager lastAddSyncedManager = new LastAddSyncedManager(writeQuorumSize, ackQuorumSize);
+        for (int i = 0; i < lastAddSynced.size(); i++) {
+            lastAddSyncedManager.updateBookie(i, lastAddSynced.get(i));
+        }
+        assertEquals(expectedLastAddSynced, lastAddSyncedManager.calculateCurrentLastAddSynced());
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index 6ef890154..f3e247f72 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -17,6 +17,10 @@
  */
 package org.apache.bookkeeper.client;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -24,7 +28,7 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.util.ArrayList;
-import java.util.List;
+import java.util.Comparator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -32,10 +36,12 @@
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.bookkeeper.client.api.CreateBuilder;
 import org.apache.bookkeeper.client.api.DeleteBuilder;
+import org.apache.bookkeeper.client.api.LedgerType;
 import org.apache.bookkeeper.client.api.OpenBuilder;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.meta.LedgerIdGenerator;
@@ -46,14 +52,10 @@
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.apache.bookkeeper.util.SafeRunnable;
 import org.junit.After;
 import org.junit.Before;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
 import org.mockito.Mockito;
-import static org.mockito.Mockito.doAnswer;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
@@ -79,11 +81,26 @@
     protected AtomicLong mockNextLedgerId;
     protected ConcurrentSkipListSet<Long> fencedLedgers;
     protected ConcurrentMap<Long, Map<BookieSocketAddress, Map<Long, MockEntry>>> mockLedgerData;
+    protected ConcurrentMap<Long, Map<BookieSocketAddress, Long>> mockLastAddPersistedOnBookie;
+    protected ConcurrentHashMap<BookieSocketAddress, Boolean> pausedBookies;
 
     private Map<BookieSocketAddress, Map<Long, MockEntry>> getMockLedgerContents(long ledgerId) {
         return mockLedgerData.computeIfAbsent(ledgerId, (id) -> new ConcurrentHashMap<>());
     }
 
+    private Map<BookieSocketAddress, Long> getMockLastAddPersisted(long ledgerId) {
+        return mockLastAddPersistedOnBookie.computeIfAbsent(ledgerId, (id) -> new ConcurrentHashMap<>());
+    }
+
+    private long getMockLastAddPersistedInBookie(long ledgerId, BookieSocketAddress bookieSocketAddress) {
+        Map<BookieSocketAddress, Long> mockLastAddPersisted = getMockLastAddPersisted(ledgerId);
+        return mockLastAddPersisted.getOrDefault(bookieSocketAddress, BookieProtocol.INVALID_ENTRY_ID);
+    }
+
+    private void setMockLastAddPersistedInBookie(long ledgerId, BookieSocketAddress bookieSocketAddress, long entryId) {
+        getMockLastAddPersisted(ledgerId).put(bookieSocketAddress, entryId);
+    }
+
     private Map<Long, MockEntry> getMockLedgerContentsInBookie(long ledgerId, BookieSocketAddress bookieSocketAddress) {
         return getMockLedgerContents(ledgerId).computeIfAbsent(bookieSocketAddress, (addr) -> new ConcurrentHashMap<>());
     }
@@ -92,7 +109,7 @@ private MockEntry getMockLedgerEntry(long ledgerId, BookieSocketAddress bookieSo
         return getMockLedgerContentsInBookie(ledgerId, bookieSocketAddress).get(entryId);
     }
 
-    private static final class MockEntry {
+    protected static final class MockEntry {
 
         byte[] payload;
         long lastAddConfirmed;
@@ -108,10 +125,12 @@ public MockEntry(byte[] payload, long lastAddConfirmed) {
     public void setup() throws Exception {
         mockLedgerMetadataRegistry = new ConcurrentHashMap<>();
         mockLedgerData = new ConcurrentHashMap<>();
+        mockLastAddPersistedOnBookie = new ConcurrentHashMap<>();
+        pausedBookies = new ConcurrentHashMap<>();
         mockNextLedgerId = new AtomicLong(1);
         fencedLedgers = new ConcurrentSkipListSet<>();
         scheduler = new ScheduledThreadPoolExecutor(4);
-        executor = OrderedSafeExecutor.newBuilder().build();
+        executor = OrderedSafeExecutor.newBuilder().numThreads(4).build();
         bookieWatcher = mock(BookieWatcher.class);
 
         bookieClient = mock(BookieClient.class);
@@ -124,6 +143,7 @@ public void setup() throws Exception {
 
         when(bk.getCloseLock()).thenReturn(new ReentrantReadWriteLock());
         when(bk.isClosed()).thenReturn(false);
+        when(bk.isDelayEnsembleChange()).thenReturn(false);
         when(bk.getBookieWatcher()).thenReturn(bookieWatcher);
         when(bk.getExplicitLacInterval()).thenReturn(0);
         when(bk.getMainWorkerPool()).thenReturn(executor);
@@ -145,6 +165,7 @@ public void setup() throws Exception {
         setupBookieWatcherForNewEnsemble();
         setupBookieClientReadEntry();
         setupBookieClientAddEntry();
+        setupBookieClientSync();
     }
 
     protected NullStatsLogger setupLoggers() {
@@ -152,6 +173,7 @@ protected NullStatsLogger setupLoggers() {
         when(bk.getOpenOpLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
         when(bk.getRecoverOpLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
         when(bk.getAddOpLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
+        when(bk.getSyncOpLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
         when(bk.getReadOpLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
         when(bk.getDeleteOpLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
         when(bk.getCreateOpLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
@@ -161,11 +183,20 @@ protected NullStatsLogger setupLoggers() {
     }
 
     @After
-    public void tearDown() {
-        scheduler.shutdown();
+    public void tearDown() throws InterruptedException {
+        scheduler.shutdownNow();
+        scheduler.awaitTermination(1, TimeUnit.MINUTES);
         executor.shutdown();
     }
 
+    protected void pauseBookie(BookieSocketAddress address) {
+        pausedBookies.put(address, Boolean.TRUE);
+    }
+
+    protected void resumeBookie(BookieSocketAddress address) {
+        pausedBookies.remove(address);
+    }
+
     protected void setBookkeeperConfig(ClientConfiguration config) {
         when(bk.getConf()).thenReturn(config);
     }
@@ -211,9 +242,9 @@ private void setupBookieWatcherForNewEnsemble() throws BKException.BKNotEnoughBo
             });
     }
 
-    private void submit(Runnable operation) {
+    private void submit(Object key, Runnable operation) {
         try {
-            scheduler.submit(operation);
+            executor.submitOrdered(key, SafeRunnable.safeRun(operation));
         } catch (RejectedExecutionException rejected) {
             operation.run();
         }
@@ -332,14 +363,21 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
     protected void setupBookieClientReadEntry() {
         doAnswer((Answer) (InvocationOnMock invokation) -> {
             Object[] args = invokation.getArguments();
-            BookkeeperInternalCallbacks.ReadEntryCallback callback = (BookkeeperInternalCallbacks.ReadEntryCallback) args[4];
             BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) args[0];
             long ledgerId = (Long) args[1];
             long entryId = (Long) args[3];
+            BookkeeperInternalCallbacks.ReadEntryCallback callback = (BookkeeperInternalCallbacks.ReadEntryCallback) args[4];
+            Object ctx = args[5];
 
             DigestManager macManager = new CRC32DigestManager(ledgerId);
             fencedLedgers.add(ledgerId);
-            submit(() -> {
+            submit(ledgerId, () -> {
+
+                if (pausedBookies.containsKey(bookieSocketAddress)) {
+                    callback.readEntryComplete(BKException.Code.BookieHandleNotAvailableException, ledgerId, entryId, null, ctx);
+                    return;
+                }
+
                 MockEntry mockEntry = getMockLedgerEntry(ledgerId, bookieSocketAddress, entryId);
                 if (mockEntry != null) {
                     LOG.info("readEntryAndFenceLedger - found mock entry {}@{} at {}", ledgerId, entryId, bookieSocketAddress);
@@ -362,10 +400,15 @@ protected void setupBookieClientReadEntry() {
             long ledgerId = (Long) args[1];
             long entryId = (Long) args[2];
             BookkeeperInternalCallbacks.ReadEntryCallback callback = (BookkeeperInternalCallbacks.ReadEntryCallback) args[3];
+            Object ctx = args[4];
 
             DigestManager macManager = new CRC32DigestManager(ledgerId);
 
-            submit(() -> {
+            submit(ledgerId, () -> {
+                if (pausedBookies.containsKey(bookieSocketAddress)) {
+                    callback.readEntryComplete(BKException.Code.BookieHandleNotAvailableException, ledgerId, entryId, null, ctx);
+                    return;
+                }
                 MockEntry mockEntry = getMockLedgerEntry(ledgerId, bookieSocketAddress, entryId);
                 if (mockEntry != null) {
                     LOG.info("readEntry - found mock entry {}@{} at {}", ledgerId, entryId, bookieSocketAddress);
@@ -407,18 +450,29 @@ protected void setupBookieClientAddEntry() {
 
             byte[] entry = extractEntryPayload(ledgerId, entryId, toSend);
 
-            submit(() -> {
+            submit(ledgerId, () -> {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("addEntry "+ledgerId+", "+entryId+" "+bookieSocketAddress+" "+pausedBookies);
+                }
+                if (pausedBookies.containsKey(bookieSocketAddress)) {
+                    callback.writeComplete(BKException.Code.BookieHandleNotAvailableException,
+                        ledgerId, entryId, BookieProtocol.INVALID_ENTRY_ID, bookieSocketAddress, ctx);
+                    return;
+                }
+
                 boolean fenced = fencedLedgers.contains(ledgerId);
                 if (fenced) {
                     callback.writeComplete(BKException.Code.LedgerFencedException,
-                        ledgerId, entryId, bookieSocketAddress, ctx);
+                        ledgerId, entryId, BookieProtocol.INVALID_ENTRY_ID, bookieSocketAddress, ctx);
                 } else {
                     if (getMockLedgerContentsInBookie(ledgerId, bookieSocketAddress).isEmpty()) {
                         registerMockEntryForRead(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, bookieSocketAddress,
                             new byte[0], BookieProtocol.INVALID_ENTRY_ID);
                     }
+                    long persistedEntryID = getMockLastAddPersistedInBookie(ledgerId, bookieSocketAddress);
                     registerMockEntryForRead(ledgerId, entryId, bookieSocketAddress, entry, ledgerId);
-                    callback.writeComplete(BKException.Code.OK, ledgerId, entryId, bookieSocketAddress, ctx);
+                    callback.writeComplete(BKException.Code.OK, ledgerId, entryId, persistedEntryID,
+                        bookieSocketAddress, ctx);
                 }
             });
             return null;
@@ -426,7 +480,47 @@ protected void setupBookieClientAddEntry() {
             anyLong(), any(byte[].class),
             anyLong(), any(ByteBuf.class),
             any(BookkeeperInternalCallbacks.WriteCallback.class),
-            any(), anyInt());
+            any(), anyInt(), any(LedgerType.class));
+    }
+
+    protected void setupBookieClientSync() {
+
+        doAnswer((Answer) (InvocationOnMock invokation) -> {
+            Object[] args = invokation.getArguments();
+
+            BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) args[0];
+            long ledgerId = (Long) args[1];
+            BookkeeperInternalCallbacks.SyncCallback callback = (BookkeeperInternalCallbacks.SyncCallback) args[2];
+            Object ctx = args[3];
+
+            submit(ledgerId, () -> {
+
+                if (pausedBookies.containsKey(bookieSocketAddress)) {
+                    callback.syncComplete(BKException.Code.BookieHandleNotAvailableException,
+                        ledgerId, BookieProtocol.INVALID_ENTRY_ID, bookieSocketAddress, ctx);
+                    return;
+                }
+
+                boolean fenced = fencedLedgers.contains(ledgerId);
+                if (fenced) {
+                    callback.syncComplete(BKException.Code.LedgerFencedException,
+                        ledgerId, BookieProtocol.INVALID_ENTRY_ID, bookieSocketAddress, ctx);
+                } else {
+                    long lastAddSynced = getMockLedgerContentsInBookie(ledgerId, bookieSocketAddress)
+                        .keySet()
+                        .stream()
+                        .max(Comparator.naturalOrder())
+                        .orElse(-1L);
+                    setMockLastAddPersistedInBookie(ledgerId, bookieSocketAddress, lastAddSynced);
+                    callback.syncComplete(BKException.Code.OK, ledgerId, lastAddSynced,
+                        bookieSocketAddress, ctx);
+                }
+            });
+            return null;
+        }).when(bookieClient).sync(any(BookieSocketAddress.class),
+            anyLong(),
+            any(BookkeeperInternalCallbacks.SyncCallback.class),
+            any());
     }
 
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index a2a671a68..3f6f41337 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -364,11 +364,12 @@ public void testRecoveryOnEntryGap() throws Exception {
         lh.bk.getBookieClient().addEntry(lh.metadata.currentEnsemble.get(0), lh.getId(), lh.ledgerKey, entryId, toSend,
             new WriteCallback() {
                 @Override
-                public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
+                public void writeComplete(int rc, long ledgerId, long entryId, long lastAddSyncedEntry,
+                    BookieSocketAddress addr, Object ctx) {
                     addSuccess.set(BKException.Code.OK == rc);
                     addLatch.countDown();
                 }
-            }, 0, BookieProtocol.FLAG_NONE);
+            }, 0, BookieProtocol.FLAG_NONE, lh.getLedgerType());
         addLatch.await();
         assertTrue("add entry 14 should succeed", addSuccess.get());
 
@@ -418,22 +419,24 @@ public void operationComplete(int rc, Void result) {
             private final int rc;
             private final long ledgerId;
             private final long entryId;
+            private final long lastAddSyncedEntry;
             private final BookieSocketAddress addr;
             private final Object ctx;
 
             WriteCallbackEntry(WriteCallback cb,
-                               int rc, long ledgerId, long entryId,
+                               int rc, long ledgerId, long entryId, long lastAddSyncedEntry,
                                BookieSocketAddress addr, Object ctx) {
                 this.cb = cb;
                 this.rc = rc;
                 this.ledgerId = ledgerId;
                 this.entryId = entryId;
+                this.lastAddSyncedEntry = lastAddSyncedEntry;
                 this.addr = addr;
                 this.ctx = ctx;
             }
 
             public void callback() {
-                cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
+                cb.writeComplete(rc, ledgerId, entryId, lastAddSyncedEntry, addr, ctx);
             }
         }
 
@@ -454,12 +457,12 @@ public void addEntry(ByteBuf entry, final WriteCallback cb, Object ctx, byte[] m
                 throws IOException, BookieException {
             super.addEntry(entry, new WriteCallback() {
                 @Override
-                public void writeComplete(int rc, long ledgerId, long entryId,
+                public void writeComplete(int rc, long ledgerId, long entryId, long lastAddSyncedEntry,
                                           BookieSocketAddress addr, Object ctx) {
                     if (delayAddResponse.get()) {
-                        delayQueue.add(new WriteCallbackEntry(cb, rc, ledgerId, entryId, addr, ctx));
+                        delayQueue.add(new WriteCallbackEntry(cb, rc, ledgerId, entryId, lastAddSyncedEntry, addr, ctx));
                     } else {
-                        cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
+                        cb.writeComplete(rc, ledgerId, entryId, lastAddSyncedEntry, addr, ctx);
                     }
                 }
             }, ctx, masterKey);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
index 946ee4e1e..d54148295 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
@@ -28,7 +28,6 @@
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.bookkeeper.client.BKException.BKClientClosedException;
-import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
 import org.apache.bookkeeper.client.BKException.BKIncorrectParameterException;
 import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -46,7 +45,7 @@
 public class BookKeeperBuildersTest extends MockBookKeeperTestCase {
 
     private final static int ensembleSize = 3;
-    private final static int writeQuorumSize = 2;
+    private final static int writeQuorumSize = 3;
     private final static int ackQuorumSize = 1;
     private final static long ledgerId = 12342L;
     private final static Map<String, byte[]> customMetadata = new HashMap<>();
@@ -70,6 +69,62 @@ public void testCreateLedger() throws Exception {
         assertEquals(ackQuorumSize, metadata.getAckQuorumSize());
         assertEquals(writeQuorumSize, metadata.getWriteQuorumSize());
         assertArrayEquals(password, metadata.getPassword());
+        assertEquals(LedgerType.PD_JOURNAL, metadata.getLedgerType());
+    }
+
+    @Test
+    public void testCreateLedgerLedgerTypeVD_JOURNAL() throws Exception {
+        setNewGeneratedLedgerId(ledgerId);
+        WriteHandle writer = newCreateLedgerOp()
+            .withAckQuorumSize(ackQuorumSize)
+            .withEnsembleSize(ensembleSize)
+            .withWriteQuorumSize(writeQuorumSize)
+            .withCustomMetadata(customMetadata)
+            .withLedgerType(LedgerType.VD_JOURNAL)
+            .withPassword(password)
+            .execute()
+            .get();
+        assertEquals(ledgerId, writer.getId());
+        LedgerMetadata metadata = getLedgerMetadata(ledgerId);
+        assertEquals(ensembleSize, metadata.getEnsembleSize());
+        assertEquals(ackQuorumSize, metadata.getAckQuorumSize());
+        assertEquals(writeQuorumSize, metadata.getWriteQuorumSize());
+        assertArrayEquals(password, metadata.getPassword());
+        assertEquals(LedgerType.VD_JOURNAL, metadata.getLedgerType());
+    }
+
+    @Test(expected = BKIncorrectParameterException.class)
+    public void testCreateLedgerLedgerTypeVD_JOURNALNoStriping() throws Exception {
+        setNewGeneratedLedgerId(ledgerId);
+        result(newCreateLedgerOp()
+            .withAckQuorumSize(ackQuorumSize)
+            .withEnsembleSize(writeQuorumSize +1 )
+            .withWriteQuorumSize(writeQuorumSize)
+            .withCustomMetadata(customMetadata)
+            .withLedgerType(LedgerType.VD_JOURNAL)
+            .withPassword(password)
+            .execute());
+    }
+
+    @Test
+    public void testCreateLedgerLedgerTypeExplicitPD_JOURNAL() throws Exception {
+        setNewGeneratedLedgerId(ledgerId);
+        WriteHandle writer = newCreateLedgerOp()
+            .withAckQuorumSize(ackQuorumSize)
+            .withEnsembleSize(ensembleSize)
+            .withWriteQuorumSize(writeQuorumSize)
+            .withCustomMetadata(customMetadata)
+            .withLedgerType(LedgerType.PD_JOURNAL)
+            .withPassword(password)
+            .execute()
+            .get();
+        assertEquals(ledgerId, writer.getId());
+        LedgerMetadata metadata = getLedgerMetadata(ledgerId);
+        assertEquals(ensembleSize, metadata.getEnsembleSize());
+        assertEquals(ackQuorumSize, metadata.getAckQuorumSize());
+        assertEquals(writeQuorumSize, metadata.getWriteQuorumSize());
+        assertArrayEquals(password, metadata.getPassword());
+        assertEquals(LedgerType.PD_JOURNAL, metadata.getLedgerType());
     }
 
     @Test(expected = BKIncorrectParameterException.class)
@@ -141,6 +196,14 @@ public void testFailCustomMetadataNull() throws Exception {
     }
 
     @Test(expected = BKIncorrectParameterException.class)
+    public void testFailLedgerTypeNull() throws Exception {
+        result(newCreateLedgerOp()
+            .withLedgerType(null)
+            .withPassword(password)
+            .execute());
+    }
+
+    @Test(expected = BKIncorrectParameterException.class)
     public void testFailDigestTypeNullAndAutodetectionTrue() throws Exception {
         ClientConfiguration config = new ClientConfiguration();
         config.setEnableDigestTypeAutodetection(true);
@@ -310,7 +373,7 @@ protected LedgerMetadata generateLedgerMetadata(int ensembleSize,
         int writeQuorumSize, int ackQuorumSize, byte[] password,
         Map<String, byte[]> customMetadata) {
         LedgerMetadata ledgerMetadata = new LedgerMetadata(ensembleSize, writeQuorumSize,
-            ackQuorumSize, BookKeeper.DigestType.CRC32, password, customMetadata);
+            ackQuorumSize, BookKeeper.DigestType.CRC32, password, customMetadata, LedgerType.PD_JOURNAL);
         ledgerMetadata.addEnsemble(0, generateNewEnsemble(ensembleSize));
         return ledgerMetadata;
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
index 6627f8287..14644adc4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
@@ -47,6 +47,7 @@
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.client.api.LedgerType;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
@@ -524,7 +525,8 @@ public void testTriggerAuditorWithNoPendingAuditTask() throws Exception {
         int numofledgers = 5;
         Random rand = new Random();
         for (int i = 0; i < numofledgers; i++) {
-            LedgerMetadata metadata = new LedgerMetadata(3, 2, 2, DigestType.CRC32, "passwd".getBytes(), null);
+            LedgerMetadata metadata = new LedgerMetadata(3, 2, 2, DigestType.CRC32, "passwd".getBytes(),
+                                                         null, LedgerType.PD_JOURNAL);
             ArrayList<BookieSocketAddress> ensemble = new ArrayList<BookieSocketAddress>();
             ensemble.add(new BookieSocketAddress("99.99.99.99:9999"));
             ensemble.add(new BookieSocketAddress("11.11.11.11:1111"));
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 5bc34d61d..2f0463478 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -35,6 +35,7 @@
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.client.api.LedgerType;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -122,7 +123,8 @@ public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf bb, O
     };
 
     WriteCallback wrcb = new WriteCallback() {
-        public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
+        public void writeComplete(int rc, long ledgerId, long entryId, long lastAddSyncedEntry,
+                                  BookieSocketAddress addr, Object ctx) {
             if (ctx != null) {
                 synchronized (ctx) {
                     if (ctx instanceof ResultStruct) {
@@ -145,7 +147,7 @@ public void testWriteGaps() throws Exception {
 
         BookieClient bc = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor);
         ByteBuf bb = createByteBuffer(1, 1, 1);
-        bc.addEntry(addr, 1, passwd, 1, bb, wrcb, arc, BookieProtocol.FLAG_NONE);
+        bc.addEntry(addr, 1, passwd, 1, bb, wrcb, arc, BookieProtocol.FLAG_NONE, LedgerType.PD_JOURNAL);
         synchronized (arc) {
             arc.wait(1000);
             assertEquals(0, arc.rc);
@@ -155,16 +157,16 @@ public void testWriteGaps() throws Exception {
             assertEquals(1, arc.entry.getInt());
         }
         bb = createByteBuffer(2, 1, 2);
-        bc.addEntry(addr, 1, passwd, 2, bb, wrcb, null, BookieProtocol.FLAG_NONE);
+        bc.addEntry(addr, 1, passwd, 2, bb, wrcb, null, BookieProtocol.FLAG_NONE, LedgerType.PD_JOURNAL);
         bb = createByteBuffer(3, 1, 3);
-        bc.addEntry(addr, 1, passwd, 3, bb, wrcb, null, BookieProtocol.FLAG_NONE);
+        bc.addEntry(addr, 1, passwd, 3, bb, wrcb, null, BookieProtocol.FLAG_NONE, LedgerType.PD_JOURNAL);
         bb = createByteBuffer(5, 1, 5);
-        bc.addEntry(addr, 1, passwd, 5, bb, wrcb, null, BookieProtocol.FLAG_NONE);
+        bc.addEntry(addr, 1, passwd, 5, bb, wrcb, null, BookieProtocol.FLAG_NONE, LedgerType.PD_JOURNAL);
         bb = createByteBuffer(7, 1, 7);
-        bc.addEntry(addr, 1, passwd, 7, bb, wrcb, null, BookieProtocol.FLAG_NONE);
+        bc.addEntry(addr, 1, passwd, 7, bb, wrcb, null, BookieProtocol.FLAG_NONE, LedgerType.PD_JOURNAL);
         synchronized (notifyObject) {
             bb = createByteBuffer(11, 1, 11);
-            bc.addEntry(addr, 1, passwd, 11, bb, wrcb, notifyObject, BookieProtocol.FLAG_NONE);
+            bc.addEntry(addr, 1, passwd, 11, bb, wrcb, notifyObject, BookieProtocol.FLAG_NONE, LedgerType.PD_JOURNAL);
             notifyObject.wait();
         }
         synchronized (arc) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
index eb2ff0ee9..f6a37d760 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
@@ -167,7 +167,7 @@ private long doWrites(int ledgers, int size, int totalwrites)
         throttle = new Semaphore(10000);
         WriteCallback cb = new WriteCallback() {
             @Override
-            public void writeComplete(int rc, long ledgerId, long entryId,
+            public void writeComplete(int rc, long ledgerId, long entryId, long lastAddSyncedEntry,
                     BookieSocketAddress addr, Object ctx) {
                 AtomicInteger counter = (AtomicInteger)ctx;
                 counter.getAndIncrement();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
index 4aca80838..234a81562 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
@@ -27,6 +27,7 @@
 
 import java.io.IOException;
 import java.util.Arrays;
+import org.apache.bookkeeper.client.api.LedgerType;
 
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -76,11 +77,13 @@ void write(long ledgerId, long entry, byte[] data, BookieSocketAddress addr, Wri
         byte[] passwd = new byte[20];
         Arrays.fill(passwd, (byte) 'a');
 
-        client.addEntry(addr, ledgerId, passwd, entry, Unpooled.wrappedBuffer(data), cb, ctx, BookieProtocol.FLAG_NONE);
+        client.addEntry(addr, ledgerId, passwd, entry, Unpooled.wrappedBuffer(data), cb, ctx,
+                        BookieProtocol.FLAG_NONE, LedgerType.PD_JOURNAL);
     }
 
     @Override
-    public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
+    public void writeComplete(int rc, long ledgerId, long entryId, long lastAddSyncedEntry,
+                              BookieSocketAddress addr, Object ctx) {
         Counter counter = (Counter) ctx;
         counter.increment();
     }
diff --git a/pom.xml b/pom.xml
index 7a4dddca7..1d69d8f5e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -117,7 +117,7 @@
     <findbugs-maven-plugin.version>3.0.5</findbugs-maven-plugin.version>
     <puppycrawl.checkstyle.version>6.19</puppycrawl.checkstyle.version>
     <maven-checkstyle-plugin.version>2.17</maven-checkstyle-plugin.version>
-    <maven-compiler-plugin.version>3.5.1</maven-compiler-plugin.version>
+    <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
     <maven-deploy-plugin.version>2.7</maven-deploy-plugin.version>
     <maven-jar-plugin.version>2.2</maven-jar-plugin.version>
     <maven-javadoc-plugin.version>2.10.4</maven-javadoc-plugin.version>
@@ -204,7 +204,8 @@
           <target>1.8</target>
           <compilerArgs>
             <compilerArg>-Werror</compilerArg>
-            <compilerArg>-Xlint:deprecation</compilerArg>
+            <!-- Protobuf generates code with warnings for top level 'enums' like LedgerType https://github.com/google/protobuf/issues/2054 -->
+            <!-- <compilerArg>-Xlint:deprecation</compilerArg> -->
             <compilerArg>-Xlint:unchecked</compilerArg>
 	  </compilerArgs>
         </configuration>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services