You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2020/06/12 15:05:48 UTC

[bookkeeper] branch master updated: Remove lock from URManager’s state only if lock znode deletion has succeeded

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bea124  Remove lock from URManager’s state only if lock znode deletion has succeeded
0bea124 is described below

commit 0bea124989d68376249826e35b179a755eda9025
Author: Charan Reddy Guttapalem <cg...@salesforce.com>
AuthorDate: Fri Jun 12 08:05:39 2020 -0700

    Remove lock from URManager’s state only if lock znode deletion has succeeded
    
    
    
    
    Descriptions of the changes in this PR:
    
    - In ZkLedgerUnderreplicationManager.releaseUnderreplicatedLedger, remove the
     lock from ‘heldLocks’ only if deletion of the lock znode has succeeded.
    - This is needed because, if RW.logBKExceptionAndReleaseLedger fails to delete
     the lock znode, the deletion must be retried once more in the finally block of
     ‘rereplicate(long ledgerIdToReplicate)’ before giving up and shutting down the RW.
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>
    
    This closes #2206 from reddycharan/fixzklossissue
---
 .../bookkeeper/meta/AbstractZkLedgerManager.java   |   2 +-
 .../bookkeeper/meta/HierarchicalLedgerManager.java |   2 +-
 .../meta/ZkLedgerUnderreplicationManager.java      |   3 +-
 .../replication/TestReplicationWorker.java         | 225 ++++++++++++++++++++-
 4 files changed, 228 insertions(+), 4 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index 808cf89..0b42c3d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -181,7 +181,7 @@ public abstract class AbstractZkLedgerManager implements LedgerManager, Watcher
      *          Ledger ID
      * @return ledger node path
      */
-    protected abstract String getLedgerPath(long ledgerId);
+    public abstract String getLedgerPath(long ledgerId);
 
     /**
      * Get ledger id from its znode ledger path.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
index 9cd6aed..b5fd0f2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
@@ -73,7 +73,7 @@ class HierarchicalLedgerManager extends AbstractHierarchicalLedgerManager {
     }
 
     @Override
-    protected String getLedgerPath(long ledgerId) {
+    public String getLedgerPath(long ledgerId) {
         return ledgerRootPath + StringUtils.getHybridHierarchicalLedgerPath(ledgerId);
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
index abc2827..9de751e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
@@ -629,7 +629,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
             LOG.debug("releaseLedger(ledgerId={})", ledgerId);
         }
         try {
-            Lock l = heldLocks.remove(ledgerId);
+            Lock l = heldLocks.get(ledgerId);
             if (l != null) {
                 zkc.delete(l.getLockZNode(), -1);
             }
@@ -642,6 +642,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
             Thread.currentThread().interrupt();
             throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
         }
+        heldLocks.remove(ledgerId);
     }
 
     @Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index 11531b5..09751e9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -22,6 +22,7 @@ package org.apache.bookkeeper.replication;
 import static org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -34,6 +35,7 @@ import java.util.Optional;
 import java.util.TimerTask;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import lombok.Cleanup;
 
@@ -44,11 +46,14 @@ import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.AbstractZkLedgerManager;
+import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.meta.MetadataBookieDriver;
 import org.apache.bookkeeper.meta.MetadataClientDriver;
 import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
@@ -59,8 +64,17 @@ import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.test.TestStatsProvider;
 import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
 import org.apache.bookkeeper.util.BookKeeperConstants;
+import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,8 +93,10 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
     private MetadataBookieDriver driver;
     private LedgerManagerFactory mFactory;
     private LedgerUnderreplicationManager underReplicationManager;
+    private LedgerManager ledgerManager;
     private static byte[] data = "TestReplicationWorker".getBytes();
     private OrderedScheduler scheduler;
+    private String zkLedgersRootPath;
 
     public TestReplicationWorker() {
         this("org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
@@ -100,7 +116,7 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
     public void setUp() throws Exception {
         super.setUp();
 
-        String zkLedgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(baseClientConf);
+        zkLedgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(baseClientConf);
         basePath = zkLedgersRootPath + '/'
                 + BookKeeperConstants.UNDER_REPLICATION_NODE
                 + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH;
@@ -121,6 +137,7 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
             NullStatsLogger.INSTANCE);
         // initialize urReplicationManager
         mFactory = driver.getLedgerManagerFactory();
+        ledgerManager = mFactory.newLedgerManager();
         underReplicationManager = mFactory.newLedgerUnderreplicationManager();
     }
 
@@ -857,4 +874,210 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
         }
     }
 
+    class MockZooKeeperClient extends ZooKeeperClient {
+        private final String connectString;
+        private final int sessionTimeoutMs;
+        private final ZooKeeperWatcherBase watcherManager;
+        private volatile String pathOfSetDataToFail;
+        private volatile String pathOfDeleteToFail;
+        private final AtomicInteger numOfTimesSetDataFailed = new AtomicInteger();
+        private final AtomicInteger numOfTimesDeleteFailed = new AtomicInteger();
+
+        MockZooKeeperClient(String connectString, int sessionTimeoutMs, ZooKeeperWatcherBase watcher)
+                throws IOException {
+            /*
+             * maxRetries of the operation retry policy is set to 0, so a failed
+             * operation is never retried, whatever the error/exception.
+             */
+            super(connectString, sessionTimeoutMs, watcher,
+                    new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, Integer.MAX_VALUE),
+                    new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0),
+                    NullStatsLogger.INSTANCE, 1, 0, false);
+            this.connectString = connectString;
+            this.sessionTimeoutMs = sessionTimeoutMs;
+            this.watcherManager = watcher;
+        }
+
+        @Override
+        protected ZooKeeper createZooKeeper() throws IOException {
+            return new MockZooKeeper(this.connectString, this.sessionTimeoutMs, this.watcherManager, false);
+        }
+
+        private void setPathOfSetDataToFail(String pathOfSetDataToFail) {
+            this.pathOfSetDataToFail = pathOfSetDataToFail;
+        }
+
+        private void setPathOfDeleteToFail(String pathOfDeleteToFail) {
+            this.pathOfDeleteToFail = pathOfDeleteToFail;
+        }
+
+        private int getNumOfTimesSetDataFailed() {
+            return numOfTimesSetDataFailed.get();
+        }
+
+        private int getNumOfTimesDeleteFailed() {
+            return numOfTimesDeleteFailed.get();
+        }
+
+        class MockZooKeeper extends ZooKeeper {
+            public MockZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
+                    throws IOException {
+                super(connectString, sessionTimeout, watcher, canBeReadOnly);
+            }
+
+            @Override
+            public void setData(final String path, final byte[] data, final int version, final StatCallback cb,
+                    final Object context) {
+                if ((pathOfSetDataToFail != null) && (pathOfSetDataToFail.equals(path))) {
+                    /*
+                     * if path matches pathOfSetDataToFail, complete the callback
+                     * with CONNECTIONLOSS instead of sending the request to ZK.
+                     */
+                    LOG.error("setData of MockZooKeeper, is failing with CONNECTIONLOSS for path: {}", path);
+                    numOfTimesSetDataFailed.incrementAndGet();
+                    cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, context, null);
+                } else {
+                    super.setData(path, data, version, cb, context);
+                }
+            }
+
+            @Override
+            public void delete(final String path, final int version) throws KeeperException, InterruptedException {
+                if ((pathOfDeleteToFail != null) && (pathOfDeleteToFail.equals(path))) {
+                    /*
+                     * if path matches pathOfDeleteToFail, throw CONNECTIONLOSS
+                     * instead of sending the delete request to ZK.
+                     */
+                    LOG.error("delete of MockZooKeeper, is failing with CONNECTIONLOSS for path: {}", path);
+                    numOfTimesDeleteFailed.incrementAndGet();
+                    throw new KeeperException.ConnectionLossException();
+                } else {
+                    super.delete(path, version);
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testRWShutDownInTheCaseOfZKOperationFailures() throws Exception {
+        /*
+         * create MockZooKeeperClient instance and wait for it to be connected.
+         */
+        int zkSessionTimeOut = 10000;
+        ZooKeeperWatcherBase zooKeeperWatcherBase = new ZooKeeperWatcherBase(zkSessionTimeOut,
+                NullStatsLogger.INSTANCE);
+        MockZooKeeperClient zkFaultInjectionWrapper = new MockZooKeeperClient(zkUtil.getZooKeeperConnectString(),
+                zkSessionTimeOut, zooKeeperWatcherBase);
+        zkFaultInjectionWrapper.waitForConnection();
+        assertEquals("zkFaultInjectionWrapper should be in connected state", States.CONNECTED,
+                zkFaultInjectionWrapper.getState());
+        long oldZkInstanceSessionId = zkFaultInjectionWrapper.getSessionId();
+
+        /*
+         * create ledger and add entries.
+         */
+        BookKeeper bkWithMockZK = new BookKeeper(baseClientConf, zkFaultInjectionWrapper);
+        long ledgerId = 567L;
+        LedgerHandle lh = bkWithMockZK.createLedgerAdv(ledgerId, 2, 2, 2, BookKeeper.DigestType.CRC32, TESTPASSWD,
+                null);
+        for (int i = 0; i < 10; i++) {
+            lh.addEntry(i, data);
+        }
+        lh.close();
+
+        /*
+         * Trigger an Expired event so that MockZooKeeperClient runs
+         * 'clientCreator' and creates a new zk handle, which in this case
+         * is a MockZooKeeper instance.
+         */
+        zooKeeperWatcherBase.process(new WatchedEvent(EventType.None, KeeperState.Expired, ""));
+        zkFaultInjectionWrapper.waitForConnection();
+        for (int i = 0; i < 10; i++) {
+            if (zkFaultInjectionWrapper.getState() == States.CONNECTED) {
+                break;
+            }
+            Thread.sleep(200);
+        }
+        assertEquals("zkFaultInjectionWrapper should be in connected state", States.CONNECTED,
+                zkFaultInjectionWrapper.getState());
+        assertNotEquals("Session Id of old and new ZK instance should be different", oldZkInstanceSessionId,
+                zkFaultInjectionWrapper.getSessionId());
+
+        /*
+         * Kill a Bookie, so that ledger becomes underreplicated. Since 3
+         * bookies are available in total and the ensemble size of the ledger
+         * is 2, we should be able to replicate to the remaining bookie.
+         */
+        BookieSocketAddress replicaToKill = lh.getLedgerMetadata().getAllEnsembles().get(0L).get(0);
+        LOG.info("Killing Bookie {}", replicaToKill);
+        killBookie(replicaToKill);
+
+        /*
+         * Start RW.
+         */
+        ReplicationWorker rw = new ReplicationWorker(baseConf, bkWithMockZK, false, NullStatsLogger.INSTANCE);
+        rw.start();
+        try {
+            for (int i = 0; i < 40; i++) {
+                if (rw.isRunning()) {
+                    break;
+                }
+                LOG.info("Waiting for the RW to start...");
+                Thread.sleep(500);
+            }
+            assertTrue("RW should be running", rw.isRunning());
+
+            /*
+             * Since the Auditor is not running, the ledger has to be marked
+             * underreplicated explicitly. Before marking it, configure the
+             * paths for which MockZooKeeper's setData and delete operations
+             * should fail.
+             *
+             * ZK.setData will be called by the 'updateEnsembleInfo' operation
+             * once copying to a new bookie completes. ZK.delete will be called
+             * by RW.logBKExceptionAndReleaseLedger and by the finally block in
+             * 'rereplicate(long ledgerIdToReplicate)'.
+             */
+            AbstractZkLedgerManager absZKLedgerManager = (AbstractZkLedgerManager) ledgerManager;
+            String ledgerPath = absZKLedgerManager.getLedgerPath(ledgerId);
+            String urLockPath = ZkLedgerUnderreplicationManager
+                    .getUrLedgerLockZnode(ZkLedgerUnderreplicationManager.getUrLockPath(zkLedgersRootPath), ledgerId);
+            zkFaultInjectionWrapper.setPathOfSetDataToFail(ledgerPath);
+            zkFaultInjectionWrapper.setPathOfDeleteToFail(urLockPath);
+            underReplicationManager.markLedgerUnderreplicated(lh.getId(), replicaToKill.toString());
+
+            /*
+             * Since there is only one RW, it will pick up the underreplicated
+             * ledger. After copying it to a new bookie, it tries to update the
+             * ensemble info, which fails because of our MockZK. It then tries to
+             * delete the lock znode as part of
+             * RW.logBKExceptionAndReleaseLedger, which also fails because of our
+             * MockZK. In the finally block of 'rereplicate(long
+             * ledgerIdToReplicate)' it tries once more to delete the lock znode,
+             * which fails again because of our MockZK. So the RW gives up and
+             * shuts itself down.
+             */
+            for (int i = 0; i < 40; i++) {
+                if (!rw.isRunning()) {
+                    break;
+                }
+                LOG.info("Waiting for the RW to shutdown...");
+                Thread.sleep(500);
+            }
+
+            /*
+             * As described above, setData should have failed exactly once and
+             * delete should have failed exactly twice.
+             */
+            assertEquals("NumOfTimesSetDataFailed", 1,
+                    zkFaultInjectionWrapper.getNumOfTimesSetDataFailed());
+            assertEquals("NumOfTimesDeleteFailed", 2,
+                    zkFaultInjectionWrapper.getNumOfTimesDeleteFailed());
+            assertFalse("RW should be shutdown", rw.isRunning());
+        } finally {
+            rw.shutdown();
+            bkWithMockZK.close();
+            zkFaultInjectionWrapper.close();
+        }
+    }
 }