You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/09/12 20:34:12 UTC
svn commit: r1384080 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/
bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/
bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ bookk...
Author: ivank
Date: Wed Sep 12 18:34:11 2012
New Revision: 1384080
URL: http://svn.apache.org/viewvc?rev=1384080&view=rev
Log:
BOOKKEEPER-325: Delay the replication of a ledger if RW found that its last fragment is in underReplication. (umamahesh via ivank)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1384080&r1=1384079&r2=1384080&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Sep 12 18:34:11 2012
@@ -134,6 +134,8 @@ Trunk (unreleased changes)
BOOKKEEPER-208: Separate write quorum from ack quorum (ivank)
+ BOOKKEEPER-325: Delay the replication of a ledger if RW found that its last fragment is in underReplication. (umamahesh via ivank)
+
hedwig-server:
BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java?rev=1384080&r1=1384079&r2=1384080&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java Wed Sep 12 18:34:11 2012
@@ -63,7 +63,7 @@ public class LedgerFragment {
* last fragment, then no one will modify this fragment.</li>
* </ol>
*/
- boolean isClosed() {
+ public boolean isClosed() {
return isLedgerClosed;
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java?rev=1384080&r1=1384079&r2=1384080&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java Wed Sep 12 18:34:11 2012
@@ -57,6 +57,7 @@ public class ServerConfiguration extends
protected final static String ZK_SERVERS = "zkServers";
// Statistics Parameters
protected final static String ENABLE_STATISTICS = "enableStatistics";
+ protected final static String OPEN_LEDGER_REREPLICATION_GRACE_PERIOD = "openLedgerRereplicationGracePeriod";
/**
* Construct a default configuration object
@@ -546,4 +547,30 @@ public class ServerConfiguration extends
setProperty(MAJOR_COMPACTION_INTERVAL, interval);
return this;
}
+
+ /**
+ * Set the grace period which the rereplication worker will wait before
+ * fencing and rereplicating a ledger fragment which is still being written
+ * to, on bookie failure.
+ *
+ * The grace period allows the writer to detect the bookie failure, and
+ * start writing to another ledger fragment. If the writer writes nothing
+ * during the grace period, the rereplication worker assumes that it has
+ * crashed and therefore fences the ledger, preventing any further writes to
+ * that ledger.
+ *
+ * @see LedgerHandle#openLedger
+ */
+ public void setOpenLedgerRereplicationGracePeriod(String waitTime) {
+ setProperty(OPEN_LEDGER_REREPLICATION_GRACE_PERIOD, waitTime);
+ }
+
+ /**
+ * Get the grace period which the rereplication worker will wait before
+ * fencing and rereplicating a ledger fragment which is still being written
+ * to, on bookie failure.
+ */
+ public long getOpenLedgerRereplicationGracePeriod() {
+ return getLong(OPEN_LEDGER_REREPLICATION_GRACE_PERIOD, 30000);
+ }
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java?rev=1384080&r1=1384079&r2=1384080&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java Wed Sep 12 18:34:11 2012
@@ -23,6 +23,8 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import org.apache.bookkeeper.client.BKException;
@@ -36,6 +38,7 @@ import org.apache.bookkeeper.client.BKEx
import org.apache.bookkeeper.client.BKException.BKReadException;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -53,15 +56,17 @@ import org.slf4j.LoggerFactory;
public class ReplicationWorker implements Runnable {
private static Logger LOG = LoggerFactory
.getLogger(ReplicationWorker.class);
- private LedgerUnderreplicationManager underreplicationManager;
+ final private LedgerUnderreplicationManager underreplicationManager;
private AbstractConfiguration conf;
private ZooKeeper zkc;
private volatile boolean workerRunning = false;
- private BookKeeperAdmin admin;
+ final private BookKeeperAdmin admin;
private LedgerChecker ledgerChecker;
private InetSocketAddress targetBookie;
private BookKeeper bkc;
private Thread workerThread;
+ private long openLedgerRereplicationGracePeriod;
+ private Timer pendingReplicationTimer;
/**
* Replication worker for replicating the ledger fragments from
@@ -77,7 +82,7 @@ public class ReplicationWorker implement
* local Bookie address.
*/
public ReplicationWorker(final ZooKeeper zkc,
- final AbstractConfiguration conf, InetSocketAddress targetBKAddr)
+ final ServerConfiguration conf, InetSocketAddress targetBKAddr)
throws CompatibilityException, KeeperException,
InterruptedException, IOException {
this.zkc = zkc;
@@ -91,6 +96,9 @@ public class ReplicationWorker implement
this.admin = new BookKeeperAdmin(bkc);
this.ledgerChecker = new LedgerChecker(bkc);
this.workerThread = new Thread(this);
+ this.openLedgerRereplicationGracePeriod = conf
+ .getOpenLedgerRereplicationGracePeriod();
+ this.pendingReplicationTimer = new Timer("PendingReplicationTimer");
}
/** Start the replication worker */
@@ -103,54 +111,7 @@ public class ReplicationWorker implement
workerRunning = true;
while (workerRunning) {
try {
- long ledgerIdToReplicate = underreplicationManager
- .getLedgerToRereplicate();
- LOG.info("Going to replicate the fragments of the ledger: "
- + ledgerIdToReplicate);
- LedgerHandle lh;
- try {
- lh = admin.openLedgerNoRecovery(ledgerIdToReplicate);
- } catch (BKNoSuchLedgerExistsException e) {
- // Ledger might have been deleted by user
- LOG.info("BKNoSuchLedgerExistsException while opening "
- + "ledger for replication. Other clients "
- + "might have deleted the ledger. "
- + "So, no harm to continue");
- underreplicationManager
- .markLedgerReplicated(ledgerIdToReplicate);
- continue;
- } catch (BKReadException e) {
- LOG.info("BKReadException while"
- + " opening ledger for replication."
- + " Enough Bookies might not have available"
- + "So, no harm to continue");
- underreplicationManager
- .releaseUnderreplicatedLedger(ledgerIdToReplicate);
- continue;
- } catch (BKBookieHandleNotAvailableException e) {
- LOG.info("BKBookieHandleNotAvailableException while"
- + " opening ledger for replication."
- + " Enough Bookies might not have available"
- + "So, no harm to continue");
- underreplicationManager
- .releaseUnderreplicatedLedger(ledgerIdToReplicate);
- continue;
- }
-
- boolean isAllFragmentsReplicated = doReplicateFragments(
- ledgerIdToReplicate, lh);
- if (!isAllFragmentsReplicated) {
- // Releasing the underReplication ledger lock and compete
- // for the replication again for the pending fragments
- underreplicationManager
- .releaseUnderreplicatedLedger(ledgerIdToReplicate);
- continue;
- }
-
- LOG.info("Ledger replicated successfully. ledger id is: "
- + ledgerIdToReplicate);
- underreplicationManager
- .markLedgerReplicated(ledgerIdToReplicate);
+ rereplicate();
} catch (InterruptedException e) {
shutdown();
Thread.currentThread().interrupt();
@@ -173,30 +134,55 @@ public class ReplicationWorker implement
/**
* Replicates the under replicated fragments from failed bookie ledger to
* targetBookie
- *
- * @return - false if the re-replication fails for any fragment. Also
- * returns false if fragment ensemble contains the target bookie
- * (since target bookie already present in the current fragment
- * ensemble, it will skip replication for that particular
- * fragments). Returns true if all fragments replicated
- * successfully.
- * @throws BKException
*/
- private boolean doReplicateFragments(long ledgerIdToReplicate,
- LedgerHandle lh) throws InterruptedException, BKException {
- CheckerCallback checkerCb = new CheckerCallback();
- ledgerChecker.checkLedger(lh, checkerCb);
- Set<LedgerFragment> fragments = checkerCb.waitAndGetResult();
+ private void rereplicate() throws InterruptedException, BKException,
+ UnavailableException {
+ long ledgerIdToReplicate = underreplicationManager
+ .getLedgerToRereplicate();
+ LOG.info("Going to replicate the fragments of the ledger: "
+ + ledgerIdToReplicate);
+ LedgerHandle lh;
+ try {
+ lh = admin.openLedgerNoRecovery(ledgerIdToReplicate);
+ } catch (BKNoSuchLedgerExistsException e) {
+ // Ledger might have been deleted by user
+ LOG.info("BKNoSuchLedgerExistsException while opening "
+ + "ledger for replication. Other clients "
+ + "might have deleted the ledger. "
+ + "So, no harm to continue");
+ underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
+ return;
+ } catch (BKReadException e) {
+ LOG.info("BKReadException while"
+ + " opening ledger for replication."
+ + " Enough Bookies might not have available"
+ + "So, no harm to continue");
+ underreplicationManager
+ .releaseUnderreplicatedLedger(ledgerIdToReplicate);
+ return;
+ } catch (BKBookieHandleNotAvailableException e) {
+ LOG.info("BKBookieHandleNotAvailableException while"
+ + " opening ledger for replication."
+ + " Enough Bookies might not have available"
+ + "So, no harm to continue");
+ underreplicationManager
+ .releaseUnderreplicatedLedger(ledgerIdToReplicate);
+ return;
+ }
+ Set<LedgerFragment> fragments = getUnderreplicatedFragments(lh);
LOG.info("Founds fragments " + fragments
+ " for replication from ledger: " + ledgerIdToReplicate);
- boolean isTargetBookieExistsInFragmentEnsemble = false;
- boolean isAllFragmentsReplicated = true;
+
+ boolean foundOpenFragments = false;
for (LedgerFragment ledgerFragment : fragments) {
- if (isTargetBookieExistsInFragmentEnsemble(lh, ledgerFragment)) {
+ if (!ledgerFragment.isClosed()) {
+ foundOpenFragments = true;
+ continue;
+ } else if (isTargetBookieExistsInFragmentEnsemble(lh,
+ ledgerFragment)) {
LOG.info("Target Bookie[" + targetBookie
+ "] found in the fragment ensemble:"
+ ledgerFragment.getEnsemble());
- isTargetBookieExistsInFragmentEnsemble = true;
continue;
}
try {
@@ -204,35 +190,97 @@ public class ReplicationWorker implement
} catch (BKException.BKBookieHandleNotAvailableException e) {
LOG.warn("BKBookieHandleNotAvailableException "
+ "while replicating the fragment", e);
- isAllFragmentsReplicated = false;
} catch (BKException.BKLedgerRecoveryException e) {
LOG.warn("BKLedgerRecoveryException "
+ "while replicating the fragment", e);
- isAllFragmentsReplicated = false;
}
+ }
+ if (foundOpenFragments) {
+ deferLedgerLockRelease(ledgerIdToReplicate);
+ return;
}
- if (isTargetBookieExistsInFragmentEnsemble) {
- LOG.info("Releasing the lock, as target Bookie found"
- + " in the fragments ensemble.");
- return false;
- }
-
- // There might be some connectivity issues while replicating,
- // so, still I am eligible to replicate it, lets retry.
- if (!isAllFragmentsReplicated) {
- LOG.info("Could not replicate all fragments."
- + "So, Releasing the lock. Let's compete "
- + "for the replication again");
- return false;
+
+ fragments = getUnderreplicatedFragments(lh);
+ if (fragments.size() == 0) {
+ LOG.info("Ledger replicated successfully. ledger id is: "
+ + ledgerIdToReplicate);
+ underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
+ } else {
+ // Releasing the underReplication ledger lock and compete
+ // for the replication again for the pending fragments
+ underreplicationManager
+ .releaseUnderreplicatedLedger(ledgerIdToReplicate);
}
+ }
- // Re-replication success
- return true;
-
+ /** Gets the under replicated fragments */
+ private Set<LedgerFragment> getUnderreplicatedFragments(LedgerHandle lh)
+ throws InterruptedException {
+ CheckerCallback checkerCb = new CheckerCallback();
+ ledgerChecker.checkLedger(lh, checkerCb);
+ Set<LedgerFragment> fragments = checkerCb.waitAndGetResult();
+ return fragments;
}
/**
+ * Schedules a timer task that releases the ledger lock once the open-ledger
+ * rereplication grace period has elapsed. The ledger will be fenced if it
+ * still has an open fragment when the timer task fires.
+ */
+ private void deferLedgerLockRelease(final long ledgerId) {
+ long gracePeriod = this.openLedgerRereplicationGracePeriod;
+ TimerTask timerTask = new TimerTask() {
+ @Override
+ public void run() {
+ LedgerHandle lh = null;
+ try {
+ lh = admin.openLedgerNoRecovery(ledgerId);
+ Set<LedgerFragment> fragments = getUnderreplicatedFragments(lh);
+ for (LedgerFragment fragment : fragments) {
+ if (!fragment.isClosed()) {
+ lh = admin.openLedger(ledgerId);
+ break;
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.info("InterruptedException "
+ + "while replicating fragments", e);
+ } catch (BKException e) {
+ LOG.error("BKException while fencing the ledger"
+ + " for rereplication of postponed ledgers", e);
+ } finally {
+ try {
+ if (lh != null) {
+ lh.close();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.info("InterruptedException while closing "
+ + "ledger", e);
+ } catch (BKException e) {
+ // Lets go ahead and release the lock. Catch actual
+ // exception in normal replication flow and take
+ // action.
+ LOG.warn("BKException while closing ledger ", e);
+ } finally {
+ try {
+ underreplicationManager
+ .releaseUnderreplicatedLedger(ledgerId);
+ } catch (UnavailableException e) {
+ shutdown();
+ LOG.error("UnavailableException "
+ + "while replicating fragments", e);
+ }
+ }
+ }
+ }
+ };
+ pendingReplicationTimer.schedule(timerTask, gracePeriod);
+ }
+
+ /**
* Stop the replication worker service
*/
public void shutdown() {
@@ -243,6 +291,7 @@ public class ReplicationWorker implement
LOG.warn("Exception while closing the "
+ "ZkLedgerUnderrepliationManager", e);
}
+ this.pendingReplicationTimer.cancel();
try {
bkc.close();
} catch (InterruptedException e) {
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java?rev=1384080&r1=1384079&r2=1384080&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java Wed Sep 12 18:34:11 2012
@@ -27,4 +27,10 @@ public class ClientUtil {
return dm.computeDigestAndPackageForSending(entryId, lastAddConfirmed, length,
data, 0, data.length);
}
+
+ /** Returns whether the ledger is in open state */
+ public static boolean isLedgerOpen(LedgerHandle handle) {
+ return !handle.metadata.isClosed();
+ }
+
}
\ No newline at end of file
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java?rev=1384080&r1=1384079&r2=1384080&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java Wed Sep 12 18:34:11 2012
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.ClientUtil;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.LedgerHandleAdapter;
@@ -50,11 +51,13 @@ import org.slf4j.LoggerFactory;
*/
public class TestReplicationWorker extends MultiLedgerManagerTestCase {
+ private static final byte[] TESTPASSWD = "testpasswd".getBytes();
private static final Logger LOG = LoggerFactory
.getLogger(TestReplicationWorker.class);
private String basePath = "";
private LedgerManagerFactory mFactory;
private LedgerUnderreplicationManager underReplicationManager;
+ private static byte[] data = "TestReplicationWorker".getBytes();
public TestReplicationWorker(String ledgerManagerFactory) {
super(3);
@@ -96,9 +99,8 @@ public class TestReplicationWorker exten
*/
@Test(timeout = 30000)
public void testRWShouldReplicateFragmentsToTargetBookie() throws Exception {
- byte[] data = "TestReplicationWorker".getBytes();
LedgerHandle lh = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32,
- "testpasswd".getBytes());
+ TESTPASSWD);
for (int i = 0; i < 10; i++) {
lh.addEntry(data);
@@ -118,8 +120,7 @@ public class TestReplicationWorker exten
.getLocalHost().getHostAddress(), startNewBookie);
LOG.info("New Bookie addr :" + newBkAddr);
- ReplicationWorker rw = new ReplicationWorker(zkc, baseClientConf,
- newBkAddr);
+ ReplicationWorker rw = new ReplicationWorker(zkc, baseConf, newBkAddr);
rw.start();
try {
@@ -147,9 +148,8 @@ public class TestReplicationWorker exten
@Test(timeout = 60000)
public void testRWShouldRetryUntilThereAreEnoughBksAvailableForReplication()
throws Exception {
- byte[] data = "TestReplicationWorker".getBytes();
LedgerHandle lh = bkc.createLedger(1, 1, BookKeeper.DigestType.CRC32,
- "testpasswd".getBytes());
+ TESTPASSWD);
for (int i = 0; i < 10; i++) {
lh.addEntry(data);
@@ -166,8 +166,7 @@ public class TestReplicationWorker exten
LOG.info("New Bookie addr :" + newBkAddr);
killAllBookies(lh, newBkAddr);
- ReplicationWorker rw = new ReplicationWorker(zkc, baseClientConf,
- newBkAddr);
+ ReplicationWorker rw = new ReplicationWorker(zkc, baseConf, newBkAddr);
rw.start();
try {
@@ -199,9 +198,8 @@ public class TestReplicationWorker exten
@Test(timeout = 90000)
public void test2RWsShouldCompeteForReplicationOf2FragmentsAndCompleteReplication()
throws Exception {
- byte[] data = "TestReplicationWorker".getBytes();
LedgerHandle lh = bkc.createLedger(2, 2, BookKeeper.DigestType.CRC32,
- "testpasswd".getBytes());
+ TESTPASSWD);
for (int i = 0; i < 10; i++) {
lh.addEntry(data);
@@ -218,8 +216,7 @@ public class TestReplicationWorker exten
InetSocketAddress newBkAddr1 = new InetSocketAddress(InetAddress
.getLocalHost().getHostAddress(), startNewBookie1);
LOG.info("New Bookie addr :" + newBkAddr1);
- ReplicationWorker rw1 = new ReplicationWorker(zkc, baseClientConf,
- newBkAddr1);
+ ReplicationWorker rw1 = new ReplicationWorker(zkc, baseConf, newBkAddr1);
// Starte RW2
int startNewBookie2 = startNewBookie();
@@ -228,7 +225,7 @@ public class TestReplicationWorker exten
LOG.info("New Bookie addr :" + newBkAddr2);
ZooKeeper zkc1 = ZkUtils.createConnectedZookeeperClient(
zkUtil.getZooKeeperConnectString(), 10000);
- ReplicationWorker rw2 = new ReplicationWorker(zkc1, baseClientConf,
+ ReplicationWorker rw2 = new ReplicationWorker(zkc1, baseConf,
newBkAddr2);
rw1.start();
rw2.start();
@@ -264,9 +261,8 @@ public class TestReplicationWorker exten
@Test(timeout = 3000)
public void testRWShouldCleanTheLedgerFromUnderReplicationIfLedgerAlreadyDeleted()
throws Exception {
- byte[] data = "TestReplicationWorker".getBytes();
LedgerHandle lh = bkc.createLedger(2, 2, BookKeeper.DigestType.CRC32,
- "testpasswd".getBytes());
+ TESTPASSWD);
for (int i = 0; i < 10; i++) {
lh.addEntry(data);
@@ -281,8 +277,7 @@ public class TestReplicationWorker exten
InetSocketAddress newBkAddr = new InetSocketAddress(InetAddress
.getLocalHost().getHostAddress(), startNewBookie);
LOG.info("New Bookie addr :" + newBkAddr);
- ReplicationWorker rw = new ReplicationWorker(zkc, baseClientConf,
- newBkAddr);
+ ReplicationWorker rw = new ReplicationWorker(zkc, baseConf, newBkAddr);
rw.start();
try {
@@ -302,12 +297,9 @@ public class TestReplicationWorker exten
@Test(timeout = 60000)
public void testMultipleLedgerReplicationWithReplicationWorker()
throws Exception {
-
- byte[] data = "TestReplicationWorker".getBytes();
-
// Ledger1
LedgerHandle lh1 = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32,
- "testpasswd".getBytes());
+ TESTPASSWD);
for (int i = 0; i < 10; i++) {
lh1.addEntry(data);
@@ -319,7 +311,7 @@ public class TestReplicationWorker exten
// Ledger2
LedgerHandle lh2 = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32,
- "testpasswd".getBytes());
+ TESTPASSWD);
for (int i = 0; i < 10; i++) {
lh2.addEntry(data);
@@ -342,8 +334,7 @@ public class TestReplicationWorker exten
.getLocalHost().getHostAddress(), startNewBookie);
LOG.info("New Bookie addr :" + newBkAddr);
- ReplicationWorker rw = new ReplicationWorker(zkc, baseClientConf,
- newBkAddr);
+ ReplicationWorker rw = new ReplicationWorker(zkc, baseConf, newBkAddr);
rw.start();
try {
@@ -372,6 +363,125 @@ public class TestReplicationWorker exten
}
}
+
+ /**
+ * Tests that ReplicationWorker should fence the ledger and release ledger
+ * lock after timeout. Then replication should happen normally.
+ */
+ @Test(timeout = 60000)
+ public void testRWShouldReplicateTheLedgersAfterTimeoutIfLastFragmentIsUR()
+ throws Exception {
+ LedgerHandle lh = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32,
+ TESTPASSWD);
+
+ for (int i = 0; i < 10; i++) {
+ lh.addEntry(data);
+ }
+ InetSocketAddress replicaToKill = LedgerHandleAdapter
+ .getLedgerMetadata(lh).getEnsembles().get(0L).get(0);
+
+ LOG.info("Killing Bookie", replicaToKill);
+ killBookie(replicaToKill);
+
+ int startNewBookie = startNewBookie();
+
+ InetSocketAddress newBkAddr = new InetSocketAddress(InetAddress
+ .getLocalHost().getHostAddress(), startNewBookie);
+ LOG.info("New Bookie addr :" + newBkAddr);
+
+ ReplicationWorker rw = new ReplicationWorker(zkc, baseConf, newBkAddr);
+
+ LedgerManagerFactory mFactory = LedgerManagerFactory
+ .newLedgerManagerFactory(baseClientConf, zkc);
+ LedgerUnderreplicationManager underReplicationManager = mFactory
+ .newLedgerUnderreplicationManager();
+ rw.start();
+ try {
+
+ underReplicationManager.markLedgerUnderreplicated(lh.getId(),
+ replicaToKill.toString());
+ while (isLedgerInUnderReplication(lh.getId(),
+ basePath)) {
+ Thread.sleep(100);
+ }
+ killAllBookies(lh, newBkAddr);
+ // Should be able to read the entries from 0-9
+ verifyRecoveredLedgers(lh, 0, 9);
+ lh = bkc.openLedgerNoRecovery(lh.getId(),
+ BookKeeper.DigestType.CRC32, TESTPASSWD);
+ assertFalse("Ledger must have been closed by RW", ClientUtil
+ .isLedgerOpen(lh));
+ } finally {
+ rw.shutdown();
+ underReplicationManager.close();
+ }
+
+ }
+
+ /**
+ * Tests that ReplicationWorker should not postpone the replication if the
+ * ledger is in open state and its last fragment is not under-replicated.
+ * Note that RW should not fence such ledgers.
+ */
+ @Test(timeout = 30000)
+ public void testRWShouldReplicateTheLedgersAfterTimeoutIfLastFragmentIsNotUR()
+ throws Exception {
+ LedgerHandle lh = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32,
+ TESTPASSWD);
+
+ for (int i = 0; i < 10; i++) {
+ lh.addEntry(data);
+ }
+ InetSocketAddress replicaToKill = LedgerHandleAdapter
+ .getLedgerMetadata(lh).getEnsembles().get(0L).get(0);
+
+ LOG.info("Killing Bookie", replicaToKill);
+ killBookie(replicaToKill);
+
+ int startNewBookie = startNewBookie();
+
+ // Reform ensemble...Making sure that last fragment is not in
+ // under-replication
+ for (int i = 0; i < 10; i++) {
+ lh.addEntry(data);
+ }
+
+ InetSocketAddress newBkAddr = new InetSocketAddress(InetAddress
+ .getLocalHost().getHostAddress(), startNewBookie);
+ LOG.info("New Bookie addr :" + newBkAddr);
+
+ ReplicationWorker rw = new ReplicationWorker(zkc, baseConf, newBkAddr);
+
+ LedgerManagerFactory mFactory = LedgerManagerFactory
+ .newLedgerManagerFactory(baseClientConf, zkc);
+ LedgerUnderreplicationManager underReplicationManager = mFactory
+ .newLedgerUnderreplicationManager();
+
+ rw.start();
+ try {
+
+ underReplicationManager.markLedgerUnderreplicated(lh.getId(),
+ replicaToKill.toString());
+ while (isLedgerInUnderReplication(lh.getId(), basePath)) {
+ Thread.sleep(100);
+ }
+
+ killAllBookies(lh, newBkAddr);
+
+ // Should be able to read the entries from 0-9
+ verifyRecoveredLedgers(lh, 0, 9);
+ lh = bkc.openLedgerNoRecovery(lh.getId(),
+ BookKeeper.DigestType.CRC32, TESTPASSWD);
+
+ // Ledger should be still in open state
+ assertTrue("Ledger must have been closed by RW", ClientUtil
+ .isLedgerOpen(lh));
+ } finally {
+ rw.shutdown();
+ underReplicationManager.close();
+ }
+
+ }
private void killAllBookies(LedgerHandle lh, InetSocketAddress excludeBK)
throws InterruptedException {
@@ -411,7 +521,7 @@ public class TestReplicationWorker exten
private void verifyRecoveredLedgers(LedgerHandle lh, long startEntryId,
long endEntryId) throws BKException, InterruptedException {
LedgerHandle lhs = bkc.openLedgerNoRecovery(lh.getId(),
- BookKeeper.DigestType.CRC32, "testpasswd".getBytes());
+ BookKeeper.DigestType.CRC32, TESTPASSWD);
Enumeration<LedgerEntry> entries = lhs.readEntries(startEntryId,
endEntryId);
assertTrue("Should have the elements", entries.hasMoreElements());