You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2017/06/05 20:13:38 UTC

bookkeeper git commit: BOOKKEEPER-1088: Ledger Recovery (part-3) - Add a ReadEntryListener to callback on individual request

Repository: bookkeeper
Updated Branches:
  refs/heads/master b3b958c2b -> 5fe86525a


BOOKKEEPER-1088: Ledger Recovery (part-3) - Add a ReadEntryListener to callback on individual request

THIS CHANGE IS BASED ON #177  (you can review 868a3c8 for the only change that belongs to BOOKKEEPER-1088).

bookkeeper recovery improvement (part-3): add a ReadEntryListener to call back on individual requests.

- Add a read entry listener that allows batch reads while calling back on individual entries in sequence. In the recovery op, we can then issue batch reads, add each entry as its individual callback arrives, and stop when NoSuchEntry is received.

Author: Sijie Guo <si...@apache.org>
Author: Sijie Guo <si...@twitter.com>

Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>, Venkateswararao Jujjuri (JV) <None>

Closes #178 from sijie/recovery_improvements_part3 and squashes the following commits:

dd24faf [Sijie Guo] Merge branch 'master' into recovery_improvements_part3
2e1ebb9 [Sijie Guo] Merge branch 'master' into recovery_improvements_part3
8b8a3c8 [Sijie Guo] bookkeeper recovery improvement (part-3): add a ReadEntryListener to callback on individual request.
db3e98b [Sijie Guo] Address conflicts
f0fb89c [Sijie Guo] bookkeeper recovery improvement (part-2): add a parallel reading request in PendingReadOp
80ffc6c [Sijie Guo] Address conflicts
3db0b84 [Sijie Guo] bookkeeper recovery improvement (part-1): refactor PendingReadOp


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/5fe86525
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/5fe86525
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/5fe86525

Branch: refs/heads/master
Commit: 5fe86525a9c823f79b3e97fd82ea4aa1c75c79eb
Parents: b3b958c
Author: Sijie Guo <si...@apache.org>
Authored: Mon Jun 5 13:13:31 2017 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Mon Jun 5 13:13:31 2017 -0700

----------------------------------------------------------------------
 .../client/ListenerBasedPendingReadOp.java      |  70 +++++
 .../apache/bookkeeper/client/PendingReadOp.java |  38 ++-
 .../proto/BookkeeperInternalCallbacks.java      |  21 ++
 .../client/TestReadEntryListener.java           | 304 +++++++++++++++++++
 4 files changed, 423 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5fe86525/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java
new file mode 100644
index 0000000..69221d0
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import java.util.NoSuchElementException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
+import org.apache.bookkeeper.util.MathUtils;
+
+class ListenerBasedPendingReadOp extends PendingReadOp {
+    // Receives one call per completed entry, in entry-id order.
+    final ReadEntryListener listener;
+
+    ListenerBasedPendingReadOp(LedgerHandle lh, ScheduledExecutorService scheduler,
+                               long startEntryId, long endEntryId,
+                               ReadEntryListener listener, Object ctx) {
+        super(lh, scheduler, startEntryId, endEntryId, null, ctx);
+        this.listener = listener;
+    }
+
+    @Override
+    protected void submitCallback(int code) {
+        LedgerEntryRequest request;
+        while ((request = seq.peek()) != null) {
+            if (!request.isComplete()) {
+                return;
+            }
+            seq.remove();
+            long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos);
+            if (BKException.Code.OK == request.getRc()) {
+                readOpLogger.registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS);
+            } else {
+                readOpLogger.registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS);
+            }
+            // deliver this entry with its own rc (the op-level `code` argument is unused)
+            listener.onEntryComplete(request.getRc(), lh, request, ctx);
+        }
+        // seq is drained: every entry has been delivered, so cancel any speculative task.
+        cancelSpeculativeTask(true);
+    }
+
+    @Override
+    public boolean hasMoreElements() {
+        return false;
+    }
+
+    @Override
+    public LedgerEntry nextElement() throws NoSuchElementException {
+        throw new NoSuchElementException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5fe86525/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 7b01b7f..6f97b9b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -78,6 +78,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
 
         final AtomicBoolean complete = new AtomicBoolean(false);
 
+        int rc = BKException.Code.OK;
         int firstError = BKException.Code.OK;
         int numMissedEntryReads = 0;
 
@@ -117,6 +118,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
             }
 
             if (!complete.getAndSet(true)) {
+                rc = BKException.Code.OK;
                 /*
                  * The length is a long and it is the last field of the metadata of an entry.
                  * Consequently, we have to subtract 8 from METADATA_LENGTH to get the length.
@@ -139,6 +141,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
          */
         boolean fail(int rc) {
             if (complete.compareAndSet(false, true)) {
+                this.rc = rc;
                 submitCallback(rc);
                 return true;
             } else {
@@ -203,6 +206,15 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
             return complete.get();
         }
 
+        /**
+         * Returns the result code of this entry: OK unless fail() recorded an error code.
+         *
+         * @return the recorded result code.
+         */
+        int getRc() {
+            return this.rc;
+        }
+
         @Override
         public String toString() {
             return String.format("L%d-E%d", ledgerId, entryId);
@@ -404,7 +416,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
         return lh.metadata;
     }
 
-    private void cancelSpeculativeTask(boolean mayInterruptIfRunning) {
+    protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) {
         if (speculativeTask != null) {
             speculativeTask.cancel(mayInterruptIfRunning);
             speculativeTask = null;
@@ -470,9 +482,11 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
             }
             seq.add(entry);
             i++;
-
-            entry.read();
         } while (i <= endEntryId);
+        // read the entries.
+        for (LedgerEntryRequest entry : seq) {
+            entry.read();
+        }
     }
 
     private static class ReadContext {
@@ -507,19 +521,23 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
         heardFromHosts.add(rctx.to);
 
         if (entry.complete(rctx.to, buffer)) {
-            numPendingEntries--;
-            if (numPendingEntries == 0) {
-                submitCallback(BKException.Code.OK);
-            }
+            submitCallback(BKException.Code.OK);
         }
 
         if(numPendingEntries < 0)
             LOG.error("Read too many values");
     }
 
-    private void submitCallback(int code) {
-        if (cb == null) {
-            // Callback had already been triggered before
+    protected void submitCallback(int code) {
+        if (BKException.Code.OK == code) {
+            numPendingEntries--;
+            if (numPendingEntries != 0) {
+                return;
+            }
+        }
+
+        // ensure callback once
+        if (!complete.compareAndSet(false, true)) {
             return;
         }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5fe86525/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
index 58fd451..ddea8b7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
@@ -26,6 +26,8 @@ import io.netty.buffer.ByteBuf;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -95,6 +97,25 @@ public class BookkeeperInternalCallbacks {
     }
 
     /**
+     * Listener notified of the result of each individual entry read.
+     */
+    public interface ReadEntryListener {
+        /**
+         * Called when the given <i>entry</i> has completed, successfully or not.
+         *
+         * @param rc
+         *          result code of reading this entry.
+         * @param lh
+         *          ledger handle the entry was read from.
+         * @param entry
+         *          the completed entry; also passed when rc is not OK.
+         * @param ctx
+         *          context object supplied when the read was issued.
+         */
+        void onEntryComplete(int rc, LedgerHandle lh, LedgerEntry entry, Object ctx);
+    }
+
+    /**
      * This is a multi callback object that waits for all of
      * the multiple async operations to complete. If any fail, then we invoke
      * the final callback with a provided failureRc

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5fe86525/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java
new file mode 100644
index 0000000..d3ef194
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java
@@ -0,0 +1,304 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Unit tests for {@link ReadEntryListener} callbacks issued by {@link ListenerBasedPendingReadOp}.
+ */
+public class TestReadEntryListener extends BookKeeperClusterTestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestReadEntryListener.class);
+
+    final DigestType digestType;
+    final byte[] passwd = "read-entry-listener".getBytes();
+
+    public TestReadEntryListener() {
+        super(6);
+        this.digestType = DigestType.CRC32;
+    }
+
+    long getLedgerToRead(int ensemble, int writeQuorum, int ackQuorum, int numEntries)
+            throws Exception {
+        LedgerHandle lh = bkc.createLedger(ensemble, writeQuorum, ackQuorum, digestType, passwd);
+        for (int i = 0; i < numEntries; i++) {
+            lh.addEntry(("" + i).getBytes());
+        }
+        lh.close();
+        return lh.getId();
+    }
+
+    static class EntryWithRC {
+        final LedgerEntry entry;
+        final int rc;
+
+        EntryWithRC(int rc, LedgerEntry entry) {
+            this.rc = rc;
+            this.entry = entry;
+        }
+    }
+
+    static class LatchListener implements ReadEntryListener {
+        // Records each callback and whether callbacks arrived in consecutive entry-id order.
+        final CountDownLatch l;
+        final Map<Long, EntryWithRC> resultCodes;
+        boolean inOrder = true;
+        long nextEntryId;
+
+        LatchListener(long startEntryId, int numEntries) {
+            l = new CountDownLatch(numEntries);
+            resultCodes = new HashMap<Long, EntryWithRC>();
+            this.nextEntryId = startEntryId;
+        }
+
+        @Override
+        public void onEntryComplete(int rc, LedgerHandle lh, LedgerEntry entry, Object ctx) {
+            if (nextEntryId != entry.getEntryId()) {
+                inOrder = false;
+            }
+            ++nextEntryId;
+            resultCodes.put(entry.getEntryId(), new EntryWithRC(rc, entry));
+            l.countDown();
+        }
+
+        void expectComplete() throws Exception {
+            l.await();
+        }
+
+        boolean isInOrder() {
+            return inOrder;
+        }
+    }
+
+    void basicReadTest(boolean parallelRead) throws Exception {
+        int numEntries = 10;
+
+        long id = getLedgerToRead(5, 2, 2, numEntries);
+        LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
+
+        // read each entry with its own single-entry op
+        for (int i = 0; i < numEntries; i++) {
+            LatchListener listener = new LatchListener(i, 1);
+            ListenerBasedPendingReadOp readOp =
+                    new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, i, i, listener, null);
+            readOp.parallelRead(parallelRead).initiate();
+            listener.expectComplete();
+            assertEquals(1, listener.resultCodes.size());
+            EntryWithRC entry = listener.resultCodes.get((long) i);
+            assertNotNull(entry);
+            assertEquals(BKException.Code.OK, entry.rc);
+            assertEquals(i, Integer.parseInt(new String(entry.entry.getEntry())));
+            assertTrue(listener.isInOrder());
+        }
+
+        // read all entries with one op; callbacks must arrive in entry-id order
+        LatchListener listener = new LatchListener(0L, numEntries);
+        ListenerBasedPendingReadOp readOp =
+                new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1, listener, null);
+        readOp.parallelRead(parallelRead).initiate();
+        listener.expectComplete();
+        assertEquals(numEntries, listener.resultCodes.size());
+        for (int i = 0; i < numEntries; i++) {
+            EntryWithRC entry = listener.resultCodes.get((long) i);
+            assertNotNull(entry);
+            assertEquals(BKException.Code.OK, entry.rc);
+            assertEquals(i, Integer.parseInt(new String(entry.entry.getEntry())));
+        }
+        assertTrue(listener.isInOrder());
+
+        lh.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testBasicEnableParallelRead() throws Exception {
+        basicReadTest(true);
+    }
+
+    @Test(timeout = 60000)
+    public void testBasicDisableParallelRead() throws Exception {
+        basicReadTest(false);
+    }
+
+    private void readMissingEntriesTest(boolean parallelRead) throws Exception {
+        int numEntries = 10;
+
+        long id = getLedgerToRead(5, 2, 2, numEntries);
+        LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
+
+        // read a single entry past the last written one (only ids 0..9 exist)
+        LatchListener listener = new LatchListener(11L, 1);
+        ListenerBasedPendingReadOp readOp =
+                new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 11, 11, listener, null);
+        readOp.parallelRead(parallelRead).initiate();
+        listener.expectComplete();
+        assertEquals(1, listener.resultCodes.size());
+        EntryWithRC entry = listener.resultCodes.get(11L);
+        assertNotNull(entry);
+        assertEquals(BKException.Code.NoSuchEntryException, entry.rc);
+        assertTrue(listener.isInOrder());
+
+        // read multiple missing entries
+        listener = new LatchListener(11L, 3);
+        readOp = new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 11, 13, listener, null);
+        readOp.parallelRead(parallelRead).initiate();
+        listener.expectComplete();
+        assertEquals(3, listener.resultCodes.size());
+        assertTrue(listener.isInOrder());
+
+        for (int i = 11; i <= 13; i++) {
+            entry = listener.resultCodes.get((long) i);
+            assertNotNull(entry);
+            assertEquals(BKException.Code.NoSuchEntryException, entry.rc);
+        }
+
+        // read multiple entries with missing entries
+        listener = new LatchListener(5L, 10);
+        readOp = new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 5L, 14L, listener, null);
+        readOp.parallelRead(parallelRead).initiate();
+        listener.expectComplete();
+        assertEquals(10, listener.resultCodes.size());
+        assertTrue(listener.isInOrder());
+
+        for (long i = 5L; i <= 14L; i++) {
+            entry = listener.resultCodes.get(i);
+            assertNotNull(entry);
+            if (i < 10L) {
+                assertEquals(BKException.Code.OK, entry.rc);
+                assertEquals(i, Integer.parseInt(new String(entry.entry.getEntry())));
+            } else {
+                assertEquals(BKException.Code.NoSuchEntryException, entry.rc);
+            }
+        }
+
+        lh.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testReadMissingEntriesEnableParallelRead() throws Exception {
+        readMissingEntriesTest(true);
+    }
+
+    @Test(timeout = 60000)
+    public void testReadMissingEntriesDisableParallelRead() throws Exception {
+        readMissingEntriesTest(false);
+    }
+
+    private void readWithFailedBookiesTest(boolean parallelRead) throws Exception {
+        int numEntries = 10;
+
+        long id = getLedgerToRead(5, 3, 3, numEntries);
+
+        LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
+
+        ArrayList<BookieSocketAddress> ensemble =
+                lh.getLedgerMetadata().getEnsemble(5);
+        // kill two bookies; with write quorum 3 every entry keeps at least one live replica
+        killBookie(ensemble.get(0));
+        killBookie(ensemble.get(1));
+
+        // read multiple entries
+        LatchListener listener = new LatchListener(0L, numEntries);
+        ListenerBasedPendingReadOp readOp =
+                new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1, listener, null);
+        readOp.parallelRead(parallelRead).initiate();
+        listener.expectComplete();
+        assertEquals(numEntries, listener.resultCodes.size());
+        for (int i = 0; i < numEntries; i++) {
+            EntryWithRC entry = listener.resultCodes.get((long) i);
+            assertNotNull(entry);
+            assertEquals(BKException.Code.OK, entry.rc);
+            assertEquals(i, Integer.parseInt(new String(entry.entry.getEntry())));
+        }
+
+        lh.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testReadWithFailedBookiesEnableParallelRead() throws Exception {
+        readWithFailedBookiesTest(true);
+    }
+
+    @Test(timeout = 60000)
+    public void testReadWithFailedBookiesDisableParallelRead() throws Exception {
+        readWithFailedBookiesTest(false);
+    }
+
+    private void readFailureWithFailedBookiesTest(boolean parallelRead) throws Exception {
+        int numEntries = 10;
+
+        long id = getLedgerToRead(5, 3, 3, numEntries);
+
+        LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
+
+        ArrayList<BookieSocketAddress> ensemble =
+                lh.getLedgerMetadata().getEnsemble(5);
+        // kill three bookies; the test expects entries with i % 5 == 0 to become unreadable
+        killBookie(ensemble.get(0));
+        killBookie(ensemble.get(1));
+        killBookie(ensemble.get(2));
+
+        // read multiple entries
+        LatchListener listener = new LatchListener(0L, numEntries);
+        ListenerBasedPendingReadOp readOp =
+                new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1, listener, null);
+        readOp.parallelRead(parallelRead).initiate();
+        listener.expectComplete();
+        assertEquals(numEntries, listener.resultCodes.size());
+        for (int i = 0; i < numEntries; i++) {
+            EntryWithRC entry = listener.resultCodes.get((long) i);
+            assertNotNull(entry);
+            if (i % 5 == 0) {
+                assertEquals(BKException.Code.BookieHandleNotAvailableException, entry.rc);
+            } else {
+                assertEquals(BKException.Code.OK, entry.rc);
+                assertEquals(i, Integer.parseInt(new String(entry.entry.getEntry())));
+            }
+        }
+
+        lh.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testReadFailureWithFailedBookiesEnableParallelRead() throws Exception {
+        readFailureWithFailedBookiesTest(true);
+    }
+
+    @Test(timeout = 60000)
+    public void testReadFailureWithFailedBookiesDisableParallelRead() throws Exception {
+        readFailureWithFailedBookiesTest(false);
+    }
+}