You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/05 16:24:43 UTC

svn commit: r1394542 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ bookk...

Author: ivank
Date: Fri Oct  5 14:24:42 2012
New Revision: 1394542

URL: http://svn.apache.org/viewvc?rev=1394542&view=rev
Log:
BOOKKEEPER-315: Ledger entries should be replicated sequentially instead of parallel. (umamahesh via ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1394542&r1=1394541&r2=1394542&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Oct  5 14:24:42 2012
@@ -150,6 +150,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-319: Manage auditing and replication processes (Vinay via ivank)
 
+        BOOKKEEPER-315: Ledger entries should be replicated sequentially instead of parallel. (umamahesh via ivank)
+
       hedwig-server:
 
         BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java?rev=1394542&r1=1394541&r2=1394542&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java Fri Oct  5 14:24:42 2012
@@ -44,9 +44,9 @@ import org.apache.zookeeper.AsyncCallbac
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZKUtil;
-import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooDefs.Ids;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -633,7 +633,7 @@ public class BookKeeperAdmin {
      */
     private void asyncRecoverLedgerFragment(final LedgerHandle lh,
             final LedgerFragment ledgerFragment,
-            final LedgerFragmentReplicator.SingleFragmentCallback ledgerFragmentMcb,
+            final AsyncCallback.VoidCallback ledgerFragmentMcb,
             final InetSocketAddress newBookie) throws InterruptedException {
         lfr.replicate(lh, ledgerFragment, ledgerFragmentMcb, newBookie);
     }
@@ -650,15 +650,15 @@ public class BookKeeperAdmin {
      */
     public void replicateLedgerFragment(LedgerHandle lh,
             final LedgerFragment ledgerFragment,
-            InetSocketAddress targetBookieAddress) throws InterruptedException,
-            BKException {
-        final SyncCounter syncCounter = new SyncCounter();
+            final InetSocketAddress targetBookieAddress)
+            throws InterruptedException, BKException {
+        SyncCounter syncCounter = new SyncCounter();
         ResultCallBack resultCallBack = new ResultCallBack(syncCounter);
-        SingleFragmentCallback sfcb = new SingleFragmentCallback(
-                resultCallBack, lh, ledgerFragment.getFirstStoredEntryId(),
-                ledgerFragment.getAddress(), targetBookieAddress);
+        SingleFragmentCallback cb = new SingleFragmentCallback(resultCallBack,
+                lh, ledgerFragment.getFirstEntryId(), ledgerFragment
+                        .getAddress(), targetBookieAddress);
         syncCounter.inc();
-        lfr.replicate(lh, ledgerFragment, sfcb, targetBookieAddress);
+        asyncRecoverLedgerFragment(lh, ledgerFragment, cb, targetBookieAddress);
         syncCounter.block(0);
         if (syncCounter.getrc() != BKException.Code.OK) {
             throw BKException.create(syncCounter.getrc());

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java?rev=1394542&r1=1394541&r2=1394542&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java Fri Oct  5 14:24:42 2012
@@ -85,6 +85,13 @@ public class LedgerFragment {
     public InetSocketAddress getAddress() {
         return ensemble.get(bookieIndex);
     }
+    
+    /**
+     * Gets the index of the failed bookie in the ensemble
+     */
+    public int getBookiesIndex() {
+        return bookieIndex;
+    }
 
     /**
      * Gets the first stored entry id of the fragment in failed bookie.

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java?rev=1394542&r1=1394541&r2=1394542&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java Fri Oct  5 14:24:42 2012
@@ -22,8 +22,11 @@ package org.apache.bookkeeper.client;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.proto.BookieProtocol;
@@ -54,25 +57,10 @@ public class LedgerFragmentReplicator {
     private static Logger LOG = LoggerFactory
             .getLogger(LedgerFragmentReplicator.class);
 
-    /**
-     * This method replicate a ledger fragment which is a contiguous portion of
-     * a ledger that was stored in an ensemble that included the failed bookie.
-     * 
-     * @param lh
-     *            LedgerHandle for the ledger
-     * @param lf
-     *            LedgerFragment to replicate
-     * @param ledgerFragmentMcb
-     *            MultiCallback to invoke once we've recovered the current
-     *            ledger fragment.
-     * @param newBookie
-     *            New bookie we want to use to recover and replicate the ledger
-     *            entries that were stored on the failed bookie.
-     */
-    void replicate(final LedgerHandle lh, final LedgerFragment lf,
-                   final SingleFragmentCallback ledgerFragmentMcb,
-                   final InetSocketAddress newBookie)
-            throws InterruptedException {
+    private void replicateFragmentInternal(final LedgerHandle lh,
+            final LedgerFragment lf,
+            final AsyncCallback.VoidCallback ledgerFragmentMcb,
+            final InetSocketAddress newBookie) throws InterruptedException {
         if (!lf.isClosed()) {
             LOG.error("Trying to replicate an unclosed fragment;"
                       + " This is not safe {}", lf);
@@ -124,6 +112,114 @@ public class LedgerFragmentReplicator {
     }
 
     /**
+     * This method replicates a ledger fragment, which is a contiguous portion
+     * of a ledger that was stored in an ensemble that included the failed
+     * bookie. It splits the fragment into multiple sub-fragments, each holding
+     * at most the configured rereplicationEntryBatchSize entries, and then
+     * re-replicates those batched sub-fragments one by one. After all batched
+     * sub-fragments have been re-replicated, it updates the ensemble info
+     * with the new bookie once.
+     * 
+     * @param lh
+     *            LedgerHandle for the ledger
+     * @param lf
+     *            LedgerFragment to replicate
+     * @param ledgerFragmentMcb
+     *            MultiCallback to invoke once we've recovered the current
+     *            ledger fragment.
+     * @param targetBookieAddress
+     *            New bookie we want to use to recover and replicate the ledger
+     *            entries that were stored on the failed bookie.
+     */
+    void replicate(final LedgerHandle lh, final LedgerFragment lf,
+            final AsyncCallback.VoidCallback ledgerFragmentMcb,
+            final InetSocketAddress targetBookieAddress)
+            throws InterruptedException {
+        Set<LedgerFragment> partionedFragments = splitIntoSubFragments(lh, lf,
+                bkc.getConf().getRereplicationEntryBatchSize());
+        LOG.info("Fragment :" + lf + " is split into sub fragments :"
+                + partionedFragments);
+        replicateNextBatch(lh, partionedFragments.iterator(),
+                ledgerFragmentMcb, targetBookieAddress);
+    }
+
+    /** Replicate the batched entry fragments one after another */
+    private void replicateNextBatch(final LedgerHandle lh,
+            final Iterator<LedgerFragment> fragments,
+            final AsyncCallback.VoidCallback ledgerFragmentMcb,
+            final InetSocketAddress targetBookieAddress) {
+        if (fragments.hasNext()) {
+            try {
+                replicateFragmentInternal(lh, fragments.next(),
+                        new AsyncCallback.VoidCallback() {
+                            @Override
+                            public void processResult(int rc, String v, Object ctx) {
+                                if (rc != BKException.Code.OK) {
+                                    ledgerFragmentMcb.processResult(rc, null,
+                                            null);
+                                } else {
+                                    replicateNextBatch(lh, fragments,
+                                            ledgerFragmentMcb,
+                                            targetBookieAddress);
+                                }
+                            }
+
+                        }, targetBookieAddress);
+            } catch (InterruptedException e) {
+                ledgerFragmentMcb.processResult(
+                        BKException.Code.InterruptedException, null, null);
+                Thread.currentThread().interrupt();
+            }
+        } else {
+            ledgerFragmentMcb.processResult(BKException.Code.OK, null, null);
+        }
+    }
+
+    /**
+     * Splits the full fragment into batched entry fragments, keeping at most
+     * rereplicationEntryBatchSize entries in each one, so that they can be
+     * treated as sub-fragments.
+     */
+    static Set<LedgerFragment> splitIntoSubFragments(LedgerHandle lh,
+            LedgerFragment ledgerFragment, long rereplicationEntryBatchSize) {
+        Set<LedgerFragment> fragments = new HashSet<LedgerFragment>();
+        if (rereplicationEntryBatchSize <= 0) {
+            // rereplicationEntryBatchSize can not be 0 or less than 0,
+            // returning with the current fragment
+            fragments.add(ledgerFragment);
+            return fragments;
+        }
+
+        long firstEntryId = ledgerFragment.getFirstStoredEntryId();
+        long lastEntryId = ledgerFragment.getLastStoredEntryId();
+        long numberOfEntriesToReplicate = (lastEntryId - firstEntryId) + 1;
+        long splitsWithFullEntries = numberOfEntriesToReplicate
+                / rereplicationEntryBatchSize;
+
+        if (splitsWithFullEntries == 0) {// only one fragment
+            fragments.add(ledgerFragment);
+            return fragments;
+        }
+
+        long fragmentSplitLastEntry = 0;
+        for (int i = 0; i < splitsWithFullEntries; i++) {
+            fragmentSplitLastEntry = (firstEntryId + rereplicationEntryBatchSize) - 1;
+            fragments.add(new LedgerFragment(lh, firstEntryId,
+                    fragmentSplitLastEntry, ledgerFragment.getBookiesIndex()));
+            firstEntryId = fragmentSplitLastEntry + 1;
+        }
+
+        long lastSplitWithPartialEntries = numberOfEntriesToReplicate
+                % rereplicationEntryBatchSize;
+        if (lastSplitWithPartialEntries > 0) {
+            fragments.add(new LedgerFragment(lh, firstEntryId, firstEntryId
+                    + lastSplitWithPartialEntries - 1, ledgerFragment
+                    .getBookiesIndex()));
+        }
+        return fragments;
+    }
+
+    /**
      * This method asynchronously recovers a specific ledger entry by reading
      * the values via the BookKeeper Client (which would read it from the other
      * replicas) and then writing it to the chosen new bookie.
@@ -233,77 +329,94 @@ public class LedgerFragmentReplicator {
                 ledgerFragmentsMcb.processResult(rc, null, null);
                 return;
             }
-            writeLedgerConfig();
+            updateEnsembleInfo(ledgerFragmentsMcb, fragmentStartId, lh,
+                                        oldBookie, newBookie);
         }
+    }
 
-        protected void writeLedgerConfig() {
-            /*
-             * Update the ledger metadata's ensemble info to point to the new
-             * bookie.
-             */
-            ArrayList<InetSocketAddress> ensemble = lh.getLedgerMetadata()
-                    .getEnsembles().get(fragmentStartId);
-            int deadBookieIndex = ensemble.indexOf(oldBookie);
-            ensemble.remove(deadBookieIndex);
-            ensemble.add(deadBookieIndex, newBookie);
-            lh.writeLedgerConfig(new WriteCb());
+    /** Updates the ensemble with newBookie and notifies ensembleUpdatedCb */
+    private static void updateEnsembleInfo(
+            AsyncCallback.VoidCallback ensembleUpdatedCb, long fragmentStartId,
+            LedgerHandle lh, InetSocketAddress oldBookie,
+            InetSocketAddress newBookie) {
+        /*
+         * Update the ledger metadata's ensemble info to point to the new
+         * bookie.
+         */
+        ArrayList<InetSocketAddress> ensemble = lh.getLedgerMetadata()
+                .getEnsembles().get(fragmentStartId);
+        int deadBookieIndex = ensemble.indexOf(oldBookie);
+        ensemble.remove(deadBookieIndex);
+        ensemble.add(deadBookieIndex, newBookie);
+        lh.writeLedgerConfig(new UpdateEnsembleCb(ensembleUpdatedCb,
+                fragmentStartId, lh, oldBookie, newBookie));
+    }
+
+    /**
+     * Updates the ensemble data with newBookie. Re-reads the metadata on
+     * MetadataVersionException and updates the ensemble again. The final
+     * result code is then passed up to the parent callback.
+     */
+    private static class UpdateEnsembleCb implements GenericCallback<Void> {
+        final AsyncCallback.VoidCallback ensembleUpdatedCb;
+        final LedgerHandle lh;
+        final long fragmentStartId;
+        final InetSocketAddress oldBookie;
+        final InetSocketAddress newBookie;
+
+        public UpdateEnsembleCb(AsyncCallback.VoidCallback ledgerFragmentsMcb,
+                long fragmentStartId, LedgerHandle lh,
+                InetSocketAddress oldBookie, InetSocketAddress newBookie) {
+            this.ensembleUpdatedCb = ledgerFragmentsMcb;
+            this.lh = lh;
+            this.fragmentStartId = fragmentStartId;
+            this.newBookie = newBookie;
+            this.oldBookie = oldBookie;
         }
 
-        private class WriteCb implements GenericCallback<Void> {
-            @Override
-            public void operationComplete(int rc, Void result) {
-                if (rc == BKException.Code.MetadataVersionException) {
-                    LOG
-                            .warn("Two fragments attempted update at once; ledger id: "
-                                    + lh.getId()
-                                    + " startid: "
-                                    + fragmentStartId);
-                    // try again, the previous success (with which this has
-                    // conflicted)
-                    // will have updated the stat
-                    // other operations such as (addEnsemble) would update it
-                    // too.
-                    lh.rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(lh.bk.mainWorkerPool,
-                                                                                     lh.getId()) {
-                        @Override
-                        public void safeOperationComplete(int rc,
-                                LedgerMetadata newMeta) {
-                            if (rc != BKException.Code.OK) {
-                                LOG
-                                        .error("Error reading updated ledger metadata for ledger "
-                                                + lh.getId());
-                                ledgerFragmentsMcb
-                                        .processResult(rc, null, null);
-                            } else {
-                                lh.metadata = newMeta;
-                                writeLedgerConfig();
+        @Override
+        public void operationComplete(int rc, Void result) {
+            if (rc == BKException.Code.MetadataVersionException) {
+                LOG.warn("Two fragments attempted update at once; ledger id: "
+                        + lh.getId() + " startid: " + fragmentStartId);
+                // try again, the previous success (with which this has
+                // conflicted) will have updated the stat; other operations
+                // such as (addEnsemble) would update it too.
+                lh
+                        .rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(
+                                lh.bk.mainWorkerPool, lh.getId()) {
+                            @Override
+                            public void safeOperationComplete(int rc,
+                                    LedgerMetadata newMeta) {
+                                if (rc != BKException.Code.OK) {
+                                    LOG
+                                            .error("Error reading updated ledger metadata for ledger "
+                                                    + lh.getId());
+                                    ensembleUpdatedCb.processResult(rc, null,
+                                            null);
+                                } else {
+                                    lh.metadata = newMeta;
+                                    updateEnsembleInfo(ensembleUpdatedCb,
+                                            fragmentStartId, lh, oldBookie,
+                                            newBookie);
+                                }
                             }
-                        }
-                    });
-                    return;
-                } else if (rc != BKException.Code.OK) {
-                    LOG
-                            .error("Error updating ledger config metadata for ledgerId "
-                                    + lh.getId()
-                                    + " : "
-                                    + BKException.getMessage(rc));
-                } else {
-                    LOG
-                            .info("Updated ZK for ledgerId: ("
-                                    + lh.getId()
-                                    + " : "
-                                    + fragmentStartId
-                                    + ") to point ledger fragments from old dead bookie: ("
-                                    + oldBookie + ") to new bookie: ("
-                                    + newBookie + ")");
-                }
-                /*
-                 * Pass the return code result up the chain with the parent
-                 * callback.
-                 */
-                ledgerFragmentsMcb.processResult(rc, null, null);
+                        });
+                return;
+            } else if (rc != BKException.Code.OK) {
+                LOG.error("Error updating ledger config metadata for ledgerId "
+                        + lh.getId() + " : " + BKException.getMessage(rc));
+            } else {
+                LOG.info("Updated ZK for ledgerId: (" + lh.getId() + " : "
+                        + fragmentStartId
+                        + ") to point ledger fragments from old dead bookie: ("
+                        + oldBookie + ") to new bookie: (" + newBookie + ")");
             }
-        };
+            /*
+             * Pass the return code result up the chain with the parent
+             * callback.
+             */
+            ensembleUpdatedCb.processResult(rc, null, null);
+        }
     }
-
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java?rev=1394542&r1=1394541&r2=1394542&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java Fri Oct  5 14:24:42 2012
@@ -51,6 +51,8 @@ public abstract class AbstractConfigurat
     protected final static String LEDGER_MANAGER_FACTORY_CLASS = "ledgerManagerFactoryClass";
     protected final static String ZK_LEDGERS_ROOT_PATH = "zkLedgersRootPath";
     protected final static String AVAILABLE_NODE = "available";
+    protected final static String REREPLICATION_ENTRY_BATCH_SIZE = "rereplicationEntryBatchSize";
+
     protected AbstractConfiguration() {
         super();
         // add configuration for system properties
@@ -174,4 +176,22 @@ public abstract class AbstractConfigurat
     public String getZkAvailableBookiesPath() {
         return getZkLedgersRootPath() + "/" + AVAILABLE_NODE;
     }
+    
+    /**
+     * Set the max entries to keep in a fragment for re-replication. If a
+     * fragment has more entries than this count, the original fragment will
+     * be split into multiple small logical fragments, each holding at most
+     * rereplicationEntryBatchSize entries, so that re-replication happens in
+     * batches.
+     */
+    public void setRereplicationEntryBatchSize(long rereplicationEntryBatchSize) {
+        setProperty(REREPLICATION_ENTRY_BATCH_SIZE, rereplicationEntryBatchSize);
+    }
+
+    /**
+     * Get the re-replication entry batch size
+     */
+    public long getRereplicationEntryBatchSize() {
+        return getLong(REREPLICATION_ENTRY_BATCH_SIZE, 10);
+    }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java?rev=1394542&r1=1394541&r2=1394542&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java Fri Oct  5 14:24:42 2012
@@ -36,7 +36,6 @@ import org.apache.bookkeeper.client.Ledg
 import org.apache.bookkeeper.client.BKException.BKBookieHandleNotAvailableException;
 import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
 import org.apache.bookkeeper.client.BKException.BKReadException;
-import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
@@ -57,7 +56,7 @@ public class ReplicationWorker implement
     private static Logger LOG = LoggerFactory
             .getLogger(ReplicationWorker.class);
     final private LedgerUnderreplicationManager underreplicationManager;
-    private AbstractConfiguration conf;
+    private ServerConfiguration conf;
     private ZooKeeper zkc;
     private volatile boolean workerRunning = false;
     final private BookKeeperAdmin admin;

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java?rev=1394542&r1=1394541&r2=1394542&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java Fri Oct  5 14:24:42 2012
@@ -28,6 +28,7 @@ import java.util.SortedMap;
 import java.util.Map.Entry;
 import java.util.concurrent.CountDownLatch;
 
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.junit.Test;
@@ -40,6 +41,8 @@ import org.slf4j.LoggerFactory;
  */
 public class TestLedgerFragmentReplication extends BookKeeperClusterTestCase {
 
+    private static final byte[] TEST_PSSWD = "testpasswd".getBytes();
+    private static final DigestType TEST_DIGEST_TYPE = BookKeeper.DigestType.CRC32;
     private static Logger LOG = LoggerFactory
             .getLogger(TestLedgerFragmentReplication.class);
 
@@ -72,8 +75,8 @@ public class TestLedgerFragmentReplicati
     public void testReplicateLFShouldCopyFailedBookieFragmentsToTargetBookie()
             throws Exception {
         byte[] data = "TestLedgerFragmentReplication".getBytes();
-        LedgerHandle lh = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32,
-                "testpasswd".getBytes());
+        LedgerHandle lh = bkc.createLedger(3, 3, TEST_DIGEST_TYPE,
+                TEST_PSSWD);
 
         for (int i = 0; i < 10; i++) {
             lh.addEntry(data);
@@ -129,8 +132,8 @@ public class TestLedgerFragmentReplicati
     public void testReplicateLFFailsOnlyOnLastUnClosedFragments()
             throws Exception {
         byte[] data = "TestLedgerFragmentReplication".getBytes();
-        LedgerHandle lh = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32,
-                "testpasswd".getBytes());
+        LedgerHandle lh = bkc.createLedger(3, 3, TEST_DIGEST_TYPE,
+                TEST_PSSWD);
 
         for (int i = 0; i < 10; i++) {
             lh.addEntry(data);
@@ -187,8 +190,8 @@ public class TestLedgerFragmentReplicati
     public void testReplicateLFShouldReturnFalseIfTheReplicationFails()
             throws Exception {
         byte[] data = "TestLedgerFragmentReplication".getBytes();
-        LedgerHandle lh = bkc.createLedger(2, 1, BookKeeper.DigestType.CRC32,
-                "testpasswd".getBytes());
+        LedgerHandle lh = bkc.createLedger(2, 1, TEST_DIGEST_TYPE,
+                TEST_PSSWD);
 
         for (int i = 0; i < 10; i++) {
             lh.addEntry(data);
@@ -222,6 +225,94 @@ public class TestLedgerFragmentReplicati
             }
         }
     }
+    
+    /**
+     * Tests that splitIntoSubFragments splits the passed original fragment
+     * into sub-fragments at the correct boundaries
+     */
+    @Test(timeout = 30000)
+    public void testSplitIntoSubFragmentsWithDifferentFragmentBoundaries()
+            throws Exception {
+        LedgerMetadata metadata = new LedgerMetadata(3, 3, 3, TEST_DIGEST_TYPE,
+                TEST_PSSWD) {
+            @Override
+            ArrayList<InetSocketAddress> getEnsemble(long entryId) {
+                return null;
+            }
+
+            @Override
+            public boolean isClosed() {
+                return true;
+            }
+        };
+        LedgerHandle lh = new LedgerHandle(bkc, 0, metadata, TEST_DIGEST_TYPE,
+                TEST_PSSWD);
+        testSplitIntoSubFragments(10, 21, -1, 1, lh);
+        testSplitIntoSubFragments(10, 21, 20, 1, lh);
+        testSplitIntoSubFragments(0, 0, 10, 1, lh);
+        testSplitIntoSubFragments(0, 1, 1, 2, lh);
+        testSplitIntoSubFragments(20, 24, 2, 3, lh);
+        testSplitIntoSubFragments(21, 32, 3, 4, lh);
+        testSplitIntoSubFragments(22, 103, 11, 8, lh);
+        testSplitIntoSubFragments(49, 51, 1, 3, lh);
+        testSplitIntoSubFragments(11, 101, 3, 31, lh);
+    }
+
+    /** assert the sub-fragment boundaries */
+    void testSplitIntoSubFragments(final long oriFragmentFirstEntry,
+            final long oriFragmentLastEntry, long entriesPerSubFragment,
+            long expectedSubFragments, LedgerHandle lh) {
+        LedgerFragment fr = new LedgerFragment(lh, oriFragmentFirstEntry,
+                oriFragmentLastEntry, 0) {
+            @Override
+            public long getLastStoredEntryId() {
+                return oriFragmentLastEntry;
+            }
+
+            @Override
+            public long getFirstStoredEntryId() {
+                return oriFragmentFirstEntry;
+            }
+        };
+        Set<LedgerFragment> subFragments = LedgerFragmentReplicator
+                .splitIntoSubFragments(lh, fr, entriesPerSubFragment);
+        assertEquals(expectedSubFragments, subFragments.size());
+        int fullSubFragment = 0;
+        int partialSubFragment = 0;
+        for (LedgerFragment ledgerFragment : subFragments) {
+            if ((ledgerFragment.getLastKnownEntryId()
+                    - ledgerFragment.getFirstEntryId() + 1) == entriesPerSubFragment) {
+                fullSubFragment++;
+            } else {
+                long totalEntriesToReplicate = oriFragmentLastEntry
+                        - oriFragmentFirstEntry + 1;
+                if (entriesPerSubFragment <= 0
+                        || totalEntriesToReplicate / entriesPerSubFragment == 0) {
+                    assertEquals(
+                            "FirstEntryId should be same as original fragment's firstEntryId",
+                            fr.getFirstEntryId(), ledgerFragment
+                                    .getFirstEntryId());
+                    assertEquals(
+                            "LastEntryId should be same as original fragment's lastEntryId",
+                            fr.getLastKnownEntryId(), ledgerFragment
+                                    .getLastKnownEntryId());
+                } else {
+                    long partialSplitEntries = totalEntriesToReplicate
+                            % entriesPerSubFragment;
+                    assertEquals(
+                            "Partial fragment with wrong entry boundaries",
+                            ledgerFragment.getLastKnownEntryId()
+                                    - ledgerFragment.getFirstEntryId() + 1,
+                            partialSplitEntries);
+                }
+                partialSubFragment++;
+            }
+        }
+        assertEquals("Unexpected number of sub fargments", fullSubFragment
+                + partialSubFragment, expectedSubFragments);
+        assertTrue("There should be only one or zero partial sub Fragment",
+                partialSubFragment == 0 || partialSubFragment == 1);
+    }
 
     private Set<LedgerFragment> getFragmentsToReplicate(LedgerHandle lh)
             throws InterruptedException {
@@ -235,7 +326,7 @@ public class TestLedgerFragmentReplicati
     private void verifyRecoveredLedgers(LedgerHandle lh, long startEntryId,
             long endEntryId) throws BKException, InterruptedException {
         LedgerHandle lhs = bkc.openLedgerNoRecovery(lh.getId(),
-                BookKeeper.DigestType.CRC32, "testpasswd".getBytes());
+                TEST_DIGEST_TYPE, TEST_PSSWD);
         Enumeration<LedgerEntry> entries = lhs.readEntries(startEntryId,
                 endEntryId);
         assertTrue("Should have the elements", entries.hasMoreElements());

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java?rev=1394542&r1=1394541&r2=1394542&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java Fri Oct  5 14:24:42 2012
@@ -69,6 +69,7 @@ public class TestReplicationWorker exten
         basePath = baseClientConf.getZkLedgersRootPath() + '/'
                 + ZkLedgerUnderreplicationManager.UNDER_REPLICATION_NODE
                 + "/ledgers";
+        baseConf.setRereplicationEntryBatchSize(3);
     }
 
     @Override