You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/05 16:24:43 UTC
svn commit: r1394542 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/
bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/
bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ bookk...
Author: ivank
Date: Fri Oct 5 14:24:42 2012
New Revision: 1394542
URL: http://svn.apache.org/viewvc?rev=1394542&view=rev
Log:
BOOKKEEPER-315: Ledger entries should be replicated sequentially instead of parallel. (umamahesh via ivank)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1394542&r1=1394541&r2=1394542&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Oct 5 14:24:42 2012
@@ -150,6 +150,8 @@ Trunk (unreleased changes)
BOOKKEEPER-319: Manage auditing and replication processes (Vinay via ivank)
+ BOOKKEEPER-315: Ledger entries should be replicated sequentially instead of parallel. (umamahesh via ivank)
+
hedwig-server:
BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java?rev=1394542&r1=1394541&r2=1394542&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java Fri Oct 5 14:24:42 2012
@@ -44,9 +44,9 @@ import org.apache.zookeeper.AsyncCallbac
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZKUtil;
-import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooDefs.Ids;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -633,7 +633,7 @@ public class BookKeeperAdmin {
*/
private void asyncRecoverLedgerFragment(final LedgerHandle lh,
final LedgerFragment ledgerFragment,
- final LedgerFragmentReplicator.SingleFragmentCallback ledgerFragmentMcb,
+ final AsyncCallback.VoidCallback ledgerFragmentMcb,
final InetSocketAddress newBookie) throws InterruptedException {
lfr.replicate(lh, ledgerFragment, ledgerFragmentMcb, newBookie);
}
@@ -650,15 +650,15 @@ public class BookKeeperAdmin {
*/
public void replicateLedgerFragment(LedgerHandle lh,
final LedgerFragment ledgerFragment,
- InetSocketAddress targetBookieAddress) throws InterruptedException,
- BKException {
- final SyncCounter syncCounter = new SyncCounter();
+ final InetSocketAddress targetBookieAddress)
+ throws InterruptedException, BKException {
+ SyncCounter syncCounter = new SyncCounter();
ResultCallBack resultCallBack = new ResultCallBack(syncCounter);
- SingleFragmentCallback sfcb = new SingleFragmentCallback(
- resultCallBack, lh, ledgerFragment.getFirstStoredEntryId(),
- ledgerFragment.getAddress(), targetBookieAddress);
+ SingleFragmentCallback cb = new SingleFragmentCallback(resultCallBack,
+ lh, ledgerFragment.getFirstEntryId(), ledgerFragment
+ .getAddress(), targetBookieAddress);
syncCounter.inc();
- lfr.replicate(lh, ledgerFragment, sfcb, targetBookieAddress);
+ asyncRecoverLedgerFragment(lh, ledgerFragment, cb, targetBookieAddress);
syncCounter.block(0);
if (syncCounter.getrc() != BKException.Code.OK) {
throw BKException.create(syncCounter.getrc());
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java?rev=1394542&r1=1394541&r2=1394542&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java Fri Oct 5 14:24:42 2012
@@ -85,6 +85,13 @@ public class LedgerFragment {
public InetSocketAddress getAddress() {
return ensemble.get(bookieIndex);
}
+
+ /**
+ * Gets the index of the failed bookie within the ensemble
+ */
+ public int getBookiesIndex() {
+ return bookieIndex;
+ }
/**
* Gets the first stored entry id of the fragment in failed bookie.
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java?rev=1394542&r1=1394541&r2=1394542&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java Fri Oct 5 14:24:42 2012
@@ -22,8 +22,11 @@ package org.apache.bookkeeper.client;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.proto.BookieProtocol;
@@ -54,25 +57,10 @@ public class LedgerFragmentReplicator {
private static Logger LOG = LoggerFactory
.getLogger(LedgerFragmentReplicator.class);
- /**
- * This method replicate a ledger fragment which is a contiguous portion of
- * a ledger that was stored in an ensemble that included the failed bookie.
- *
- * @param lh
- * LedgerHandle for the ledger
- * @param lf
- * LedgerFragment to replicate
- * @param ledgerFragmentMcb
- * MultiCallback to invoke once we've recovered the current
- * ledger fragment.
- * @param newBookie
- * New bookie we want to use to recover and replicate the ledger
- * entries that were stored on the failed bookie.
- */
- void replicate(final LedgerHandle lh, final LedgerFragment lf,
- final SingleFragmentCallback ledgerFragmentMcb,
- final InetSocketAddress newBookie)
- throws InterruptedException {
+ private void replicateFragmentInternal(final LedgerHandle lh,
+ final LedgerFragment lf,
+ final AsyncCallback.VoidCallback ledgerFragmentMcb,
+ final InetSocketAddress newBookie) throws InterruptedException {
if (!lf.isClosed()) {
LOG.error("Trying to replicate an unclosed fragment;"
+ " This is not safe {}", lf);
@@ -124,6 +112,114 @@ public class LedgerFragmentReplicator {
}
/**
+ * This method replicates a ledger fragment, which is a contiguous portion
+ * of a ledger that was stored in an ensemble that included the failed
+ * bookie. It splits the fragment into multiple sub-fragments, each holding
+ * at most the configured rereplicationEntryBatchSize entries, and then
+ * re-replicates those sub-fragments one by one. After all sub-fragments
+ * have been re-replicated, the ensemble info is updated once with the new
+ * bookie.
+ *
+ * @param lh
+ * LedgerHandle for the ledger
+ * @param lf
+ * LedgerFragment to replicate
+ * @param ledgerFragmentMcb
+ * MultiCallback to invoke once we've recovered the current
+ * ledger fragment.
+ * @param targetBookieAddress
+ * New bookie we want to use to recover and replicate the ledger
+ * entries that were stored on the failed bookie.
+ */
+ void replicate(final LedgerHandle lh, final LedgerFragment lf,
+ final AsyncCallback.VoidCallback ledgerFragmentMcb,
+ final InetSocketAddress targetBookieAddress)
+ throws InterruptedException {
+ Set<LedgerFragment> partionedFragments = splitIntoSubFragments(lh, lf,
+ bkc.getConf().getRereplicationEntryBatchSize());
+ LOG.info("Fragment :" + lf + " is split into sub fragments :"
+ + partionedFragments);
+ replicateNextBatch(lh, partionedFragments.iterator(),
+ ledgerFragmentMcb, targetBookieAddress);
+ }
+
+ /** Replicate the batched entry fragments one after another */
+ private void replicateNextBatch(final LedgerHandle lh,
+ final Iterator<LedgerFragment> fragments,
+ final AsyncCallback.VoidCallback ledgerFragmentMcb,
+ final InetSocketAddress targetBookieAddress) {
+ if (fragments.hasNext()) {
+ try {
+ replicateFragmentInternal(lh, fragments.next(),
+ new AsyncCallback.VoidCallback() {
+ @Override
+ public void processResult(int rc, String v, Object ctx) {
+ if (rc != BKException.Code.OK) {
+ ledgerFragmentMcb.processResult(rc, null,
+ null);
+ } else {
+ replicateNextBatch(lh, fragments,
+ ledgerFragmentMcb,
+ targetBookieAddress);
+ }
+ }
+
+ }, targetBookieAddress);
+ } catch (InterruptedException e) {
+ ledgerFragmentMcb.processResult(
+ BKException.Code.InterruptedException, null, null);
+ Thread.currentThread().interrupt();
+ }
+ } else {
+ ledgerFragmentMcb.processResult(BKException.Code.OK, null, null);
+ }
+ }
+
+ /**
+ * Splits the full fragment into batched sub-fragments, each containing at
+ * most rereplicationEntryBatchSize entries. A non-positive batch size, or a
+ * fragment smaller than one batch, yields the original fragment unchanged.
+ */
+ static Set<LedgerFragment> splitIntoSubFragments(LedgerHandle lh,
+ LedgerFragment ledgerFragment, long rereplicationEntryBatchSize) {
+ Set<LedgerFragment> fragments = new HashSet<LedgerFragment>();
+ if (rereplicationEntryBatchSize <= 0) {
+ // rereplicationEntryBatchSize can not be 0 or less than 0,
+ // returning with the current fragment
+ fragments.add(ledgerFragment);
+ return fragments;
+ }
+
+ long firstEntryId = ledgerFragment.getFirstStoredEntryId();
+ long lastEntryId = ledgerFragment.getLastStoredEntryId();
+ long numberOfEntriesToReplicate = (lastEntryId - firstEntryId) + 1;
+ long splitsWithFullEntries = numberOfEntriesToReplicate
+ / rereplicationEntryBatchSize;
+
+ if (splitsWithFullEntries == 0) {// only one fragment
+ fragments.add(ledgerFragment);
+ return fragments;
+ }
+
+ long fragmentSplitLastEntry = 0;
+ for (int i = 0; i < splitsWithFullEntries; i++) {
+ fragmentSplitLastEntry = (firstEntryId + rereplicationEntryBatchSize) - 1;
+ fragments.add(new LedgerFragment(lh, firstEntryId,
+ fragmentSplitLastEntry, ledgerFragment.getBookiesIndex()));
+ firstEntryId = fragmentSplitLastEntry + 1;
+ }
+
+ long lastSplitWithPartialEntries = numberOfEntriesToReplicate
+ % rereplicationEntryBatchSize;
+ if (lastSplitWithPartialEntries > 0) {
+ fragments.add(new LedgerFragment(lh, firstEntryId, firstEntryId
+ + lastSplitWithPartialEntries - 1, ledgerFragment
+ .getBookiesIndex()));
+ }
+ return fragments;
+ }
+
+ /**
* This method asynchronously recovers a specific ledger entry by reading
* the values via the BookKeeper Client (which would read it from the other
* replicas) and then writing it to the chosen new bookie.
@@ -233,77 +329,94 @@ public class LedgerFragmentReplicator {
ledgerFragmentsMcb.processResult(rc, null, null);
return;
}
- writeLedgerConfig();
+ updateEnsembleInfo(ledgerFragmentsMcb, fragmentStartId, lh,
+ oldBookie, newBookie);
}
+ }
- protected void writeLedgerConfig() {
- /*
- * Update the ledger metadata's ensemble info to point to the new
- * bookie.
- */
- ArrayList<InetSocketAddress> ensemble = lh.getLedgerMetadata()
- .getEnsembles().get(fragmentStartId);
- int deadBookieIndex = ensemble.indexOf(oldBookie);
- ensemble.remove(deadBookieIndex);
- ensemble.add(deadBookieIndex, newBookie);
- lh.writeLedgerConfig(new WriteCb());
+ /** Updates the ensemble with newBookie and notifies the ensembleUpdatedCb */
+ private static void updateEnsembleInfo(
+ AsyncCallback.VoidCallback ensembleUpdatedCb, long fragmentStartId,
+ LedgerHandle lh, InetSocketAddress oldBookie,
+ InetSocketAddress newBookie) {
+ /*
+ * Update the ledger metadata's ensemble info to point to the new
+ * bookie.
+ */
+ ArrayList<InetSocketAddress> ensemble = lh.getLedgerMetadata()
+ .getEnsembles().get(fragmentStartId);
+ int deadBookieIndex = ensemble.indexOf(oldBookie);
+ ensemble.remove(deadBookieIndex);
+ ensemble.add(deadBookieIndex, newBookie);
+ lh.writeLedgerConfig(new UpdateEnsembleCb(ensembleUpdatedCb,
+ fragmentStartId, lh, oldBookie, newBookie));
+ }
+
+ /**
+ * Updates the ensemble data with newBookie. Re-reads the metadata on
+ * MetadataVersionException and updates the ensemble again. Once the update
+ * completes, the result code is passed up to the parent callback.
+ */
+ private static class UpdateEnsembleCb implements GenericCallback<Void> {
+ final AsyncCallback.VoidCallback ensembleUpdatedCb;
+ final LedgerHandle lh;
+ final long fragmentStartId;
+ final InetSocketAddress oldBookie;
+ final InetSocketAddress newBookie;
+
+ public UpdateEnsembleCb(AsyncCallback.VoidCallback ledgerFragmentsMcb,
+ long fragmentStartId, LedgerHandle lh,
+ InetSocketAddress oldBookie, InetSocketAddress newBookie) {
+ this.ensembleUpdatedCb = ledgerFragmentsMcb;
+ this.lh = lh;
+ this.fragmentStartId = fragmentStartId;
+ this.newBookie = newBookie;
+ this.oldBookie = oldBookie;
}
- private class WriteCb implements GenericCallback<Void> {
- @Override
- public void operationComplete(int rc, Void result) {
- if (rc == BKException.Code.MetadataVersionException) {
- LOG
- .warn("Two fragments attempted update at once; ledger id: "
- + lh.getId()
- + " startid: "
- + fragmentStartId);
- // try again, the previous success (with which this has
- // conflicted)
- // will have updated the stat
- // other operations such as (addEnsemble) would update it
- // too.
- lh.rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(lh.bk.mainWorkerPool,
- lh.getId()) {
- @Override
- public void safeOperationComplete(int rc,
- LedgerMetadata newMeta) {
- if (rc != BKException.Code.OK) {
- LOG
- .error("Error reading updated ledger metadata for ledger "
- + lh.getId());
- ledgerFragmentsMcb
- .processResult(rc, null, null);
- } else {
- lh.metadata = newMeta;
- writeLedgerConfig();
+ @Override
+ public void operationComplete(int rc, Void result) {
+ if (rc == BKException.Code.MetadataVersionException) {
+ LOG.warn("Two fragments attempted update at once; ledger id: "
+ + lh.getId() + " startid: " + fragmentStartId);
+ // try again, the previous success (with which this has
+ // conflicted) will have updated the stat other operations
+ // such as (addEnsemble) would update it too.
+ lh
+ .rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(
+ lh.bk.mainWorkerPool, lh.getId()) {
+ @Override
+ public void safeOperationComplete(int rc,
+ LedgerMetadata newMeta) {
+ if (rc != BKException.Code.OK) {
+ LOG
+ .error("Error reading updated ledger metadata for ledger "
+ + lh.getId());
+ ensembleUpdatedCb.processResult(rc, null,
+ null);
+ } else {
+ lh.metadata = newMeta;
+ updateEnsembleInfo(ensembleUpdatedCb,
+ fragmentStartId, lh, oldBookie,
+ newBookie);
+ }
}
- }
- });
- return;
- } else if (rc != BKException.Code.OK) {
- LOG
- .error("Error updating ledger config metadata for ledgerId "
- + lh.getId()
- + " : "
- + BKException.getMessage(rc));
- } else {
- LOG
- .info("Updated ZK for ledgerId: ("
- + lh.getId()
- + " : "
- + fragmentStartId
- + ") to point ledger fragments from old dead bookie: ("
- + oldBookie + ") to new bookie: ("
- + newBookie + ")");
- }
- /*
- * Pass the return code result up the chain with the parent
- * callback.
- */
- ledgerFragmentsMcb.processResult(rc, null, null);
+ });
+ return;
+ } else if (rc != BKException.Code.OK) {
+ LOG.error("Error updating ledger config metadata for ledgerId "
+ + lh.getId() + " : " + BKException.getMessage(rc));
+ } else {
+ LOG.info("Updated ZK for ledgerId: (" + lh.getId() + " : "
+ + fragmentStartId
+ + ") to point ledger fragments from old dead bookie: ("
+ + oldBookie + ") to new bookie: (" + newBookie + ")");
}
- };
+ /*
+ * Pass the return code result up the chain with the parent
+ * callback.
+ */
+ ensembleUpdatedCb.processResult(rc, null, null);
+ }
}
-
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java?rev=1394542&r1=1394541&r2=1394542&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java Fri Oct 5 14:24:42 2012
@@ -51,6 +51,8 @@ public abstract class AbstractConfigurat
protected final static String LEDGER_MANAGER_FACTORY_CLASS = "ledgerManagerFactoryClass";
protected final static String ZK_LEDGERS_ROOT_PATH = "zkLedgersRootPath";
protected final static String AVAILABLE_NODE = "available";
+ protected final static String REREPLICATION_ENTRY_BATCH_SIZE = "rereplicationEntryBatchSize";
+
protected AbstractConfiguration() {
super();
// add configuration for system properties
@@ -174,4 +176,22 @@ public abstract class AbstractConfigurat
public String getZkAvailableBookiesPath() {
return getZkLedgersRootPath() + "/" + AVAILABLE_NODE;
}
+
+ /**
+ * Sets the maximum number of entries to keep in a fragment for
+ * re-replication. If a fragment has more entries than this count, the
+ * original fragment is split into multiple smaller logical fragments of at
+ * most rereplicationEntryBatchSize entries each, so that re-replication
+ * happens in batches.
+ */
+ public void setRereplicationEntryBatchSize(long rereplicationEntryBatchSize) {
+ setProperty(REREPLICATION_ENTRY_BATCH_SIZE, rereplicationEntryBatchSize);
+ }
+
+ /**
+ * Get the re-replication entry batch size
+ */
+ public long getRereplicationEntryBatchSize() {
+ return getLong(REREPLICATION_ENTRY_BATCH_SIZE, 10);
+ }
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java?rev=1394542&r1=1394541&r2=1394542&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java Fri Oct 5 14:24:42 2012
@@ -36,7 +36,6 @@ import org.apache.bookkeeper.client.Ledg
import org.apache.bookkeeper.client.BKException.BKBookieHandleNotAvailableException;
import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
import org.apache.bookkeeper.client.BKException.BKReadException;
-import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
@@ -57,7 +56,7 @@ public class ReplicationWorker implement
private static Logger LOG = LoggerFactory
.getLogger(ReplicationWorker.class);
final private LedgerUnderreplicationManager underreplicationManager;
- private AbstractConfiguration conf;
+ private ServerConfiguration conf;
private ZooKeeper zkc;
private volatile boolean workerRunning = false;
final private BookKeeperAdmin admin;
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java?rev=1394542&r1=1394541&r2=1394542&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java Fri Oct 5 14:24:42 2012
@@ -28,6 +28,7 @@ import java.util.SortedMap;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.junit.Test;
@@ -40,6 +41,8 @@ import org.slf4j.LoggerFactory;
*/
public class TestLedgerFragmentReplication extends BookKeeperClusterTestCase {
+ private static final byte[] TEST_PSSWD = "testpasswd".getBytes();
+ private static final DigestType TEST_DIGEST_TYPE = BookKeeper.DigestType.CRC32;
private static Logger LOG = LoggerFactory
.getLogger(TestLedgerFragmentReplication.class);
@@ -72,8 +75,8 @@ public class TestLedgerFragmentReplicati
public void testReplicateLFShouldCopyFailedBookieFragmentsToTargetBookie()
throws Exception {
byte[] data = "TestLedgerFragmentReplication".getBytes();
- LedgerHandle lh = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32,
- "testpasswd".getBytes());
+ LedgerHandle lh = bkc.createLedger(3, 3, TEST_DIGEST_TYPE,
+ TEST_PSSWD);
for (int i = 0; i < 10; i++) {
lh.addEntry(data);
@@ -129,8 +132,8 @@ public class TestLedgerFragmentReplicati
public void testReplicateLFFailsOnlyOnLastUnClosedFragments()
throws Exception {
byte[] data = "TestLedgerFragmentReplication".getBytes();
- LedgerHandle lh = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32,
- "testpasswd".getBytes());
+ LedgerHandle lh = bkc.createLedger(3, 3, TEST_DIGEST_TYPE,
+ TEST_PSSWD);
for (int i = 0; i < 10; i++) {
lh.addEntry(data);
@@ -187,8 +190,8 @@ public class TestLedgerFragmentReplicati
public void testReplicateLFShouldReturnFalseIfTheReplicationFails()
throws Exception {
byte[] data = "TestLedgerFragmentReplication".getBytes();
- LedgerHandle lh = bkc.createLedger(2, 1, BookKeeper.DigestType.CRC32,
- "testpasswd".getBytes());
+ LedgerHandle lh = bkc.createLedger(2, 1, TEST_DIGEST_TYPE,
+ TEST_PSSWD);
for (int i = 0; i < 10; i++) {
lh.addEntry(data);
@@ -222,6 +225,94 @@ public class TestLedgerFragmentReplicati
}
}
}
+
+ /**
+ * Tests that splitIntoSubFragment should be able to split the original
+ * passed fragment into sub fragments at correct boundaries
+ */
+ @Test(timeout = 30000)
+ public void testSplitIntoSubFragmentsWithDifferentFragmentBoundaries()
+ throws Exception {
+ LedgerMetadata metadata = new LedgerMetadata(3, 3, 3, TEST_DIGEST_TYPE,
+ TEST_PSSWD) {
+ @Override
+ ArrayList<InetSocketAddress> getEnsemble(long entryId) {
+ return null;
+ }
+
+ @Override
+ public boolean isClosed() {
+ return true;
+ }
+ };
+ LedgerHandle lh = new LedgerHandle(bkc, 0, metadata, TEST_DIGEST_TYPE,
+ TEST_PSSWD);
+ testSplitIntoSubFragments(10, 21, -1, 1, lh);
+ testSplitIntoSubFragments(10, 21, 20, 1, lh);
+ testSplitIntoSubFragments(0, 0, 10, 1, lh);
+ testSplitIntoSubFragments(0, 1, 1, 2, lh);
+ testSplitIntoSubFragments(20, 24, 2, 3, lh);
+ testSplitIntoSubFragments(21, 32, 3, 4, lh);
+ testSplitIntoSubFragments(22, 103, 11, 8, lh);
+ testSplitIntoSubFragments(49, 51, 1, 3, lh);
+ testSplitIntoSubFragments(11, 101, 3, 31, lh);
+ }
+
+ /** assert the sub-fragment boundaries */
+ void testSplitIntoSubFragments(final long oriFragmentFirstEntry,
+ final long oriFragmentLastEntry, long entriesPerSubFragment,
+ long expectedSubFragments, LedgerHandle lh) {
+ LedgerFragment fr = new LedgerFragment(lh, oriFragmentFirstEntry,
+ oriFragmentLastEntry, 0) {
+ @Override
+ public long getLastStoredEntryId() {
+ return oriFragmentLastEntry;
+ }
+
+ @Override
+ public long getFirstStoredEntryId() {
+ return oriFragmentFirstEntry;
+ }
+ };
+ Set<LedgerFragment> subFragments = LedgerFragmentReplicator
+ .splitIntoSubFragments(lh, fr, entriesPerSubFragment);
+ assertEquals(expectedSubFragments, subFragments.size());
+ int fullSubFragment = 0;
+ int partialSubFragment = 0;
+ for (LedgerFragment ledgerFragment : subFragments) {
+ if ((ledgerFragment.getLastKnownEntryId()
+ - ledgerFragment.getFirstEntryId() + 1) == entriesPerSubFragment) {
+ fullSubFragment++;
+ } else {
+ long totalEntriesToReplicate = oriFragmentLastEntry
+ - oriFragmentFirstEntry + 1;
+ if (entriesPerSubFragment <= 0
+ || totalEntriesToReplicate / entriesPerSubFragment == 0) {
+ assertEquals(
+ "FirstEntryId should be same as original fragment's firstEntryId",
+ fr.getFirstEntryId(), ledgerFragment
+ .getFirstEntryId());
+ assertEquals(
+ "LastEntryId should be same as original fragment's lastEntryId",
+ fr.getLastKnownEntryId(), ledgerFragment
+ .getLastKnownEntryId());
+ } else {
+ long partialSplitEntries = totalEntriesToReplicate
+ % entriesPerSubFragment;
+ assertEquals(
+ "Partial fragment with wrong entry boundaries",
+ ledgerFragment.getLastKnownEntryId()
+ - ledgerFragment.getFirstEntryId() + 1,
+ partialSplitEntries);
+ }
+ partialSubFragment++;
+ }
+ }
+ assertEquals("Unexpected number of sub fargments", fullSubFragment
+ + partialSubFragment, expectedSubFragments);
+ assertTrue("There should be only one or zero partial sub Fragment",
+ partialSubFragment == 0 || partialSubFragment == 1);
+ }
private Set<LedgerFragment> getFragmentsToReplicate(LedgerHandle lh)
throws InterruptedException {
@@ -235,7 +326,7 @@ public class TestLedgerFragmentReplicati
private void verifyRecoveredLedgers(LedgerHandle lh, long startEntryId,
long endEntryId) throws BKException, InterruptedException {
LedgerHandle lhs = bkc.openLedgerNoRecovery(lh.getId(),
- BookKeeper.DigestType.CRC32, "testpasswd".getBytes());
+ TEST_DIGEST_TYPE, TEST_PSSWD);
Enumeration<LedgerEntry> entries = lhs.readEntries(startEntryId,
endEntryId);
assertTrue("Should have the elements", entries.hasMoreElements());
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java?rev=1394542&r1=1394541&r2=1394542&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java Fri Oct 5 14:24:42 2012
@@ -69,6 +69,7 @@ public class TestReplicationWorker exten
basePath = baseClientConf.getZkLedgersRootPath() + '/'
+ ZkLedgerUnderreplicationManager.UNDER_REPLICATION_NODE
+ "/ledgers";
+ baseConf.setRereplicationEntryBatchSize(3);
}
@Override