You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/04 12:40:30 UTC

svn commit: r1393983 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ bookkee...

Author: ivank
Date: Thu Oct  4 10:40:29 2012
New Revision: 1393983

URL: http://svn.apache.org/viewvc?rev=1393983&view=rev
Log:
BOOKKEEPER-278: Ability to disable auto recovery temporarily (rakeshr via ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationEnableCb.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1393983&r1=1393982&r2=1393983&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Oct  4 10:40:29 2012
@@ -146,6 +146,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-388: Document bookie format command (kiran_bc via ivank)
 
+        BOOKKEEPER-278: Ability to disable auto recovery temporarily (rakeshr via ivank)
+
       hedwig-server:
 
         BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java?rev=1393983&r1=1393982&r2=1393983&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java Thu Oct  4 10:40:29 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.bookkeeper.meta;
 
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.replication.ReplicationException;
 
 /**
@@ -61,4 +62,39 @@ public interface LedgerUnderreplicationM
      */
     void close()
             throws ReplicationException.UnavailableException;
+
+    /**
+     * Stop ledger replication. Rereplication tasks that are already running
+     * are allowed to continue, but no new task will be started. This blocks
+     * the ledger replication {@code Auditor} and
+     * {@link #getLedgerToRereplicate()} tasks until replication is re-enabled.
+     */
+    void disableLedgerReplication()
+            throws ReplicationException.UnavailableException;
+
+    /**
+     * Resume ledger replication. This allows the ledger replication
+     * {@code Auditor} and {@link #getLedgerToRereplicate()} tasks to continue.
+     */
+    void enableLedgerReplication()
+            throws ReplicationException.UnavailableException;
+
+    /**
+     * Check whether ledger replication is currently enabled, i.e. it has not
+     * been stopped by {@link #disableLedgerReplication()}.
+     * 
+     * @return true if ledger replication is enabled, false otherwise
+     */
+    boolean isLedgerReplicationEnabled()
+            throws ReplicationException.UnavailableException;
+
+    /**
+     * Receive notification asynchronously when the ledger replication process
+     * is enabled
+     * 
+     * @param cb
+     *            - callback implementation to receive the notification
+     */
+    void notifyLedgerReplicationEnabled(GenericCallback<Void> cb)
+            throws ReplicationException.UnavailableException;
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java?rev=1393983&r1=1393982&r2=1393983&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java Thu Oct  4 10:40:29 2012
@@ -18,7 +18,11 @@
 
 package org.apache.bookkeeper.meta;
 
+import org.apache.bookkeeper.replication.ReplicationEnableCb;
 import org.apache.bookkeeper.replication.ReplicationException;
+import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat;
 import org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
@@ -65,6 +69,7 @@ public class ZkLedgerUnderreplicationMan
     static final Logger LOG = LoggerFactory.getLogger(ZkLedgerUnderreplicationManager.class);
     static final Charset UTF8 = Charset.forName("UTF-8");
     public static final String UNDER_REPLICATION_NODE = "underreplication";
+    static final String DISABLE_NODE = "disable";
     static final String LAYOUT="BASIC";
     static final int LAYOUT_VERSION=1;
 
@@ -104,22 +109,6 @@ public class ZkLedgerUnderreplicationMan
         checkLayout();
     }
 
-    private void createOptimistic(String path, byte[] data) throws KeeperException, InterruptedException {
-        try {
-            zkc.create(path, data,
-                       Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        } catch (KeeperException.NoNodeException nne) {
-            int lastSlash = path.lastIndexOf('/');
-            if (lastSlash <= 0) {
-                throw nne;
-            }
-            String parent = path.substring(0, lastSlash);
-            createOptimistic(parent, new byte[0]);
-            zkc.create(path, data,
-                       Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        }        
-    }
-
     private void checkLayout()
             throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
         if (zkc.exists(basePath, false) == null) {
@@ -212,8 +201,9 @@ public class ZkLedgerUnderreplicationMan
                 UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder();
                 try {
                     builder.addReplica(missingReplica);
-                    createOptimistic(znode,
-                                     TextFormat.printToString(builder.build()).getBytes(UTF8));
+                    ZkUtils.createFullPathOptimistic(zkc, znode, TextFormat
+                            .printToString(builder.build()).getBytes(UTF8),
+                            Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                 } catch (KeeperException.NodeExistsException nee) {
                     Stat s = zkc.exists(znode, false);
                     if (s == null) {
@@ -331,6 +321,7 @@ public class ZkLedgerUnderreplicationMan
         LOG.debug("getLedgerToRereplicate()");
         try {
             while (true) {
+                waitIfLedgerReplicationDisabled();
                 final CountDownLatch changedLatch = new CountDownLatch(1);
                 Watcher w = new Watcher() {
                         public void process(WatchedEvent e) {
@@ -352,10 +343,19 @@ public class ZkLedgerUnderreplicationMan
             throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
-            throw new ReplicationException.UnavailableException("Interrupted while conecting zookeeper", ie);
+            throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
         }
     }
 
+    private void waitIfLedgerReplicationDisabled() throws UnavailableException,
+            InterruptedException {
+        ReplicationEnableCb cb = new ReplicationEnableCb();
+        if (!this.isLedgerReplicationEnabled()) {
+            this.notifyLedgerReplicationEnabled(cb);
+            cb.await();
+        }
+    }
+    
     @Override
     public void releaseUnderreplicatedLedger(long ledgerId) throws ReplicationException.UnavailableException {
         LOG.debug("releaseLedger(ledgerId={})", ledgerId);
@@ -371,7 +371,7 @@ public class ZkLedgerUnderreplicationMan
             throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
-            throw new ReplicationException.UnavailableException("Interrupted while conecting zookeeper", ie);
+            throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
         }
     }
 
@@ -389,7 +389,95 @@ public class ZkLedgerUnderreplicationMan
             throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
-            throw new ReplicationException.UnavailableException("Interrupted while conecting zookeeper", ie);
+            throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
+        }
+    }
+
+    @Override
+    public void disableLedgerReplication()
+            throws ReplicationException.UnavailableException {
+        LOG.debug("disableLedegerReplication()");
+        try {
+            ZkUtils.createFullPathOptimistic(zkc, basePath + '/'
+                    + ZkLedgerUnderreplicationManager.DISABLE_NODE, ""
+                    .getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            LOG.info("Auto ledger re-replication is disabled!");
+        } catch (KeeperException ke) {
+            LOG.error("Exception while stopping replication", ke);
+            throw new ReplicationException.UnavailableException(
+                    "Error contacting zookeeper", ke);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new ReplicationException.UnavailableException(
+                    "Interrupted while connecting zookeeper", ie);
+        }
+    }
+
+    @Override
+    public void enableLedgerReplication()
+            throws ReplicationException.UnavailableException {
+        LOG.debug("enableLedegerReplication()");
+        try {
+            zkc.delete(basePath + '/'
+                    + ZkLedgerUnderreplicationManager.DISABLE_NODE, -1);
+            LOG.info("Resuming automatic ledger re-replication");
+        } catch (KeeperException ke) {
+            LOG.error("Exception while resuming ledger replication", ke);
+            throw new ReplicationException.UnavailableException(
+                    "Error contacting zookeeper", ke);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new ReplicationException.UnavailableException(
+                    "Interrupted while connecting zookeeper", ie);
+        }
+    }
+
+    @Override
+    public boolean isLedgerReplicationEnabled()
+            throws ReplicationException.UnavailableException {
+        LOG.debug("isLedgerReplicationEnabled()");
+        try {
+            if (null != zkc.exists(basePath + '/' + DISABLE_NODE, false)) {
+                return false;
+            }
+            return true;
+        } catch (KeeperException ke) {
+            LOG.error("Error while checking the state of "
+                    + "ledger re-replication", ke);
+            throw new ReplicationException.UnavailableException(
+                    "Error contacting zookeeper", ke);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new ReplicationException.UnavailableException(
+                    "Interrupted while contacting zookeeper", ie);
+        }
+    }
+
+    @Override
+    public void notifyLedgerReplicationEnabled(final GenericCallback<Void> cb)
+            throws ReplicationException.UnavailableException {
+        LOG.debug("notifyLedgerReplicationEnabled()");
+        Watcher w = new Watcher() {
+            public void process(WatchedEvent e) {
+                if (e.getType() == Watcher.Event.EventType.NodeDeleted) {
+                    cb.operationComplete(0, null);
+                }
+            }
+        };
+        try {
+            if (null == zkc.exists(basePath + '/' + DISABLE_NODE, w)) {
+                cb.operationComplete(0, null);
+                return;
+            }
+        } catch (KeeperException ke) {
+            LOG.error("Error while checking the state of "
+                    + "ledger re-replication", ke);
+            throw new ReplicationException.UnavailableException(
+                    "Error contacting zookeeper", ke);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new ReplicationException.UnavailableException(
+                    "Interrupted while contacting zookeeper", ie);
         }
     }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java?rev=1393983&r1=1393982&r2=1393983&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java Thu Oct  4 10:40:29 2012
@@ -106,6 +106,10 @@ public class Auditor extends Thread impl
             while (true) {
                 // wait for bookie join/failure notifications
                 bookieNotifications.take();
+
+                // check whether ledger replication is enabled
+                waitIfLedgerReplicationDisabled();
+
                 List<String> availableBookies = getAvailableBookies();
 
                 // casting to String, as knownBookies and availableBookies
@@ -132,11 +136,22 @@ public class Auditor extends Thread impl
             LOG.error("Interrupted while watching available bookies ", ie);
         } catch (BKAuditException bke) {
             LOG.error("Exception while watching available bookies", bke);
+        } catch (UnavailableException ue) {
+            LOG.error("Exception while watching available bookies", ue);
         }
 
         shutdown();
     }
 
+    private void waitIfLedgerReplicationDisabled() throws UnavailableException,
+            InterruptedException {
+        ReplicationEnableCb cb = new ReplicationEnableCb();
+        if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
+            ledgerUnderreplicationManager.notifyLedgerReplicationEnabled(cb);
+            cb.await();
+        }
+    }
+    
     private List<String> getAvailableBookies() throws KeeperException,
             InterruptedException {
         return zkc.getChildren(conf.getZkAvailableBookiesPath(), this);

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationEnableCb.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationEnableCb.java?rev=1393983&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationEnableCb.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationEnableCb.java Thu Oct  4 10:40:29 2012
@@ -0,0 +1,56 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.replication;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Callback that is notified when the replication process is enabled
+ */
+public class ReplicationEnableCb implements GenericCallback<Void> {
+
+    private static final Logger LOG = LoggerFactory
+            .getLogger(ReplicationEnableCb.class);
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+    @Override
+    public void operationComplete(int rc, Void result) {
+        latch.countDown();
+        LOG.debug("Automatic ledger re-replication is enabled");
+    }
+
+    /**
+     * This is a blocking call and causes the current thread to wait until the
+     * replication process is enabled
+     * 
+     * @throws InterruptedException
+     *             interrupted while waiting
+     */
+    public void await() throws InterruptedException {
+        LOG.debug("Automatic ledger re-replication is disabled. "
+                + "Hence waiting until its enabled!");
+        latch.await();
+    }
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java?rev=1393983&r1=1393982&r2=1393983&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java Thu Oct  4 10:40:29 2012
@@ -140,4 +140,38 @@ public class ZkUtils {
         }
         return newZk;
     }
+
+    /**
+     * Utility to create a znode and any missing parent znodes synchronously
+     * 
+     * @param zkc
+     *            - ZK instance
+     * @param path
+     *            - znode path
+     * @param data
+     *            - znode data
+     * @param acl
+     *            - Acl of the zk path
+     * @param createMode
+     *            - Create mode of zk path
+     * @throws KeeperException
+     *             if the server returns a non-zero error code, or invalid ACL
+     * @throws InterruptedException
+     *             if the transaction is interrupted
+     */
+    public static void createFullPathOptimistic(ZooKeeper zkc, String path,
+            byte[] data, final List<ACL> acl, final CreateMode createMode)
+            throws KeeperException, InterruptedException {
+        try {
+            zkc.create(path, data, acl, createMode);
+        } catch (KeeperException.NoNodeException nne) {
+            int lastSlash = path.lastIndexOf('/');
+            if (lastSlash <= 0) {
+                throw nne;
+            }
+            String parent = path.substring(0, lastSlash);
+            createFullPathOptimistic(zkc, parent, new byte[0], acl, createMode);
+            zkc.create(path, data, acl, createMode);
+        }
+    }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java?rev=1393983&r1=1393982&r2=1393983&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java Thu Oct  4 10:40:29 2012
@@ -256,6 +256,31 @@ public class AuditorLedgerCheckerTest ex
         }
     }
 
+    @Test//(timeout = 30000)
+    public void testToggleLedgerReplication() throws Exception {
+        LedgerHandle lh1 = createAndAddEntriesToLedger();
+        ledgerList.add(lh1.getId());
+        LOG.debug("Created following ledgers : " + ledgerList);
+
+        // register the under-replicated ledger watcher before failing bookies
+        CountDownLatch urReplicaLatch = registerUrLedgerWatcher(ledgerList
+                .size());
+
+        // disabling ledger replication
+        urLedgerMgr.disableLedgerReplication();
+        ArrayList<String> shutdownBookieList = new ArrayList<String>();
+        shutdownBookieList.add(shutdownBookie(bs.size() - 1));
+        shutdownBookieList.add(shutdownBookie(bs.size() - 1));
+
+        assertFalse("Ledger replication is not disabled!", urReplicaLatch
+                .await(5, TimeUnit.SECONDS));
+
+        // enabling ledger replication
+        urLedgerMgr.enableLedgerReplication();
+        assertTrue("Ledger replication is not disabled!", urReplicaLatch.await(
+                5, TimeUnit.SECONDS));
+    }
+
     private CountDownLatch registerUrLedgerWatcher(int count)
             throws KeeperException, InterruptedException {
         final CountDownLatch underReplicaLatch = new CountDownLatch(count);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java?rev=1393983&r1=1393982&r2=1393983&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java Thu Oct  4 10:40:29 2012
@@ -49,8 +49,12 @@ import org.apache.bookkeeper.replication
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.bookkeeper.test.ZooKeeperUtil;
 import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event.EventType;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -76,6 +80,7 @@ public class TestLedgerUnderreplicationM
 
     String basePath;
     String urLedgerPath;
+    boolean isLedgerReplicationDisabled = true;
 
     @Before
     public void setupZooKeeper() throws Exception {
@@ -466,6 +471,119 @@ public class TestLedgerUnderreplicationM
         verifyMarkLedgerUnderreplicated(missingReplica);
     }
 
+    /**
+     * Test disabling the ledger re-replication. After disabling, a call to
+     * getLedgerToRereplicate() must not return a ledger; it blocks until the
+     * rereplication process is enabled again.
+     */
+    @Test(timeout = 20000)
+    public void testDisableLedegerReplication() throws Exception {
+        final LedgerUnderreplicationManager replicaMgr = lmf1
+                .newLedgerUnderreplicationManager();
+
+        // simulate few urLedgers before disabling
+        final Long ledgerA = 0xfeadeefdacL;
+        final String missingReplica = "localhost:3181";
+
+        // disabling replication
+        replicaMgr.disableLedgerReplication();
+        LOG.info("Disabled Ledeger Replication");
+
+        try {
+            replicaMgr.markLedgerUnderreplicated(ledgerA, missingReplica);
+        } catch (UnavailableException e) {
+            LOG.debug("Unexpected exception while marking urLedger", e);
+            fail("Unexpected exception while marking urLedger" + e.getMessage());
+        }
+
+        Future<Long> fA = getLedgerToReplicate(replicaMgr);
+        try {
+            fA.get(5, TimeUnit.SECONDS);
+            fail("Shouldn't be able to find a ledger to replicate");
+        } catch (TimeoutException te) {
+            // expected behaviour, as the replication is disabled
+            isLedgerReplicationDisabled = false;
+        }
+
+        assertTrue("Ledger replication is not disabled!",
+                !isLedgerReplicationDisabled);
+    }
+
+    /**
+     * Test enabling the ledger re-replication. After enableLedgerReplication(),
+     * a blocked getLedgerToRereplicate() call should resume and return.
+     */
+    @Test(timeout = 20000)
+    public void testEnableLedegerReplication() throws Exception {
+        isLedgerReplicationDisabled = true;
+        final LedgerUnderreplicationManager replicaMgr = lmf1
+                .newLedgerUnderreplicationManager();
+
+        // simulate few urLedgers before disabling
+        final Long ledgerA = 0xfeadeefdacL;
+        final String missingReplica = "localhost:3181";
+        try {
+            replicaMgr.markLedgerUnderreplicated(ledgerA, missingReplica);
+        } catch (UnavailableException e) {
+            LOG.debug("Unexpected exception while marking urLedger", e);
+            fail("Unexpected exception while marking urLedger" + e.getMessage());
+        }
+
+        // disabling replication
+        replicaMgr.disableLedgerReplication();
+        LOG.debug("Disabled Ledeger Replication");
+
+        String znodeA = getUrLedgerZnode(ledgerA);
+        final CountDownLatch znodeLatch = new CountDownLatch(2);
+        String urledgerA = StringUtils.substringAfterLast(znodeA, "/");
+        String urLockLedgerA = basePath + "/locks/" + urledgerA;
+        zkc1.exists(urLockLedgerA, new Watcher(){
+            @Override
+            public void process(WatchedEvent event) {
+                if (event.getType() == EventType.NodeCreated) {
+                    znodeLatch.countDown();
+                    LOG.debug("Recieved node creation event for the zNodePath:"
+                            + event.getPath());
+                }
+                
+            }});
+        // getLedgerToRereplicate() blocks until rereplication is enabled
+        Thread thread1 = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    Long lA = replicaMgr.getLedgerToRereplicate();
+                    assertEquals("Should be the ledger I just marked", lA,
+                            ledgerA);
+                    isLedgerReplicationDisabled = false;
+                    znodeLatch.countDown();
+                } catch (UnavailableException e) {
+                    LOG.debug("Unexpected exception while marking urLedger", e);
+                    isLedgerReplicationDisabled = false;
+                }
+            }
+        };
+        thread1.start();
+
+        try {
+            znodeLatch.await(5, TimeUnit.SECONDS);
+            assertTrue("Ledger replication is not disabled!",
+                    isLedgerReplicationDisabled);
+            assertEquals("Failed to disable ledger replication!", 2, znodeLatch
+                    .getCount());
+
+            replicaMgr.enableLedgerReplication();
+            znodeLatch.await(5, TimeUnit.SECONDS);
+            LOG.debug("Enabled Ledeger Replication");
+            assertTrue("Ledger replication is not disabled!",
+                    !isLedgerReplicationDisabled);
+            assertEquals("Failed to disable ledger replication!", 0, znodeLatch
+                    .getCount());
+        } finally {
+            thread1.interrupt();
+        }
+    }
+
     private void verifyMarkLedgerUnderreplicated(Collection<String> missingReplica)
             throws KeeperException, InterruptedException,
             CompatibilityException, UnavailableException {