You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/02/17 06:41:22 UTC

[GitHub] sijie closed pull request #1085: [Merge Yahoo repo]: YBK-160: Doing distributed random verification of ledger fragments

sijie closed pull request #1085: [Merge Yahoo repo]: YBK-160: Doing distributed random verification of ledger fragments
URL: https://github.com/apache/bookkeeper/pull/1085
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
index 8c857d843..049492bab 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
@@ -25,9 +25,12 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -35,6 +38,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * A utility class to check the complete ledger and finds the UnderReplicated fragments if any.
  *
@@ -143,7 +147,8 @@ public LedgerChecker(BookKeeper bkc) {
      * @throws InvalidFragmentException
      */
     private void verifyLedgerFragment(LedgerFragment fragment,
-                                      GenericCallback<LedgerFragment> cb)
+                                      GenericCallback<LedgerFragment> cb,
+                                      Long percentageOfLedgerFragmentToBeVerified)
             throws InvalidFragmentException, BKException {
         Set<Integer> bookiesToCheck = fragment.getBookiesIndexes();
         if (bookiesToCheck.isEmpty()) {
@@ -156,7 +161,7 @@ private void verifyLedgerFragment(LedgerFragment fragment,
         for (Integer bookieIndex : bookiesToCheck) {
             LedgerFragmentCallback lfCb = new LedgerFragmentCallback(
                     fragment, bookieIndex, cb, badBookies, numBookies);
-            verifyLedgerFragment(fragment, bookieIndex, lfCb);
+            verifyLedgerFragment(fragment, bookieIndex, lfCb, percentageOfLedgerFragmentToBeVerified);
         }
     }
 
@@ -173,7 +178,8 @@ private void verifyLedgerFragment(LedgerFragment fragment,
      */
     private void verifyLedgerFragment(LedgerFragment fragment,
                                       int bookieIndex,
-                                      GenericCallback<LedgerFragment> cb)
+                                      GenericCallback<LedgerFragment> cb,
+                                      long percentageOfLedgerFragmentToBeVerified)
             throws InvalidFragmentException {
         long firstStored = fragment.getFirstStoredEntryId(bookieIndex);
         long lastStored = fragment.getLastStoredEntryId(bookieIndex);
@@ -195,10 +201,48 @@ private void verifyLedgerFragment(LedgerFragment fragment,
             bookieClient.readEntry(bookie, fragment
                     .getLedgerId(), firstStored, manycb, null);
         } else {
-            ReadManyEntriesCallback manycb = new ReadManyEntriesCallback(2,
+            if (lastStored <= firstStored) {
+                cb.operationComplete(Code.IncorrectParameterException, null);
+                return;
+            }
+
+            long lengthOfLedgerFragment = lastStored - firstStored + 1;
+
+            int numberOfEntriesToBeVerified =
+                (int) (lengthOfLedgerFragment * (percentageOfLedgerFragmentToBeVerified / 100.0));
+
+            TreeSet<Long> entriesToBeVerified = new TreeSet<Long>();
+
+            if (numberOfEntriesToBeVerified < lengthOfLedgerFragment) {
+                // Evenly pick random entries over the length of the fragment
+                if (numberOfEntriesToBeVerified > 0) {
+                    int lengthOfBucket = (int) (lengthOfLedgerFragment / numberOfEntriesToBeVerified);
+                    for (long index = firstStored;
+                         index < (lastStored - lengthOfBucket - 1);
+                         index += lengthOfBucket) {
+                        long potentialEntryId = ThreadLocalRandom.current().nextInt((lengthOfBucket)) + index;
+                        if (fragment.isStoredEntryId(potentialEntryId, bookieIndex)) {
+                            entriesToBeVerified.add(potentialEntryId);
+                        }
+                    }
+                }
+                entriesToBeVerified.add(firstStored);
+                entriesToBeVerified.add(lastStored);
+            } else {
+                // Verify the entire fragment
+                while (firstStored <= lastStored) {
+                    if (fragment.isStoredEntryId(firstStored, bookieIndex)) {
+                        entriesToBeVerified.add(firstStored);
+                    }
+                    firstStored++;
+                }
+            }
+            ReadManyEntriesCallback manycb = new ReadManyEntriesCallback(entriesToBeVerified.size(),
                     fragment, cb);
-            bookieClient.readEntry(bookie, fragment.getLedgerId(), firstStored, manycb, null);
-            bookieClient.readEntry(bookie, fragment.getLedgerId(), lastStored, manycb, null);
+
+            for (Long entryID: entriesToBeVerified) {
+                bookieClient.readEntry(bookie, fragment.getLedgerId(), entryID, manycb, null);
+            }
         }
     }
 
@@ -267,6 +311,12 @@ public void operationComplete(int rc, LedgerFragment result) {
      */
     public void checkLedger(final LedgerHandle lh,
                             final GenericCallback<Set<LedgerFragment>> cb) {
+        checkLedger(lh, cb, 0L);
+    }
+
+    public void checkLedger(final LedgerHandle lh,
+                            final GenericCallback<Set<LedgerFragment>> cb,
+                            long percentageOfLedgerFragmentToBeVerified) {
         // build a set of all fragment replicas
         final Set<LedgerFragment> fragments = new HashSet<LedgerFragment>();
 
@@ -286,9 +336,6 @@ public void checkLedger(final LedgerHandle lh,
             curEnsemble = e.getValue();
         }
 
-
-
-
         /* Checking the last segment of the ledger can be complicated in some cases.
          * In the case that the ledger is closed, we can just check the fragments of
          * the segment as normal even if no data has ever been written to.
@@ -327,7 +374,8 @@ public void operationComplete(int rc, Boolean result) {
                                                       if (result) {
                                                           fragments.add(lastLedgerFragment);
                                                       }
-                                                      checkFragments(fragments, cb);
+                                                      checkFragments(fragments, cb,
+                                                          percentageOfLedgerFragmentToBeVerified);
                                                   }
                                               });
 
@@ -342,12 +390,12 @@ public void operationComplete(int rc, Boolean result) {
                 fragments.add(lastLedgerFragment);
             }
         }
-
-        checkFragments(fragments, cb);
+        checkFragments(fragments, cb, percentageOfLedgerFragmentToBeVerified);
     }
 
     private void checkFragments(Set<LedgerFragment> fragments,
-                                GenericCallback<Set<LedgerFragment>> cb) {
+                                GenericCallback<Set<LedgerFragment>> cb,
+                                long percentageOfLedgerFragmentToBeVerified) {
         if (fragments.size() == 0) { // no fragments to verify
             cb.operationComplete(BKException.Code.OK, fragments);
             return;
@@ -359,7 +407,7 @@ private void checkFragments(Set<LedgerFragment> fragments,
         for (LedgerFragment r : fragments) {
             LOG.debug("Checking fragment {}", r);
             try {
-                verifyLedgerFragment(r, allFragmentsCb);
+                verifyLedgerFragment(r, allFragmentsCb, percentageOfLedgerFragmentToBeVerified);
             } catch (InvalidFragmentException ife) {
                 LOG.error("Invalid fragment found : {}", r);
                 allFragmentsCb.operationComplete(
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
index f7cdf73d5..1a97af846 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
@@ -201,6 +201,10 @@ public Long getLastStoredEntryId(int bookieIndex) {
         return LedgerHandle.INVALID_ENTRY_ID;
     }
 
+    public boolean isStoredEntryId(long entryId, int bookieIndex) {
+        return schedule.hasEntry(entryId, bookieIndex);
+    }
+
     /**
      * Gets the ensemble of fragment.
      *
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 3a7c763de..8777d445e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -116,6 +116,7 @@
     // Replication parameters
     protected static final String AUDITOR_PERIODIC_CHECK_INTERVAL = "auditorPeriodicCheckInterval";
     protected static final String AUDITOR_PERIODIC_BOOKIE_CHECK_INTERVAL = "auditorPeriodicBookieCheckInterval";
+    protected static final String AUDITOR_LEDGER_VERIFICATION_PERCENTAGE = "auditorLedgerVerificationPercentage";
     protected static final String AUTO_RECOVERY_DAEMON_ENABLED = "autoRecoveryDaemonEnabled";
     protected static final String LOST_BOOKIE_RECOVERY_DELAY = "lostBookieRecoveryDelay";
     protected static final String RW_REREPLICATE_BACKOFF_MS = "rwRereplicateBackoffMs";
@@ -1804,6 +1805,28 @@ public long getAuditorPeriodicBookieCheckInterval() {
         return getLong(AUDITOR_PERIODIC_BOOKIE_CHECK_INTERVAL, 86400);
     }
 
+    /**
+     * Set what percentage of a ledger (fragment)'s entries will be verified.
+     * 0 - only the first and last entry of each ledger fragment would be verified
+     * 100 - the entire ledger fragment would be verified
+     * anything else - randomly picked entries from over the fragment would be verified
+     * @param auditorLedgerVerificationPercentage The verification proportion as a percentage
+     * @return ServerConfiguration
+     */
+    public ServerConfiguration setAuditorLedgerVerificationPercentage(long auditorLedgerVerificationPercentage) {
+        setProperty(AUDITOR_LEDGER_VERIFICATION_PERCENTAGE, auditorLedgerVerificationPercentage);
+        return this;
+    }
+
+    /**
+     * Get what percentage of a ledger (fragment)'s entries will be verified.
+     * @see #setAuditorLedgerVerificationPercentage(long)
+     * @return percentage of a ledger (fragment)'s entries that will be verified. Default is 0.
+     */
+    public long getAuditorLedgerVerificationPercentage() {
+        return getLong(AUDITOR_LEDGER_VERIFICATION_PERCENTAGE, 0);
+    }
+
     /**
      * Sets that whether the auto-recovery service can start along with Bookie
      * server itself or not.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index 188ce71b2..a1d9ea054 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -638,7 +638,9 @@ public void process(final Long ledgerId,
                     LedgerHandle lh = null;
                     try {
                         lh = admin.openLedgerNoRecovery(ledgerId);
-                        checker.checkLedger(lh, new ProcessLostFragmentsCb(lh, callback));
+                        checker.checkLedger(lh,
+                            new ProcessLostFragmentsCb(lh, callback),
+                            conf.getAuditorLedgerVerificationPercentage());
                         // we collect the following stats to get a measure of the
                         // distribution of a single ledger within the bk cluster
                         // the higher the number of fragments/bookies, the more distributed it is
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
index cd9bce470..65c0c2ce2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
@@ -235,7 +235,9 @@ private boolean rereplicate(long ledgerIdToReplicate) throws InterruptedExceptio
         boolean deferLedgerLockRelease = false;
 
         try (LedgerHandle lh = admin.openLedgerNoRecovery(ledgerIdToReplicate)) {
-            Set<LedgerFragment> fragments = getUnderreplicatedFragments(lh);
+            Set<LedgerFragment> fragments =
+                getUnderreplicatedFragments(lh, conf.getAuditorLedgerVerificationPercentage());
+
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Founds fragments {} for replication from ledger: {}", fragments, ledgerIdToReplicate);
             }
@@ -263,7 +265,7 @@ private boolean rereplicate(long ledgerIdToReplicate) throws InterruptedExceptio
                 return false;
             }
 
-            fragments = getUnderreplicatedFragments(lh);
+            fragments = getUnderreplicatedFragments(lh, conf.getAuditorLedgerVerificationPercentage());
             if (fragments.size() == 0) {
                 LOG.info("Ledger replicated successfully. ledger id is: " + ledgerIdToReplicate);
                 underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
@@ -346,10 +348,10 @@ private boolean isLastSegmentOpenAndMissingBookies(LedgerHandle lh) throws BKExc
     /**
      * Gets the under replicated fragments.
      */
-    private Set<LedgerFragment> getUnderreplicatedFragments(LedgerHandle lh)
+    private Set<LedgerFragment> getUnderreplicatedFragments(LedgerHandle lh, Long ledgerVerificationPercentage)
             throws InterruptedException {
         CheckerCallback checkerCb = new CheckerCallback();
-        ledgerChecker.checkLedger(lh, checkerCb);
+        ledgerChecker.checkLedger(lh, checkerCb, ledgerVerificationPercentage);
         Set<LedgerFragment> fragments = checkerCb.waitAndGetResult();
         return fragments;
     }
@@ -371,7 +373,8 @@ public void run() {
                         lh = admin.openLedger(ledgerId);
                     }
 
-                    Set<LedgerFragment> fragments = getUnderreplicatedFragments(lh);
+                    Set<LedgerFragment> fragments =
+                        getUnderreplicatedFragments(lh, conf.getAuditorLedgerVerificationPercentage());
                     for (LedgerFragment fragment : fragments) {
                         if (!fragment.isClosed()) {
                             lh = admin.openLedger(ledgerId);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
index 83bfd502e..25dfaf411 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
@@ -140,10 +140,10 @@ public void run() {
         final CountDownLatch checklatch = new CountDownLatch(1);
         final AtomicInteger numFragments = new AtomicInteger(-1);
         lc.checkLedger(lh2, new GenericCallback<Set<LedgerFragment>>() {
-                public void operationComplete(int rc, Set<LedgerFragment> fragments) {
-                    LOG.debug("Checked ledgers returned {} {}", rc, fragments);
+                public void operationComplete(int rc, Set<LedgerFragment> badFragments) {
+                    LOG.debug("Checked ledgers returned {} {}", rc, badFragments);
                     if (rc == BKException.Code.OK) {
-                        numFragments.set(fragments.size());
+                        numFragments.set(badFragments.size());
                     }
                     checklatch.countDown();
                 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java
index 1dc2bb29f..20308e6f4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java
@@ -22,11 +22,12 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
@@ -91,14 +92,10 @@ public void testChecker() throws Exception {
         for (LedgerFragment r : result) {
             LOG.info("unreplicated fragment: {}", r);
         }
+
         assertEquals("Should have one missing fragment", 1, result.size());
-        assertEquals("There should be 1 fragments. But returned fragments are "
-            + result, 1, result.size());
-        LedgerFragment lf = result.iterator().next();
-        assertEquals("There should be 1 failed bookies in first fragment " + lf,
-            1, lf.getBookiesIndexes().size());
-        assertEquals("Fragment should be missing from first replica",
-            lf.getAddress(0), replicaToKill);
+        assertTrue("Fragment should be missing from first replica",
+            result.iterator().next().getAddresses().contains(replicaToKill));
 
         BookieSocketAddress replicaToKill2 = lh.getLedgerMetadata()
                 .getEnsembles().get(0L).get(1);
@@ -110,16 +107,10 @@ public void testChecker() throws Exception {
         for (LedgerFragment r : result) {
             LOG.info("unreplicated fragment: {}", r);
         }
-        assertEquals("Should have two missing fragments", 2, result.size());
-        for (LedgerFragment fragment : result) {
-            if (fragment.getFirstEntryId() == 0L) {
-                assertEquals("There should be 2 failed bookies in first fragment",
-                    2, fragment.getBookiesIndexes().size());
-            } else {
-                assertEquals("There should be 1 failed bookies in second fragment",
-                    1, fragment.getBookiesIndexes().size());
-            }
-        }
+
+        AtomicInteger number = new AtomicInteger();
+        result.forEach(ledgerFragment -> number.addAndGet(ledgerFragment.getAddresses().size()));
+        assertEquals("Should have three missing fragments", 3, number.get());
     }
 
     /**


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services