You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/02/18 08:53:49 UTC

[GitHub] sijie closed pull request #1155: Issue #1152: stats for durability violations in write/read path

sijie closed pull request #1155: Issue #1152: stats for durability violations in write/read path
URL: https://github.com/apache/bookkeeper/pull/1155
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of a fork's pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index ca24a99bd..d1ef7f3ff 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -120,6 +120,9 @@
     private OpStatsLogger recoverReadEntriesStats;
 
     private Counter speculativeReadCounter;
+    private Counter readOpDmCounter;
+    private Counter addOpUrCounter;
+
 
     // whether the event loop group is one we created, or is owned by whoever
     // instantiated us
@@ -1444,10 +1447,12 @@ private void initOpLoggers(StatsLogger stats) {
         openOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.OPEN_OP);
         recoverOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.RECOVER_OP);
         readOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.READ_OP);
+        readOpDmCounter = stats.getCounter(BookKeeperClientStats.READ_OP_DM);
         readLacAndEntryOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.READ_LAST_CONFIRMED_AND_ENTRY);
         readLacAndEntryRespLogger = stats.getOpStatsLogger(
                 BookKeeperClientStats.READ_LAST_CONFIRMED_AND_ENTRY_RESPONSE);
         addOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.ADD_OP);
+        addOpUrCounter = stats.getCounter(BookKeeperClientStats.ADD_OP_UR);
         writeLacOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.WRITE_LAC_OP);
         readLacOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.READ_LAC_OP);
         recoverAddEntriesStats = stats.getOpStatsLogger(BookKeeperClientStats.LEDGER_RECOVER_ADD_ENTRIES);
@@ -1492,7 +1497,12 @@ OpStatsLogger getRecoverAddCountLogger() {
     OpStatsLogger getRecoverReadCountLogger() {
         return recoverReadEntriesStats;
     }
-
+    Counter getReadOpDmCounter() { // counter obtained for BookKeeperClientStats.READ_OP_DM in initOpLoggers
+        return readOpDmCounter;
+    }
+    Counter getAddOpUrCounter() { // counter obtained for BookKeeperClientStats.ADD_OP_UR in initOpLoggers
+        return addOpUrCounter;
+    }
     static EventLoopGroup getDefaultEventLoopGroup() {
         ThreadFactory threadFactory = new DefaultThreadFactory("bookkeeper-io");
         final int numThreads = Runtime.getRuntime().availableProcessors() * 2;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index 60058e363..e3d84388d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -40,7 +40,10 @@
     // Data Operations
 
     String ADD_OP = "ADD_ENTRY";
+    String ADD_OP_UR = "ADD_ENTRY_UR"; // Under Replicated during AddEntry.
     String READ_OP = "READ_ENTRY";
+    // Corrupted entry (Digest Mismatch/ Under Replication) detected during ReadEntry
+    String READ_OP_DM = "READ_ENTRY_DM";
     String WRITE_LAC_OP = "WRITE_LAC";
     String READ_LAC_OP = "READ_LAC";
     String READ_LAST_CONFIRMED_AND_ENTRY = "READ_LAST_CONFIRMED_AND_ENTRY";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index e386701d7..8d5d5b11a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -34,6 +34,7 @@
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.SafeRunnable;
@@ -69,6 +70,7 @@
     long timeoutNanos;
 
     OpStatsLogger addOpLogger;
+    Counter addOpUrCounter;
     long currentLedgerLength;
     int pendingWriteRequests;
     boolean callbackTriggered;
@@ -88,6 +90,7 @@ static PendingAddOp create(LedgerHandle lh, ByteBuf payload, AddCallback cb, Obj
         op.completed = false;
         op.ackSet = lh.distributionSchedule.getAckSet();
         op.addOpLogger = lh.bk.getAddOpLogger();
+        op.addOpUrCounter = lh.bk.getAddOpUrCounter();
         op.timeoutNanos = lh.bk.addEntryQuorumTimeoutNanos;
         op.pendingWriteRequests = 0;
         op.callbackTriggered = false;
@@ -256,6 +259,11 @@ public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddre
         }
 
         if (completed) {
+            if (rc != BKException.Code.OK) {
+                // Got an error after satisfying AQ. This means the entry was under-replicated at creation time.
+                // Update the stat to reflect it.
+                addOpUrCounter.inc();
+            }
             // even the add operation is completed, but because we don't reset completed flag back to false when
             // #unsetSuccessAndSendWriteRequest doesn't break ack quorum constraint. we still have current pending
             // add op is completed but never callback. so do a check here to complete again.
@@ -424,6 +432,7 @@ private void recyclePendAddOpObject() {
         lh = null;
         isRecoveryAdd = false;
         addOpLogger = null;
+        addOpUrCounter = null;
         completed = false;
         pendingWriteRequests = 0;
         callbackTriggered = false;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 58ee31dcd..0276ef4d2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -72,6 +72,7 @@
     long endEntryId;
     long requestTimeNanos;
     OpStatsLogger readOpLogger;
+    Counter readOpDmCounter;
     private final Counter speculativeReadCounter;
 
     final int requiredBookiesMissingEntryForRecovery;
@@ -134,6 +135,7 @@ boolean complete(int bookieIndex, BookieSocketAddress host, final ByteBuf buffer
             try {
                 content = lh.macManager.verifyDigestAndReturnData(entryImpl.getEntryId(), buffer);
             } catch (BKDigestMatchException e) {
+                readOpDmCounter.inc();
                 logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", BKException.Code.DigestMatchException);
                 buffer.release();
                 return false;
@@ -478,6 +480,7 @@ boolean complete(int bookieIndex, BookieSocketAddress host, ByteBuf buffer) {
         heardFromHostsBitSet = new BitSet(getLedgerMetadata().getEnsembleSize());
 
         readOpLogger = lh.bk.getReadOpLogger();
+        readOpDmCounter = lh.bk.getReadOpDmCounter();
         speculativeReadCounter = lh.bk.getSpeculativeReadCounter();
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
index bae15b81a..05c42d3cb 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
@@ -20,12 +20,17 @@
  */
 package org.apache.bookkeeper.client;
 
+import static org.apache.bookkeeper.client.BookKeeperClientStats.ADD_OP;
+import static org.apache.bookkeeper.client.BookKeeperClientStats.ADD_OP_UR;
+import static org.apache.bookkeeper.client.BookKeeperClientStats.CLIENT_SCOPE;
+import static org.apache.bookkeeper.client.BookKeeperClientStats.READ_OP_DM;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
+import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Enumeration;
@@ -38,12 +43,16 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BKException.BKLedgerClosedException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.zookeeper.KeeperException;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -136,6 +145,100 @@ public void testWithMultipleBookieFailuresInLastEnsemble() throws Exception {
 
         int i = numEntriesToWrite;
         numEntriesToWrite = numEntriesToWrite + 50;
+        for (; i < numEntriesToWrite; i++) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+
+            entries1.add(entry.array());
+            lh.addEntry(entry.array());
+        }
+        readEntries(lh, entries1);
+        lh.close();
+    }
+
+    /**
+     * Verify write and Read durability stats.
+     */
+    @Test
+    public void testWriteAndReadStats() throws Exception {
+        // Create a ledger
+        lh = bkc.createLedger(3, 3, 2, digestType, ledgerPassword);
+
+        // write-batch-1
+        for (int i = 0; i < numEntriesToWrite; i++) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+
+            entries1.add(entry.array());
+            lh.addEntry(entry.array());
+        }
+        assertTrue(
+                "Stats should have captured a new writes",
+                bkc.getTestStatsProvider().getOpStatsLogger(
+                        CLIENT_SCOPE + "." + ADD_OP)
+                        .getSuccessCount() > 0);
+
+        CountDownLatch sleepLatch1 = new CountDownLatch(1);
+        CountDownLatch sleepLatch2 = new CountDownLatch(1);
+        ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata()
+                .getEnsembles().entrySet().iterator().next().getValue();
+
+        sleepBookie(ensemble.get(0), sleepLatch1);
+
+        int i = numEntriesToWrite;
+        numEntriesToWrite = numEntriesToWrite + 50;
+
+        // write-batch-2
+
+        for (; i < numEntriesToWrite; i++) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+
+            entries1.add(entry.array());
+            lh.addEntry(entry.array());
+        }
+
+        // Let the second bookie go to sleep. This forces write timeout and ensemble change
+        // Which will be enough time to receive delayed write failures on the write-batch-2
+
+        sleepBookie(ensemble.get(1), sleepLatch2);
+        i = numEntriesToWrite;
+        numEntriesToWrite = numEntriesToWrite + 50;
+
+        // write-batch-3
+
+        for (; i < numEntriesToWrite; i++) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+
+            entries1.add(entry.array());
+            lh.addEntry(entry.array());
+        }
+
+        assertTrue(
+                "Stats should have captured a new UnderReplication during write",
+                bkc.getTestStatsProvider().getCounter(
+                        CLIENT_SCOPE + "." + ADD_OP_UR)
+                        .get() > 0);
+
+        sleepLatch1.countDown();
+        sleepLatch2.countDown();
+
+        // Replace the bookie with a fake bookie
+        ServerConfiguration conf = killBookie(ensemble.get(0));
+        BookieWriteLedgerTest.CorruptReadBookie corruptBookie = new BookieWriteLedgerTest.CorruptReadBookie(conf);
+        bs.add(startBookie(conf, corruptBookie));
+        bsConfs.add(conf);
+
+        i = numEntriesToWrite;
+        numEntriesToWrite = numEntriesToWrite + 50;
+
+        // write-batch-4
+
         for (; i < numEntriesToWrite; i++) {
             ByteBuffer entry = ByteBuffer.allocate(4);
             entry.putInt(rng.nextInt(maxInt));
@@ -146,6 +249,11 @@ public void testWithMultipleBookieFailuresInLastEnsemble() throws Exception {
         }
 
         readEntries(lh, entries1);
+        assertTrue(
+                "Stats should have captured DigestMismatch on Read",
+                bkc.getTestStatsProvider().getCounter(
+                        CLIENT_SCOPE + "." + READ_OP_DM)
+                        .get() > 0);
         lh.close();
     }
 
@@ -992,4 +1100,28 @@ public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
             x.notify();
         }
     }
+
+    /**
+     * Test bookie whose {@link #readEntry} hands back entries with every byte zeroed,
+     * so that the payload no longer matches what was written.
+     */
+    static class CorruptReadBookie extends Bookie {
+
+        static final Logger LOG = LoggerFactory.getLogger(CorruptReadBookie.class);
+
+        public CorruptReadBookie(ServerConfiguration conf)
+                throws IOException, KeeperException, InterruptedException, BookieException {
+            super(conf);
+        }
+
+        /** Reads the entry from the real bookie, then zeroes the buffer's full capacity before returning it. */
+        @Override
+        public ByteBuf readEntry(long ledgerId, long entryId) throws IOException, NoLedgerException {
+            // Kept in a local rather than a field so separate calls never share a buffer reference.
+            ByteBuf entry = super.readEntry(ledgerId, entryId);
+            // setZero writes 0x00 over [0, capacity) without moving readerIndex/writerIndex.
+            entry.setZero(0, entry.capacity());
+            return entry;
+        }
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services