You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/09/12 20:34:12 UTC

svn commit: r1384080 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ bookk...

Author: ivank
Date: Wed Sep 12 18:34:11 2012
New Revision: 1384080

URL: http://svn.apache.org/viewvc?rev=1384080&view=rev
Log:
BOOKKEEPER-325: Delay the replication of a ledger if RW found that its last fragment is in underReplication. (umamahesh via ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1384080&r1=1384079&r2=1384080&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Sep 12 18:34:11 2012
@@ -134,6 +134,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-208: Separate write quorum from ack quorum (ivank)
 
+        BOOKKEEPER-325: Delay the replication of a ledger if RW found that its last fragment is in underReplication. (umamahesh via ivank)
+
       hedwig-server:
 
         BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java?rev=1384080&r1=1384079&r2=1384080&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java Wed Sep 12 18:34:11 2012
@@ -63,7 +63,7 @@ public class LedgerFragment {
      * last fragment, then no one will modify this fragment.</li>
      * </ol>
      */
-    boolean isClosed() {
+    public boolean isClosed() {
         return isLedgerClosed;
     }
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java?rev=1384080&r1=1384079&r2=1384080&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java Wed Sep 12 18:34:11 2012
@@ -57,6 +57,7 @@ public class ServerConfiguration extends
     protected final static String ZK_SERVERS = "zkServers";
     // Statistics Parameters
     protected final static String ENABLE_STATISTICS = "enableStatistics";
+    protected final static String OPEN_LEDGER_REREPLICATION_GRACE_PERIOD = "openLedgerRereplicationGracePeriod";
 
     /**
      * Construct a default configuration object
@@ -546,4 +547,30 @@ public class ServerConfiguration extends
         setProperty(MAJOR_COMPACTION_INTERVAL, interval);
         return this;
     }
+    
+    /**
+     * Set the grace period, in milliseconds, that the rereplication worker
+     * will wait before fencing and rereplicating a ledger fragment which is
+     * still being written to, on bookie failure.
+     * 
+     * The grace period allows the writer to detect the bookie failure, and
+     * start writing to another ledger fragment. If the writer writes nothing
+     * during the grace period, the rereplication worker assumes that it has
+     * crashed and therefore fences the ledger, preventing any further writes to
+     * that ledger.
+     * 
+     * @see org.apache.bookkeeper.client.BookKeeperAdmin#openLedger
+     */
+    public void setOpenLedgerRereplicationGracePeriod(String waitTime) {
+        setProperty(OPEN_LEDGER_REREPLICATION_GRACE_PERIOD, waitTime);
+    }
+
+    /**
+     * Get the grace period, in milliseconds, that the rereplication worker
+     * waits before fencing and rereplicating a ledger fragment which is still
+     * being written to, on bookie failure. Defaults to 30000 ms.
+     */
+    public long getOpenLedgerRereplicationGracePeriod() {
+        return getLong(OPEN_LEDGER_REREPLICATION_GRACE_PERIOD, 30000);
+    }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java?rev=1384080&r1=1384079&r2=1384080&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java Wed Sep 12 18:34:11 2012
@@ -23,6 +23,8 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.bookkeeper.client.BKException;
@@ -36,6 +38,7 @@ import org.apache.bookkeeper.client.BKEx
 import org.apache.bookkeeper.client.BKException.BKReadException;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -53,15 +56,17 @@ import org.slf4j.LoggerFactory;
 public class ReplicationWorker implements Runnable {
     private static Logger LOG = LoggerFactory
             .getLogger(ReplicationWorker.class);
-    private LedgerUnderreplicationManager underreplicationManager;
+    final private LedgerUnderreplicationManager underreplicationManager;
     private AbstractConfiguration conf;
     private ZooKeeper zkc;
     private volatile boolean workerRunning = false;
-    private BookKeeperAdmin admin;
+    final private BookKeeperAdmin admin;
     private LedgerChecker ledgerChecker;
     private InetSocketAddress targetBookie;
     private BookKeeper bkc;
     private Thread workerThread;
+    private long openLedgerRereplicationGracePeriod;
+    private Timer pendingReplicationTimer;
 
     /**
      * Replication worker for replicating the ledger fragments from
@@ -77,7 +82,7 @@ public class ReplicationWorker implement
      *            local Bookie address.
      */
     public ReplicationWorker(final ZooKeeper zkc,
-            final AbstractConfiguration conf, InetSocketAddress targetBKAddr)
+            final ServerConfiguration conf, InetSocketAddress targetBKAddr)
             throws CompatibilityException, KeeperException,
             InterruptedException, IOException {
         this.zkc = zkc;
@@ -91,6 +96,9 @@ public class ReplicationWorker implement
         this.admin = new BookKeeperAdmin(bkc);
         this.ledgerChecker = new LedgerChecker(bkc);
         this.workerThread = new Thread(this);
+        this.openLedgerRereplicationGracePeriod = conf
+                .getOpenLedgerRereplicationGracePeriod();
+        this.pendingReplicationTimer = new Timer("PendingReplicationTimer");
     }
 
     /** Start the replication worker */
@@ -103,54 +111,7 @@ public class ReplicationWorker implement
         workerRunning = true;
         while (workerRunning) {
             try {
-                long ledgerIdToReplicate = underreplicationManager
-                        .getLedgerToRereplicate();
-                LOG.info("Going to replicate the fragments of the ledger: "
-                        + ledgerIdToReplicate);
-                LedgerHandle lh;
-                try {
-                    lh = admin.openLedgerNoRecovery(ledgerIdToReplicate);
-                } catch (BKNoSuchLedgerExistsException e) {
-                    // Ledger might have been deleted by user
-                    LOG.info("BKNoSuchLedgerExistsException while opening "
-                            + "ledger for replication. Other clients "
-                            + "might have deleted the ledger. "
-                            + "So, no harm to continue");
-                    underreplicationManager
-                            .markLedgerReplicated(ledgerIdToReplicate);
-                    continue;
-                } catch (BKReadException e) {
-                    LOG.info("BKReadException while"
-                            + " opening ledger for replication."
-                            + " Enough Bookies might not have available"
-                            + "So, no harm to continue");
-                    underreplicationManager
-                            .releaseUnderreplicatedLedger(ledgerIdToReplicate);
-                    continue;
-                } catch (BKBookieHandleNotAvailableException e) {
-                    LOG.info("BKBookieHandleNotAvailableException while"
-                            + " opening ledger for replication."
-                            + " Enough Bookies might not have available"
-                            + "So, no harm to continue");
-                    underreplicationManager
-                            .releaseUnderreplicatedLedger(ledgerIdToReplicate);
-                    continue;
-                }
-
-                boolean isAllFragmentsReplicated = doReplicateFragments(
-                        ledgerIdToReplicate, lh);
-                if (!isAllFragmentsReplicated) {
-                    // Releasing the underReplication ledger lock and compete
-                    // for the replication again for the pending fragments
-                    underreplicationManager
-                            .releaseUnderreplicatedLedger(ledgerIdToReplicate);
-                    continue;
-                }
-
-                LOG.info("Ledger replicated successfully. ledger id is: "
-                        + ledgerIdToReplicate);
-                underreplicationManager
-                        .markLedgerReplicated(ledgerIdToReplicate);
+                rereplicate();
             } catch (InterruptedException e) {
                 shutdown();
                 Thread.currentThread().interrupt();
@@ -173,30 +134,55 @@ public class ReplicationWorker implement
     /**
      * Replicates the under replicated fragments from failed bookie ledger to
      * targetBookie
-     * 
-     * @return - false if the re-replication fails for any fragment. Also
-     *         returns false if fragment ensemble contains the target bookie
-     *         (since target bookie already present in the current fragment
-     *         ensemble, it will skip replication for that particular
-     *         fragments). Returns true if all fragments replicated
-     *         successfully.
-     * @throws BKException 
      */
-    private boolean doReplicateFragments(long ledgerIdToReplicate,
-            LedgerHandle lh) throws InterruptedException, BKException {
-        CheckerCallback checkerCb = new CheckerCallback();
-        ledgerChecker.checkLedger(lh, checkerCb);
-        Set<LedgerFragment> fragments = checkerCb.waitAndGetResult();
+    private void rereplicate() throws InterruptedException, BKException,
+            UnavailableException {
+        long ledgerIdToReplicate = underreplicationManager
+                .getLedgerToRereplicate();
+        LOG.info("Going to replicate the fragments of the ledger: "
+                + ledgerIdToReplicate);
+        LedgerHandle lh;
+        try {
+            lh = admin.openLedgerNoRecovery(ledgerIdToReplicate);
+        } catch (BKNoSuchLedgerExistsException e) {
+            // Ledger might have been deleted by user
+            LOG.info("BKNoSuchLedgerExistsException while opening "
+                    + "ledger for replication. Other clients "
+                    + "might have deleted the ledger. "
+                    + "So, no harm to continue");
+            underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
+            return;
+        } catch (BKReadException e) {
+            LOG.info("BKReadException while"
+                    + " opening ledger for replication."
+                    + " Enough Bookies might not have available"
+                    + "So, no harm to continue");
+            underreplicationManager
+                    .releaseUnderreplicatedLedger(ledgerIdToReplicate);
+            return;
+        } catch (BKBookieHandleNotAvailableException e) {
+            LOG.info("BKBookieHandleNotAvailableException while"
+                    + " opening ledger for replication."
+                    + " Enough Bookies might not have available"
+                    + "So, no harm to continue");
+            underreplicationManager
+                    .releaseUnderreplicatedLedger(ledgerIdToReplicate);
+            return;
+        }
+        Set<LedgerFragment> fragments = getUnderreplicatedFragments(lh);
         LOG.info("Founds fragments " + fragments
                 + " for replication from ledger: " + ledgerIdToReplicate);
-        boolean isTargetBookieExistsInFragmentEnsemble = false;
-        boolean isAllFragmentsReplicated = true;
+
+        boolean foundOpenFragments = false;
         for (LedgerFragment ledgerFragment : fragments) {
-            if (isTargetBookieExistsInFragmentEnsemble(lh, ledgerFragment)) {
+            if (!ledgerFragment.isClosed()) {
+                foundOpenFragments = true;
+                continue;
+            } else if (isTargetBookieExistsInFragmentEnsemble(lh,
+                    ledgerFragment)) {
                 LOG.info("Target Bookie[" + targetBookie
                         + "] found in the fragment ensemble:"
                         + ledgerFragment.getEnsemble());
-                isTargetBookieExistsInFragmentEnsemble = true;
                 continue;
             }
             try {
@@ -204,35 +190,97 @@ public class ReplicationWorker implement
             } catch (BKException.BKBookieHandleNotAvailableException e) {
                 LOG.warn("BKBookieHandleNotAvailableException "
                         + "while replicating the fragment", e);
-                isAllFragmentsReplicated = false;
             } catch (BKException.BKLedgerRecoveryException e) {
                 LOG.warn("BKLedgerRecoveryException "
                         + "while replicating the fragment", e);
-                isAllFragmentsReplicated = false;
             }
+        }
 
+        if (foundOpenFragments) {
+            deferLedgerLockRelease(ledgerIdToReplicate);
+            return;
         }
-        if (isTargetBookieExistsInFragmentEnsemble) {
-            LOG.info("Releasing the lock, as target Bookie found"
-                    + " in the fragments ensemble.");
-            return false;
-        }
-
-        // There might be some connectivity issues while replicating,
-        // so, still I am eligible to replicate it, lets retry.
-        if (!isAllFragmentsReplicated) {
-            LOG.info("Could not replicate all fragments."
-                    + "So, Releasing the lock. Let's compete "
-                    + "for the replication again");
-            return false;
+        
+        fragments = getUnderreplicatedFragments(lh);
+        if (fragments.size() == 0) {
+            LOG.info("Ledger replicated successfully. ledger id is: "
+                    + ledgerIdToReplicate);
+            underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
+        } else {
+            // Release the underReplication ledger lock and compete
+            // again for the replication of the pending fragments
+            underreplicationManager
+                    .releaseUnderreplicatedLedger(ledgerIdToReplicate);
         }
+    }
 
-        // Re-replication success
-        return true;
-
+    /** Gets the under replicated fragments */
+    private Set<LedgerFragment> getUnderreplicatedFragments(LedgerHandle lh)
+            throws InterruptedException {
+        CheckerCallback checkerCb = new CheckerCallback();
+        ledgerChecker.checkLedger(lh, checkerCb);
+        Set<LedgerFragment> fragments = checkerCb.waitAndGetResult();
+        return fragments;
     }
 
     /**
+     * Schedules a timer task that releases the ledger's underreplication lock
+     * once the open ledger rereplication grace period has elapsed. If the
+     * ledger still has an open underreplicated fragment then, it is fenced first.
+     */
+    private void deferLedgerLockRelease(final long ledgerId) {
+        long gracePeriod = this.openLedgerRereplicationGracePeriod;
+        TimerTask timerTask = new TimerTask() {
+            @Override
+            public void run() {
+                LedgerHandle lh = null;
+                try {
+                    lh = admin.openLedgerNoRecovery(ledgerId);
+                    Set<LedgerFragment> fragments = getUnderreplicatedFragments(lh);
+                    for (LedgerFragment fragment : fragments) {
+                        if (!fragment.isClosed()) {
+                            lh = admin.openLedger(ledgerId);
+                            break;
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    LOG.info("InterruptedException "
+                            + "while replicating fragments", e);
+                } catch (BKException e) {
+                    LOG.error("BKException while fencing the ledger"
+                            + " for rereplication of postponed ledgers", e);
+                } finally {
+                    try {
+                        if (lh != null) {
+                            lh.close();
+                        }
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        LOG.info("InterruptedException while closing "
+                                + "ledger", e);
+                    } catch (BKException e) {
+                        // Let's go ahead and release the lock anyway. The
+                        // actual exception will be caught, and acted upon, in
+                        // the normal replication flow.
+                        LOG.warn("BKException while closing ledger ", e);
+                    } finally {
+                        try {
+                            underreplicationManager
+                                    .releaseUnderreplicatedLedger(ledgerId);
+                        } catch (UnavailableException e) {
+                            shutdown();
+                            LOG.error("UnavailableException "
+                                    + "while replicating fragments", e);
+                        }
+                    }
+                }
+            }
+        };
+        pendingReplicationTimer.schedule(timerTask, gracePeriod);
+    }
+    
+    /**
      * Stop the replication worker service
      */
     public void shutdown() {
@@ -243,6 +291,7 @@ public class ReplicationWorker implement
             LOG.warn("Exception while closing the "
                     + "ZkLedgerUnderrepliationManager", e);
         }
+        this.pendingReplicationTimer.cancel();
         try {
             bkc.close();
         } catch (InterruptedException e) {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java?rev=1384080&r1=1384079&r2=1384080&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java Wed Sep 12 18:34:11 2012
@@ -27,4 +27,10 @@ public class ClientUtil {
         return dm.computeDigestAndPackageForSending(entryId, lastAddConfirmed, length,
                                                     data, 0, data.length);
     }
+    
+    /** Returns whether the ledger is still open (its metadata is not closed). */
+    public static boolean isLedgerOpen(LedgerHandle handle) {
+        return !handle.metadata.isClosed();
+    }
+
 }
\ No newline at end of file

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java?rev=1384080&r1=1384079&r2=1384080&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java Wed Sep 12 18:34:11 2012
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.ClientUtil;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerHandleAdapter;
@@ -50,11 +51,13 @@ import org.slf4j.LoggerFactory;
  */
 public class TestReplicationWorker extends MultiLedgerManagerTestCase {
 
+    private static final byte[] TESTPASSWD = "testpasswd".getBytes();
     private static final Logger LOG = LoggerFactory
             .getLogger(TestReplicationWorker.class);
     private String basePath = "";
     private LedgerManagerFactory mFactory;
     private LedgerUnderreplicationManager underReplicationManager;
+    private static byte[] data = "TestReplicationWorker".getBytes();
 
     public TestReplicationWorker(String ledgerManagerFactory) {
         super(3);
@@ -96,9 +99,8 @@ public class TestReplicationWorker exten
      */
     @Test(timeout = 30000)
     public void testRWShouldReplicateFragmentsToTargetBookie() throws Exception {
-        byte[] data = "TestReplicationWorker".getBytes();
         LedgerHandle lh = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32,
-                "testpasswd".getBytes());
+                TESTPASSWD);
 
         for (int i = 0; i < 10; i++) {
             lh.addEntry(data);
@@ -118,8 +120,7 @@ public class TestReplicationWorker exten
                 .getLocalHost().getHostAddress(), startNewBookie);
         LOG.info("New Bookie addr :" + newBkAddr);
 
-        ReplicationWorker rw = new ReplicationWorker(zkc, baseClientConf,
-                newBkAddr);
+        ReplicationWorker rw = new ReplicationWorker(zkc, baseConf, newBkAddr);
 
         rw.start();
         try {
@@ -147,9 +148,8 @@ public class TestReplicationWorker exten
     @Test(timeout = 60000)
     public void testRWShouldRetryUntilThereAreEnoughBksAvailableForReplication()
             throws Exception {
-        byte[] data = "TestReplicationWorker".getBytes();
         LedgerHandle lh = bkc.createLedger(1, 1, BookKeeper.DigestType.CRC32,
-                "testpasswd".getBytes());
+                TESTPASSWD);
 
         for (int i = 0; i < 10; i++) {
             lh.addEntry(data);
@@ -166,8 +166,7 @@ public class TestReplicationWorker exten
         LOG.info("New Bookie addr :" + newBkAddr);
 
         killAllBookies(lh, newBkAddr);
-        ReplicationWorker rw = new ReplicationWorker(zkc, baseClientConf,
-                newBkAddr);
+        ReplicationWorker rw = new ReplicationWorker(zkc, baseConf, newBkAddr);
 
         rw.start();
         try {
@@ -199,9 +198,8 @@ public class TestReplicationWorker exten
     @Test(timeout = 90000)
     public void test2RWsShouldCompeteForReplicationOf2FragmentsAndCompleteReplication()
             throws Exception {
-        byte[] data = "TestReplicationWorker".getBytes();
         LedgerHandle lh = bkc.createLedger(2, 2, BookKeeper.DigestType.CRC32,
-                "testpasswd".getBytes());
+                TESTPASSWD);
 
         for (int i = 0; i < 10; i++) {
             lh.addEntry(data);
@@ -218,8 +216,7 @@ public class TestReplicationWorker exten
         InetSocketAddress newBkAddr1 = new InetSocketAddress(InetAddress
                 .getLocalHost().getHostAddress(), startNewBookie1);
         LOG.info("New Bookie addr :" + newBkAddr1);
-        ReplicationWorker rw1 = new ReplicationWorker(zkc, baseClientConf,
-                newBkAddr1);
+        ReplicationWorker rw1 = new ReplicationWorker(zkc, baseConf, newBkAddr1);
 
         // Starte RW2
         int startNewBookie2 = startNewBookie();
@@ -228,7 +225,7 @@ public class TestReplicationWorker exten
         LOG.info("New Bookie addr :" + newBkAddr2);
         ZooKeeper zkc1 = ZkUtils.createConnectedZookeeperClient(
                 zkUtil.getZooKeeperConnectString(), 10000);
-        ReplicationWorker rw2 = new ReplicationWorker(zkc1, baseClientConf,
+        ReplicationWorker rw2 = new ReplicationWorker(zkc1, baseConf,
                 newBkAddr2);
         rw1.start();
         rw2.start();
@@ -264,9 +261,8 @@ public class TestReplicationWorker exten
     @Test(timeout = 3000)
     public void testRWShouldCleanTheLedgerFromUnderReplicationIfLedgerAlreadyDeleted()
             throws Exception {
-        byte[] data = "TestReplicationWorker".getBytes();
         LedgerHandle lh = bkc.createLedger(2, 2, BookKeeper.DigestType.CRC32,
-                "testpasswd".getBytes());
+                TESTPASSWD);
 
         for (int i = 0; i < 10; i++) {
             lh.addEntry(data);
@@ -281,8 +277,7 @@ public class TestReplicationWorker exten
         InetSocketAddress newBkAddr = new InetSocketAddress(InetAddress
                 .getLocalHost().getHostAddress(), startNewBookie);
         LOG.info("New Bookie addr :" + newBkAddr);
-        ReplicationWorker rw = new ReplicationWorker(zkc, baseClientConf,
-                newBkAddr);
+        ReplicationWorker rw = new ReplicationWorker(zkc, baseConf, newBkAddr);
         rw.start();
 
         try {
@@ -302,12 +297,9 @@ public class TestReplicationWorker exten
     @Test(timeout = 60000)
     public void testMultipleLedgerReplicationWithReplicationWorker()
             throws Exception {
-
-        byte[] data = "TestReplicationWorker".getBytes();
-
         // Ledger1
         LedgerHandle lh1 = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32,
-                "testpasswd".getBytes());
+                TESTPASSWD);
 
         for (int i = 0; i < 10; i++) {
             lh1.addEntry(data);
@@ -319,7 +311,7 @@ public class TestReplicationWorker exten
 
         // Ledger2
         LedgerHandle lh2 = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32,
-                "testpasswd".getBytes());
+                TESTPASSWD);
 
         for (int i = 0; i < 10; i++) {
             lh2.addEntry(data);
@@ -342,8 +334,7 @@ public class TestReplicationWorker exten
                 .getLocalHost().getHostAddress(), startNewBookie);
         LOG.info("New Bookie addr :" + newBkAddr);
 
-        ReplicationWorker rw = new ReplicationWorker(zkc, baseClientConf,
-                newBkAddr);
+        ReplicationWorker rw = new ReplicationWorker(zkc, baseConf, newBkAddr);
 
         rw.start();
         try {
@@ -372,6 +363,125 @@ public class TestReplicationWorker exten
         }
 
     }
+    
+    /**
+     * Tests that ReplicationWorker should fence the ledger and release ledger
+     * lock after timeout. Then replication should happen normally.
+     */
+    @Test(timeout = 60000)
+    public void testRWShouldReplicateTheLedgersAfterTimeoutIfLastFragmentIsUR()
+            throws Exception {
+        LedgerHandle lh = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32,
+                TESTPASSWD);
+
+        for (int i = 0; i < 10; i++) {
+            lh.addEntry(data);
+        }
+        InetSocketAddress replicaToKill = LedgerHandleAdapter
+                .getLedgerMetadata(lh).getEnsembles().get(0L).get(0);
+
+        LOG.info("Killing Bookie", replicaToKill);
+        killBookie(replicaToKill);
+
+        int startNewBookie = startNewBookie();
+
+        InetSocketAddress newBkAddr = new InetSocketAddress(InetAddress
+                .getLocalHost().getHostAddress(), startNewBookie);
+        LOG.info("New Bookie addr :" + newBkAddr);
+
+        ReplicationWorker rw = new ReplicationWorker(zkc, baseConf, newBkAddr);
+
+        LedgerManagerFactory mFactory = LedgerManagerFactory
+                .newLedgerManagerFactory(baseClientConf, zkc);
+        LedgerUnderreplicationManager underReplicationManager = mFactory
+                .newLedgerUnderreplicationManager();
+        rw.start();
+        try {
+
+            underReplicationManager.markLedgerUnderreplicated(lh.getId(),
+                    replicaToKill.toString());
+            while (isLedgerInUnderReplication(lh.getId(),
+                    basePath)) {
+                Thread.sleep(100);
+            }
+            killAllBookies(lh, newBkAddr);
+            // Should be able to read the entries from 0-9
+            verifyRecoveredLedgers(lh, 0, 9);
+            lh = bkc.openLedgerNoRecovery(lh.getId(),
+                    BookKeeper.DigestType.CRC32, TESTPASSWD);
+            assertFalse("Ledger must have been closed by RW", ClientUtil
+                    .isLedgerOpen(lh));
+        } finally {
+            rw.shutdown();
+            underReplicationManager.close();
+        }
+
+    }
+
+    /**
+     * Tests that the ReplicationWorker does not postpone the replication of a
+     * ledger that is in open state when its last fragment is not in
+     * underReplication state. Note that RW should not fence such ledgers.
+     */
+    @Test(timeout = 30000)
+    public void testRWShouldReplicateTheLedgersAfterTimeoutIfLastFragmentIsNotUR()
+            throws Exception {
+        LedgerHandle lh = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32,
+                TESTPASSWD);
+
+        for (int i = 0; i < 10; i++) {
+            lh.addEntry(data);
+        }
+        InetSocketAddress replicaToKill = LedgerHandleAdapter
+                .getLedgerMetadata(lh).getEnsembles().get(0L).get(0);
+
+        LOG.info("Killing Bookie", replicaToKill);
+        killBookie(replicaToKill);
+
+        int startNewBookie = startNewBookie();
+
+        // Write more entries so the ensemble is re-formed, making sure that the
+        // last fragment is not under-replicated
+        for (int i = 0; i < 10; i++) {
+            lh.addEntry(data);
+        }
+
+        InetSocketAddress newBkAddr = new InetSocketAddress(InetAddress
+                .getLocalHost().getHostAddress(), startNewBookie);
+        LOG.info("New Bookie addr :" + newBkAddr);
+
+        ReplicationWorker rw = new ReplicationWorker(zkc, baseConf, newBkAddr);
+
+        LedgerManagerFactory mFactory = LedgerManagerFactory
+                .newLedgerManagerFactory(baseClientConf, zkc);
+        LedgerUnderreplicationManager underReplicationManager = mFactory
+                .newLedgerUnderreplicationManager();
+
+        rw.start();
+        try {
+
+            underReplicationManager.markLedgerUnderreplicated(lh.getId(),
+                    replicaToKill.toString());
+            while (isLedgerInUnderReplication(lh.getId(), basePath)) {
+                Thread.sleep(100);
+            }
+
+            killAllBookies(lh, newBkAddr);
+
+            // Should be able to read the entries from 0-9
+            verifyRecoveredLedgers(lh, 0, 9);
+            lh = bkc.openLedgerNoRecovery(lh.getId(),
+                    BookKeeper.DigestType.CRC32, TESTPASSWD);
+
+            // Ledger should still be in open state
+            assertTrue("Ledger must have been closed by RW", ClientUtil
+                    .isLedgerOpen(lh));
+        } finally {
+            rw.shutdown();
+            underReplicationManager.close();
+        }
+
+    }
 
     private void killAllBookies(LedgerHandle lh, InetSocketAddress excludeBK)
             throws InterruptedException {
@@ -411,7 +521,7 @@ public class TestReplicationWorker exten
     private void verifyRecoveredLedgers(LedgerHandle lh, long startEntryId,
             long endEntryId) throws BKException, InterruptedException {
         LedgerHandle lhs = bkc.openLedgerNoRecovery(lh.getId(),
-                BookKeeper.DigestType.CRC32, "testpasswd".getBytes());
+                BookKeeper.DigestType.CRC32, TESTPASSWD);
         Enumeration<LedgerEntry> entries = lhs.readEntries(startEntryId,
                 endEntryId);
         assertTrue("Should have the elements", entries.hasMoreElements());