You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by re...@apache.org on 2019/04/19 17:11:13 UTC

[bookkeeper] branch master updated: ISSUE #1967: make ledger creation and removal robust to zk connectionloss

This is an automated email from the ASF dual-hosted git repository.

reddycharan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 407cb35  ISSUE #1967: make ledger creation and removal robust to zk connectionloss
407cb35 is described below

commit 407cb35e594e2334ae687f7c884651c7642349fa
Author: Charan Reddy Guttapalem <re...@gmail.com>
AuthorDate: Fri Apr 19 10:11:08 2019 -0700

    ISSUE #1967: make ledger creation and removal robust to zk connectionloss
    
    
    Descriptions of the changes in this PR:
    
    The bookkeeper project ZooKeeperClient wrapper for the ZooKeeper client
    will resend zk node creations and removals upon reconnect after a
    ConnectionLoss event. In the event that the original succeeded, the
    resent operation will erroneously return LedgerExistException or
    NoSuchLedgerExistsException for creation and removal respectively.
    
    For removal, this patch limits the operation by allowing it to always
    succeed if the ledger does not exist in order to make it idempotent.
    This appears to be the simplest solution as exclusive removal isn't
    important.
    
    **Note, the above is an actual change to the bk client semantics**
    
    For creation, exclusive creation is clearly important for correctness,
    so this patch adds a creator token field to the LedgerMetadata to
    disambiguate the above race from a real race. For
    AbstractZkLedgerManager, this is simply a random long value.
    
    There's an opportunity for optimization with the above if exclusive
    ledger creation failures are expected to be common.  You only actually
    need to perform this check if the operation was really resent.  I chose
    not to go this route yet because it would require messing with the
    ZooKeeperClient interface to surface that information without burdening
    other callers.
    
    If the client is set to version 2 or older, this field will be ignored
    and the old behavior will be retained.  If the client is version 3 or
    newer but creation races with an older client, the new client will
    interpret the nonce to be BLANK and thereby detect the race correctly.
    
    
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #2006 from reddycharan/zkretrialrobust, closes #1967
---
 bookkeeper-proto/src/main/proto/DataFormats.proto  |   2 +
 .../bookkeeper/client/LedgerMetadataBuilder.java   |   9 ++
 .../bookkeeper/client/LedgerMetadataImpl.java      |  10 ++
 .../bookkeeper/client/api/LedgerMetadata.java      |   7 +
 .../bookkeeper/meta/AbstractZkLedgerManager.java   |  54 +++++++-
 .../bookkeeper/meta/LedgerMetadataSerDe.java       |   8 +-
 .../bookkeeper/zookeeper/ZooKeeperClient.java      |   4 +-
 .../apache/bookkeeper/client/BookKeeperTest.java   | 142 ++++++++++++++++++++-
 .../meta/AbstractZkLedgerManagerTest.java          |   5 +-
 .../bookkeeper/meta/TestLedgerMetadataSerDe.java   |   4 +-
 .../backwardcompat/TestCompatOldClients.groovy     |  38 +++++-
 .../TestCompatUpgradeYahooCustom.groovy            |  20 ++-
 12 files changed, 284 insertions(+), 19 deletions(-)

diff --git a/bookkeeper-proto/src/main/proto/DataFormats.proto b/bookkeeper-proto/src/main/proto/DataFormats.proto
index a679248..2501aff 100644
--- a/bookkeeper-proto/src/main/proto/DataFormats.proto
+++ b/bookkeeper-proto/src/main/proto/DataFormats.proto
@@ -60,6 +60,8 @@ message LedgerMetadataFormat {
         optional bytes value = 2;
     }
     repeated cMetadataMapEntry customMetadata = 11;
+
+    optional int64 cToken = 12;
 }
 
 message LedgerRereplicationLayoutFormat {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
index 0c80315..e4f75ce 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
@@ -63,6 +63,9 @@ public class LedgerMetadataBuilder {
     private boolean storeCtime = false;
     private Map<String, byte[]> customMetadata = Collections.emptyMap();
 
+    private static final long BLANK_CTOKEN = 0;
+    private long cToken = BLANK_CTOKEN;
+
     public static LedgerMetadataBuilder create() {
         return new LedgerMetadataBuilder();
     }
@@ -181,6 +184,11 @@ public class LedgerMetadataBuilder {
         return this;
     }
 
+    public LedgerMetadataBuilder withCToken(long cToken) {
+        this.cToken = cToken;
+        return this;
+    }
+
     public LedgerMetadata build() {
         checkArgument(ensembleSize >= writeQuorumSize, "Write quorum must be less or equal to ensemble size");
         checkArgument(writeQuorumSize >= ackQuorumSize, "Write quorum must be greater or equal to ack quorum");
@@ -189,6 +197,7 @@ public class LedgerMetadataBuilder {
                                       ensembleSize, writeQuorumSize, ackQuorumSize,
                                       state, lastEntryId, length, ensembles,
                                       digestType, password, ctime, storeCtime,
+                                      cToken,
                                       customMetadata);
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataImpl.java
index c0fcadd..842244a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataImpl.java
@@ -69,6 +69,8 @@ class LedgerMetadataImpl implements LedgerMetadata {
 
     private final Map<String, byte[]> customMetadata;
 
+    private long cToken;
+
     LedgerMetadataImpl(int metadataFormatVersion,
                        int ensembleSize,
                        int writeQuorumSize,
@@ -81,6 +83,7 @@ class LedgerMetadataImpl implements LedgerMetadata {
                        Optional<byte[]> password,
                        long ctime,
                        boolean storeCtime,
+                       long cToken,
                        Map<String, byte[]> customMetadata) {
         checkArgument(ensembles.size() > 0, "There must be at least one ensemble in the ledger");
         if (state == State.CLOSED) {
@@ -127,6 +130,8 @@ class LedgerMetadataImpl implements LedgerMetadata {
         this.ctime = ctime;
         this.storeCtime = storeCtime;
 
+        this.cToken = cToken;
+
         this.customMetadata = ImmutableMap.copyOf(customMetadata);
     }
 
@@ -268,4 +273,9 @@ class LedgerMetadataImpl implements LedgerMetadata {
     boolean shouldStoreCtime() {
         return storeCtime;
     }
+
+    @Override
+    public long getCToken() {
+        return cToken;
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerMetadata.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerMetadata.java
index 2ce1940..de6cdf2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerMetadata.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerMetadata.java
@@ -174,4 +174,11 @@ public interface LedgerMetadata {
      * @return the format version.
      */
     int getMetadataFormatVersion();
+
+    /**
+     * Get the unique creator token of the Ledger.
+     *
+     * @return the creator token
+     */
+    long getCToken();
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index 96c2f0f..9308bfb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -33,9 +33,11 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerMetadataBuilder;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
@@ -247,8 +249,19 @@ public abstract class AbstractZkLedgerManager implements LedgerManager, Watcher
     }
 
     @Override
-    public CompletableFuture<Versioned<LedgerMetadata>> createLedgerMetadata(long ledgerId, LedgerMetadata metadata) {
+    public CompletableFuture<Versioned<LedgerMetadata>> createLedgerMetadata(long ledgerId,
+                                                                             LedgerMetadata inputMetadata) {
         CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
+        /*
+         * Create a random number and use it as creator token.
+         */
+        final long cToken = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+        final LedgerMetadata metadata;
+        if (inputMetadata.getMetadataFormatVersion() > LedgerMetadataSerDe.METADATA_FORMAT_VERSION_2) {
+            metadata = LedgerMetadataBuilder.from(inputMetadata).withCToken(cToken).build();
+        } else {
+            metadata = inputMetadata;
+        }
         String ledgerPath = getLedgerPath(ledgerId);
         StringCallback scb = new StringCallback() {
             @Override
@@ -256,8 +269,39 @@ public abstract class AbstractZkLedgerManager implements LedgerManager, Watcher
                 if (rc == Code.OK.intValue()) {
                     promise.complete(new Versioned<>(metadata, new LongVersion(0)));
                 } else if (rc == Code.NODEEXISTS.intValue()) {
-                    LOG.warn("Failed to create ledger metadata for {} which already exist", ledgerId);
-                    promise.completeExceptionally(new BKException.BKLedgerExistException());
+                    LOG.info("Ledger metadata for {} appears to already exist, checking cToken",
+                            ledgerId);
+                    if (metadata.getMetadataFormatVersion() > 2) {
+                        CompletableFuture<Versioned<LedgerMetadata>> readFuture = readLedgerMetadata(ledgerId);
+                        readFuture.handle((readMetadata, exception) -> {
+                            if (exception == null) {
+                                if (readMetadata.getValue().getCToken() == cToken) {
+                                    FutureUtils.complete(promise, new Versioned<>(metadata, new LongVersion(0)));
+                                } else {
+                                    LOG.warn("Failed to create ledger metadata for {} which already exists", ledgerId);
+                                    promise.completeExceptionally(new BKException.BKLedgerExistException());
+                                }
+                            } else if (exception instanceof KeeperException.NoNodeException) {
+                                // This is a pretty strange case.  We tried to create the node, found that it
+                                // already exists, but failed to find it when we reread it.  It's possible that
+                                // we successfully created it, got an erroneous NODEEXISTS due to a resend,
+                                // and then it got removed.  It's also possible that we actually lost the race
+                                // and then it got removed.  I'd argue that returning an error here is the right
+                                // path since recreating it is likely to cause problems.
+                                LOG.warn("Ledger {} appears to have already existed and then been removed, failing"
+                                        + " with LedgerExistException");
+                                promise.completeExceptionally(new BKException.BKLedgerExistException());
+                            } else {
+                                LOG.error("Could not validate node for ledger {} after LedgerExistsException", ledgerId,
+                                        exception);
+                                promise.completeExceptionally(new BKException.ZKException());
+                            }
+                            return null;
+                        });
+                    } else {
+                        LOG.warn("Failed to create ledger metadata for {} which already exists", ledgerId);
+                        promise.completeExceptionally(new BKException.BKLedgerExistException());
+                    }
                 } else {
                     LOG.error("Could not create node for ledger {}", ledgerId,
                             KeeperException.create(Code.get(rc), path));
@@ -301,8 +345,8 @@ public abstract class AbstractZkLedgerManager implements LedgerManager, Watcher
             @Override
             public void processResult(int rc, String path, Object ctx) {
                 if (rc == KeeperException.Code.NONODE.intValue()) {
-                    LOG.warn("Ledger node does not exist in ZooKeeper: ledgerId={}", ledgerId);
-                    promise.completeExceptionally(new BKException.BKNoSuchLedgerExistsException());
+                    LOG.warn("Ledger node does not exist in ZooKeeper: ledgerId={}.  Returning success.", ledgerId);
+                    FutureUtils.complete(promise, null);
                 } else if (rc == KeeperException.Code.OK.intValue()) {
                     // removed listener on ledgerId
                     Set<LedgerMetadataListener> listenerSet = listeners.remove(ledgerId);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerMetadataSerDe.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerMetadataSerDe.java
index aab72fb..b34b34b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerMetadataSerDe.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerMetadataSerDe.java
@@ -78,7 +78,7 @@ public class LedgerMetadataSerDe {
     public static final int METADATA_FORMAT_VERSION_3 = 3;
 
     public static final int MAXIMUM_METADATA_FORMAT_VERSION = METADATA_FORMAT_VERSION_3;
-    public static final int CURRENT_METADATA_FORMAT_VERSION = METADATA_FORMAT_VERSION_2;
+    public static final int CURRENT_METADATA_FORMAT_VERSION = METADATA_FORMAT_VERSION_3;
     private static final int LOWEST_COMPAT_METADATA_FORMAT_VERSION = METADATA_FORMAT_VERSION_1;
 
     // for pulling the version
@@ -205,6 +205,8 @@ public class LedgerMetadataSerDe {
                 builder.addSegment(segmentBuilder.build());
             }
 
+            builder.setCToken(metadata.getCToken());
+
             builder.build().writeDelimitedTo(os);
             return os.toByteArray();
         }
@@ -429,6 +431,10 @@ public class LedgerMetadataSerDe {
                                                Collectors.toMap(e -> e.getKey(),
                                                                 e -> e.getValue().toByteArray())));
         }
+
+        if (data.hasCToken()) {
+            builder.withCToken(data.getCToken());
+        }
     }
 
     private static LedgerMetadata parseVersion1Config(InputStream is) throws IOException {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java
index be037f5..3d97e6f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java
@@ -271,7 +271,7 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher, AutoCloseable
         return new Builder();
     }
 
-    ZooKeeperClient(String connectString,
+    protected ZooKeeperClient(String connectString,
                     int sessionTimeoutMs,
                     ZooKeeperWatcherBase watcherManager,
                     RetryPolicy connectRetryPolicy,
@@ -329,7 +329,7 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher, AutoCloseable
         }
     }
 
-    protected void waitForConnection() throws KeeperException, InterruptedException {
+    public void waitForConnection() throws KeeperException, InterruptedException {
         watcherManager.waitForConnection();
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
index cb45ba7..80d059d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
@@ -24,6 +24,7 @@ import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -32,10 +33,12 @@ import io.netty.util.IllegalReferenceCountException;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Enumeration;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
@@ -45,9 +48,22 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.client.api.WriteHandle;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.data.ACL;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,7 +73,7 @@ import org.slf4j.LoggerFactory;
  */
 public class BookKeeperTest extends BookKeeperClusterTestCase {
     private static final Logger LOG = LoggerFactory.getLogger(BookKeeperTest.class);
-
+    private static final long INVALID_LEDGERID = -1L;
     private final DigestType digestType;
 
     public BookKeeperTest() {
@@ -816,4 +832,128 @@ public class BookKeeperTest extends BookKeeperClusterTestCase {
             }
         }
     }
+
+    class MockZooKeeperClient extends ZooKeeperClient {
+        class MockZooKeeper extends ZooKeeper {
+            public MockZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
+                    throws IOException {
+                super(connectString, sessionTimeout, watcher, canBeReadOnly);
+            }
+
+            @Override
+            public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb,
+                    Object ctx) {
+                StringCallback injectedCallback = new StringCallback() {
+                    @Override
+                    public void processResult(int rc, String path, Object ctx, String name) {
+                        /**
+                         * if ledgerIdToInjectFailure matches with the path of
+                         * the node, then throw CONNECTIONLOSS error and then
+                         * reset it to INVALID_LEDGERID.
+                         */
+                        if (path.contains(ledgerIdToInjectFailure.toString())) {
+                            ledgerIdToInjectFailure.set(INVALID_LEDGERID);
+                            cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, name);
+                        } else {
+                            cb.processResult(rc, path, ctx, name);
+                        }
+                    }
+                };
+                super.create(path, data, acl, createMode, injectedCallback, ctx);
+            }
+        }
+
+        private final String connectString;
+        private final int sessionTimeoutMs;
+        private final ZooKeeperWatcherBase watcherManager;
+        private final AtomicLong ledgerIdToInjectFailure;
+
+        MockZooKeeperClient(String connectString, int sessionTimeoutMs, ZooKeeperWatcherBase watcher,
+                AtomicLong ledgerIdToInjectFailure) throws IOException {
+            /*
+             * in OperationalRetryPolicy maxRetries is > 0. So in case of any
+             * RecoverableException scenario, it will retry.
+             */
+            super(connectString, sessionTimeoutMs, watcher,
+                    new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, Integer.MAX_VALUE),
+                    new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 3),
+                    NullStatsLogger.INSTANCE, 1, 0, false);
+            this.connectString = connectString;
+            this.sessionTimeoutMs = sessionTimeoutMs;
+            this.watcherManager = watcher;
+            this.ledgerIdToInjectFailure = ledgerIdToInjectFailure;
+        }
+
+        @Override
+        protected ZooKeeper createZooKeeper() throws IOException {
+            return new MockZooKeeper(this.connectString, this.sessionTimeoutMs, this.watcherManager, false);
+        }
+    }
+
+    @Test
+    public void testZKConnectionLossForLedgerCreation() throws Exception {
+        int zkSessionTimeOut = 10000;
+        AtomicLong ledgerIdToInjectFailure = new AtomicLong(INVALID_LEDGERID);
+        ZooKeeperWatcherBase zooKeeperWatcherBase = new ZooKeeperWatcherBase(zkSessionTimeOut,
+                NullStatsLogger.INSTANCE);
+        MockZooKeeperClient zkFaultInjectionWrapper = new MockZooKeeperClient(zkUtil.getZooKeeperConnectString(),
+                zkSessionTimeOut, zooKeeperWatcherBase, ledgerIdToInjectFailure);
+        zkFaultInjectionWrapper.waitForConnection();
+        assertEquals("zkFaultInjectionWrapper should be in connected state", States.CONNECTED,
+                zkFaultInjectionWrapper.getState());
+        BookKeeper bk = new BookKeeper(baseClientConf, zkFaultInjectionWrapper);
+        long oldZkInstanceSessionId = zkFaultInjectionWrapper.getSessionId();
+        long ledgerId = 567L;
+        LedgerHandle lh = bk.createLedgerAdv(ledgerId, 1, 1, 1, DigestType.CRC32, "".getBytes(), null);
+        lh.close();
+
+        /*
+         * trigger Expired event so that MockZooKeeperClient would run
+         * 'clientCreator' and create new zk handle. In this case it would
+         * create MockZooKeeper.
+         */
+        zooKeeperWatcherBase.process(new WatchedEvent(EventType.None, KeeperState.Expired, ""));
+        zkFaultInjectionWrapper.waitForConnection();
+        for (int i = 0; i < 10; i++) {
+            if (zkFaultInjectionWrapper.getState() == States.CONNECTED) {
+                break;
+            }
+            Thread.sleep(200);
+        }
+        assertEquals("zkFaultInjectionWrapper should be in connected state", States.CONNECTED,
+                zkFaultInjectionWrapper.getState());
+        assertNotEquals("Session Id of old and new ZK instance should be different", oldZkInstanceSessionId,
+                zkFaultInjectionWrapper.getSessionId());
+        ledgerId++;
+        ledgerIdToInjectFailure.set(ledgerId);
+        /**
+         * ledgerIdToInjectFailure is set to 'ledgerId', so zookeeper.create
+         * would return CONNECTIONLOSS error for the first time and when it is
+         * retried, as expected it would return NODEEXISTS error.
+         *
+         * AbstractZkLedgerManager.createLedgerMetadata should deal with this
+         * scenario appropriately.
+         */
+        lh = bk.createLedgerAdv(ledgerId, 1, 1, 1, DigestType.CRC32, "".getBytes(), null);
+        lh.close();
+        assertEquals("injectZnodeCreationNoNodeFailure should have been reset it to INVALID_LEDGERID", INVALID_LEDGERID,
+                ledgerIdToInjectFailure.get());
+        lh = bk.openLedger(ledgerId, DigestType.CRC32, "".getBytes());
+        lh.close();
+        ledgerId++;
+        lh = bk.createLedgerAdv(ledgerId, 1, 1, 1, DigestType.CRC32, "".getBytes(), null);
+        lh.close();
+        bk.close();
+    }
+
+    @Test
+    public void testLedgerDeletionIdempotency() throws Exception {
+        BookKeeper bk = new BookKeeper(baseClientConf);
+        long ledgerId = 789L;
+        LedgerHandle lh = bk.createLedgerAdv(ledgerId, 1, 1, 1, DigestType.CRC32, "".getBytes(), null);
+        lh.close();
+        bk.deleteLedger(ledgerId);
+        bk.deleteLedger(ledgerId);
+        bk.close();
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
index 5776af3..8e2ec0a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
@@ -273,9 +273,8 @@ public class AbstractZkLedgerManagerTest extends MockZooKeeperTestCase {
 
         try {
             result(ledgerManager.removeLedgerMetadata(ledgerId, version));
-            fail("Should fail to remove metadata if no such ledger exists");
         } catch (BKException bke) {
-            assertEquals(Code.NoSuchLedgerExistsException, bke.getCode());
+            fail("Should succeed");
         }
 
         verify(mockZk, times(1))
@@ -294,7 +293,7 @@ public class AbstractZkLedgerManagerTest extends MockZooKeeperTestCase {
 
         try {
             result(ledgerManager.removeLedgerMetadata(ledgerId, version));
-            fail("Should fail to remove metadata if no such ledger exists");
+            fail("Should fail to remove metadata upon ZKException");
         } catch (BKException bke) {
             assertEquals(Code.ZKException, bke.getCode());
         }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerMetadataSerDe.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerMetadataSerDe.java
index d9ce5a2..a0767ba 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerMetadataSerDe.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerMetadataSerDe.java
@@ -60,9 +60,9 @@ public class TestLedgerMetadataSerDe {
 
     // version 3, since 4.9.x, protobuf binary format
     private static final String version3 =
-        "Qm9va2llTWV0YWRhdGFGb3JtYXRWZXJzaW9uCTMKXggCEAMYACD///////////8BKAEyMgoOMTkyL"
+        "Qm9va2llTWV0YWRhdGFGb3JtYXRWZXJzaW9uCTMKYAgCEAMYACD///////////8BKAEyMgoOMTkyL"
         + "jAuMi4xOjMxODEKDjE5Mi4wLjIuMjozMTgxCg4xOTIuMC4yLjM6MzE4MRAAOANCBmZvb2JhckgB"
-        + "UP///////////wE=";
+        + "UP///////////wFgAA==";
 
     private static void testDecodeEncode(String encoded) throws Exception {
         LedgerMetadataSerDe serDe = new LedgerMetadataSerDe();
diff --git a/tests/backward-compat/current-server-old-clients/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatOldClients.groovy b/tests/backward-compat/current-server-old-clients/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatOldClients.groovy
index a7ab4e3..4fd35d5 100644
--- a/tests/backward-compat/current-server-old-clients/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatOldClients.groovy
+++ b/tests/backward-compat/current-server-old-clients/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatOldClients.groovy
@@ -163,11 +163,47 @@ class TestCompatOldClients {
         }
     }
 
+    private void testReadOpenFailure(String writeVersion, String readerVersion) throws Exception {
+        String zookeeper = BookKeeperClusterUtils.zookeeperConnectString(docker)
+
+        def writeCL = MavenClassLoader.forBookKeeperVersion(writeVersion)
+        def writeBK = writeCL.newBookKeeper(zookeeper)
+        def readCL = MavenClassLoader.forBookKeeperVersion(readerVersion)
+        def readBK = readCL.newBookKeeper(zookeeper)
+        try {
+            def numEntries = 5
+            def ledger0 = writeBK.createLedger(3, 2,
+                                               writeCL.digestType("CRC32"),
+                                               PASSWD)
+            for (int i = 0; i < numEntries; i++) {
+                ledger0.addEntry(("foobar" + i).getBytes())
+            }
+            ledger0.close()
+
+            try {
+                def ledger1 = readBK.openLedger(ledger0.getId(), readCL.digestType("CRC32"), PASSWD)
+                Assert.fail("For older versions Openledger call is expected to fail with ZKException");
+            } catch (Exception exc) {
+                Assert.assertEquals(exc.getClass().getName(),
+                                "org.apache.bookkeeper.client.BKException\$ZKException")
+            }
+        } finally {
+            readBK.close()
+            readCL.close()
+            writeBK.close()
+            writeCL.close()
+        }
+    }
+
+    /**
+     * Since METADATA_VERSION is upgraded and it is using binary format, the older
+     * clients which are expecting text format would fail to read ledger metadata.
+     */
     @Test
     public void testOldClientReadsNewClient() throws Exception {
         oldClientVersions.each{
             def version = it
-            ThreadReaper.runWithReaper({ testReads(currentVersion, version) })
+            ThreadReaper.runWithReaper({ testReadOpenFailure(currentVersion, version) })
         }
     }
 
diff --git a/tests/backward-compat/yahoo-custom-version/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgradeYahooCustom.groovy b/tests/backward-compat/yahoo-custom-version/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgradeYahooCustom.groovy
index 18cdec8..9762b4b 100644
--- a/tests/backward-compat/yahoo-custom-version/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgradeYahooCustom.groovy
+++ b/tests/backward-compat/yahoo-custom-version/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgradeYahooCustom.groovy
@@ -170,11 +170,16 @@ class TestCompatUpgradeYahooCustom {
             openAndVerifyEntries(currentCL, currentBK, ledger4.getId())
             assertCantWrite(yahooCL, ledger4)
 
-            // yahoo client can fence a bookie created by current client
+            // Since METADATA_VERSION is upgraded and it is using binary format, the older
+            // clients which are expecting text format would fail to read ledger metadata.
             def ledger5 = createAndWrite(currentCL, currentBK)
             ledgers.add(ledger5.getId())
-            openAndVerifyEntries(yahooCL, yahooBK, ledger5.getId())
-            assertCantWrite(currentCL, ledger5)
+            try {
+                openAndVerifyEntries(yahooCL, yahooBK, ledger5.getId())
+            } catch (Exception exc) {
+                Assert.assertEquals(exc.getClass().getName(),
+                  "org.apache.bookkeeper.client.BKException\$ZKException")
+            }
         } finally {
             currentBK.close()
             currentCL.close()
@@ -203,6 +208,13 @@ class TestCompatUpgradeYahooCustom {
                                                    "org.apache.bookkeeper.stats.NullStatsProvider")
 
         Assert.assertTrue(BookKeeperClusterUtils.startAllBookiesWithVersion(docker, currentVersion))
-        exerciseClients(preUpgradeLedgers)
+        // Since METADATA_VERSION is upgraded and it is using binary format, the older
+        // clients which are expecting text format would fail to read ledger metadata.
+        try {
+            exerciseClients(preUpgradeLedgers)
+        } catch (Exception exc) {
+            Assert.assertEquals(exc.getClass().getName(),
+              "org.apache.bookkeeper.client.BKException\$ZKException")
+        }
     }
 }