You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/02/26 08:23:20 UTC

[GitHub] sijie closed pull request #1196: Issue #1195 Handle optional fields gracefully in ReadExplicitLac response

sijie closed pull request #1196: Issue #1195 Handle optional fields gracefully in ReadExplicitLac response
URL: https://github.com/apache/bookkeeper/pull/1196
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 7047b37bb..8d57117d1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -983,7 +983,7 @@ public void asyncAddEntry(ByteBuf data, final AddCallback cb, final Object ctx)
     /**
      * Add entry asynchronously to an open ledger, using an offset and range.
      * This can be used only with {@link LedgerHandleAdv} returned through
-     * ledgers created with {@link BookKeeper#createLedgerAdv(int, int, int, DigestType, byte[])}.
+     * ledgers created with {@link createLedgerAdv(int, int, int, DigestType, byte[])}.
      *
      * @param entryId
      *            entryId of the entry to add.
@@ -1404,11 +1404,12 @@ public long tryReadLastConfirmed() throws InterruptedException, BKException {
 
     /**
      * Obtains asynchronously the explicit last add confirmed from a quorum of
-     * bookies. This call obtains the the explicit last add confirmed each
-     * bookie has received for this ledger and returns the maximum. If in the
-     * write LedgerHandle, explicitLAC feature is not enabled then this will
-     * return {@link #INVALID_ENTRY_ID INVALID_ENTRY_ID}. If the read explicit
-     * lastaddconfirmed is greater than getLastAddConfirmed, then it updates the
+     * bookies. This call obtains Explicit LAC value and piggy-backed LAC value (just like
+     * {@Link #asyncReadLastConfirmed(ReadLastConfirmedCallback, Object)}) from each
+     * bookie in the ensemble and returns the maximum.
+     * If in the write LedgerHandle, explicitLAC feature is not enabled then this call behavior
+     * will be similar to {@Link #asyncReadLastConfirmed(ReadLastConfirmedCallback, Object)}.
+     * If the read explicit lastaddconfirmed is greater than getLastAddConfirmed, then it updates the
      * lastAddConfirmed of this ledgerhandle. If the ledger has been closed, it
      * returns the value of the last add confirmed from the metadata.
      *
@@ -1449,13 +1450,13 @@ public void getLacComplete(int rc, long lac) {
         new PendingReadLacOp(this, innercb).initiate();
     }
 
-    /**
+    /*
      * Obtains synchronously the explicit last add confirmed from a quorum of
-     * bookies. This call obtains the the explicit last add confirmed each
-     * bookie has received for this ledger and returns the maximum. If in the
-     * write LedgerHandle, explicitLAC feature is not enabled then this will
-     * return {@link #INVALID_ENTRY_ID INVALID_ENTRY_ID}. If the read explicit
-     * lastaddconfirmed is greater than getLastAddConfirmed, then it updates the
+     * bookies. This call obtains Explicit LAC value and piggy-backed LAC value (just like
+     * {@Link #readLastAddConfirmed()) from each bookie in the ensemble and returns the maximum.
+     * If in the write LedgerHandle, explicitLAC feature is not enabled then this call behavior
+     * will be similar to {@Link #readLastAddConfirmed()}.
+     * If the read explicit lastaddconfirmed is greater than getLastAddConfirmed, then it updates the
      * lastAddConfirmed of this ledgerhandle. If the ledger has been closed, it
      * returns the value of the last add confirmed from the metadata.
      *
@@ -1463,8 +1464,7 @@ public void getLacComplete(int rc, long lac) {
      *
      * @return The entry id of the explicit last confirmed write or
      *         {@link #INVALID_ENTRY_ID INVALID_ENTRY_ID} if no entry has been
-     *         confirmed or if explicitLAC feature is not enabled in write
-     *         LedgerHandle.
+     *         confirmed.
      * @throws InterruptedException
      * @throws BKException
      */
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
index ecdbc5d9f..db87d8940 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
@@ -94,17 +94,23 @@ public void readLacComplete(int rc, long ledgerId, final ByteBuf lacBuffer, fina
                 // This routine picks both of them and compares to return
                 // the latest Lac.
 
+                // lacBuffer and lastEntryBuffer are optional in the protocol.
+                // So check if they exist before processing them.
+
                 // Extract lac from FileInfo on the ledger.
-                long lac = lh.macManager.verifyDigestAndReturnLac(lacBuffer);
-                if (lac > maxLac) {
-                    maxLac = lac;
+                if (lacBuffer != null && lacBuffer.readableBytes() > 0) {
+                    long lac = lh.macManager.verifyDigestAndReturnLac(lacBuffer);
+                    if (lac > maxLac) {
+                        maxLac = lac;
+                    }
                 }
-
                 // Extract lac from last entry on the disk
-                RecoveryData recoveryData = lh.macManager.verifyDigestAndReturnLastConfirmed(lastEntryBuffer);
-                long recoveredLac = recoveryData.getLastAddConfirmed();
-                if (recoveredLac > maxLac) {
-                    maxLac = recoveredLac;
+                if (lastEntryBuffer != null && lastEntryBuffer.readableBytes() > 0) {
+                    RecoveryData recoveryData = lh.macManager.verifyDigestAndReturnLastConfirmed(lastEntryBuffer);
+                    long recoveredLac = recoveryData.getLastAddConfirmed();
+                    if (recoveredLac > maxLac) {
+                        maxLac = recoveredLac;
+                    }
                 }
                 heardValidResponse = true;
             } catch (BKDigestMatchException e) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
index 85c4a3302..898ddb041 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
@@ -68,13 +68,9 @@ private ReadLacResponse getReadLacResponse() {
         ByteBuf lastEntry = null;
         ByteBuf lac = null;
         try {
-            lastEntry = requestProcessor.bookie.readEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
             lac = requestProcessor.bookie.getExplicitLac(ledgerId);
             if (lac != null) {
                 readLacResponse.setLacBody(ByteString.copyFrom(lac.nioBuffer()));
-                readLacResponse.setLastEntryBody(ByteString.copyFrom(lastEntry.nioBuffer()));
-            } else {
-                status = StatusCode.ENOENTRY;
             }
         } catch (Bookie.NoLedgerException e) {
             status = StatusCode.ENOLEDGER;
@@ -83,10 +79,28 @@ private ReadLacResponse getReadLacResponse() {
             status = StatusCode.EIO;
             logger.error("IOException while performing readLac from ledger: {}", ledgerId);
         } finally {
-            ReferenceCountUtil.release(lastEntry);
             ReferenceCountUtil.release(lac);
         }
 
+        try {
+            lastEntry = requestProcessor.bookie.readEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
+            if (lastEntry != null) {
+                readLacResponse.setLastEntryBody(ByteString.copyFrom(lastEntry.nioBuffer()));
+            }
+        } catch (Bookie.NoLedgerException e) {
+            status = StatusCode.ENOLEDGER;
+            logger.error("No ledger found while trying to read last entry: {}", ledgerId, e);
+        } catch (IOException e) {
+            status = StatusCode.EIO;
+            logger.error("IOException while trying to read last entry: {}", ledgerId, e);
+        } finally {
+            ReferenceCountUtil.release(lastEntry);
+        }
+
+        if ((lac == null) && (lastEntry == null)) {
+            status = StatusCode.ENOENTRY;
+        }
+
         if (status == StatusCode.EOK) {
             requestProcessor.readLacStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
                     TimeUnit.NANOSECONDS);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
index 2a0e38454..992201694 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
@@ -346,13 +346,6 @@ public void testReadHandleWithNoExplicitLAC() throws Exception {
         }
 
         Thread.sleep(3000);
-        // since explicitlacflush policy is not enabled for writeledgerhandle, when we try
-        // to read explicitlac for rlh, it will be LedgerHandle.INVALID_ENTRY_ID. But it
-        // wont throw some exception.
-        long explicitlac = rlh.readExplicitLastConfirmed();
-        assertTrue("Expected Explicit LAC of rlh: " + LedgerHandle.INVALID_ENTRY_ID
-                + " actual ExplicitLAC of rlh: " + explicitlac,
-                (explicitlac == LedgerHandle.INVALID_ENTRY_ID));
         assertTrue(
                 "Expected LAC of wlh: " + (2 * numOfEntries - 1) + " actual LAC of rlh: " + wlh.getLastAddConfirmed(),
                 (wlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
@@ -360,9 +353,16 @@ public void testReadHandleWithNoExplicitLAC() throws Exception {
                 "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
                 (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
 
+        // since explicitlacflush policy is not enabled for writeledgerhandle, when we try
+        // to read explicitlac for rlh, it will be reading up to the piggyback value.
+        long explicitlac = rlh.readExplicitLastConfirmed();
+        assertTrue(
+                "Expected Explicit LAC of rlh: " + (numOfEntries - 2) + " actual ExplicitLAC of rlh: " + explicitlac,
+                (explicitlac == (2 * numOfEntries - 2)));
+
         try {
-            rlh.readEntries(numOfEntries - 1, numOfEntries - 1);
-            fail("rlh readEntries beyond " + (numOfEntries - 2) + " should fail with ReadException");
+            rlh.readEntries(2 * numOfEntries - 1, 2 * numOfEntries - 1);
+            fail("rlh readEntries beyond " + (2 * numOfEntries - 2) + " should fail with ReadException");
         } catch (BKException.BKReadException readException) {
         }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
new file mode 100644
index 000000000..6e638105f
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
@@ -0,0 +1,112 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import static org.junit.Assert.assertEquals;
+
+import io.netty.buffer.Unpooled;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.util.ByteBufList;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests PendingReadLacOp internals.
+ */
+public class TestPendingReadLacOp extends BookKeeperClusterTestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(TestPendingReadLacOp.class);
+    byte pwd[] = "asdf".getBytes();
+    byte data[] = "foo".getBytes();
+
+    public TestPendingReadLacOp() {
+        super(3);
+    }
+
+    @Test
+    public void testPendingReadLacOpMissingExplicitLAC() throws Exception {
+        LedgerHandle lh = bkc.createLedger(3, 3, 2, BookKeeper.DigestType.CRC32, pwd);
+        lh.append(data);
+        lh.append(data);
+        lh.append(data);
+
+        final CompletableFuture<Long> result = new CompletableFuture<>();
+        PendingReadLacOp pro = new PendingReadLacOp(lh, (rc, lac) -> result.complete(lac)) {
+            @Override
+            public void initiate() {
+                for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
+                    final int index = i;
+                    ByteBufList buffer = lh.getDigestManager().computeDigestAndPackageForSending(
+                            2,
+                            1,
+                            data.length,
+                            Unpooled.wrappedBuffer(data));
+                    bkc.scheduler.schedule(() -> {
+                        readLacComplete(
+                                0,
+                                lh.getId(),
+                                null,
+                                Unpooled.copiedBuffer(buffer.toArray()),
+                                index);
+
+                    }, 0, TimeUnit.SECONDS);
+                    lh.bk.getBookieClient().readLac(lh.metadata.currentEnsemble.get(i),
+                            lh.ledgerId, this, i);
+                }
+            }
+        };
+        pro.initiate();
+        assertEquals(1, result.get().longValue());
+    }
+
+    @Test
+    public void testPendingReadLacOpMissingLAC() throws Exception {
+        LedgerHandle lh = bkc.createLedger(3, 3, 2, BookKeeper.DigestType.MAC, pwd);
+        lh.append(data);
+        lh.append(data);
+        lh.append(data);
+
+        final CompletableFuture<Long> result = new CompletableFuture<>();
+        PendingReadLacOp pro = new PendingReadLacOp(lh, (rc, lac) -> result.complete(lac)) {
+            @Override
+            public void initiate() {
+                for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
+                    final int index = i;
+                    ByteBufList buffer = lh.getDigestManager().computeDigestAndPackageForSendingLac(1);
+                    bkc.scheduler.schedule(() -> {
+                        readLacComplete(
+                                0,
+                                lh.getId(),
+                                buffer.getBuffer(0),
+                                null,
+                                index);
+                    }, 0, TimeUnit.SECONDS);
+                    lh.bk.getBookieClient().readLac(lh.metadata.currentEnsemble.get(i),
+                            lh.ledgerId, this, i);
+                }
+            }
+        };
+        pro.initiate();
+        assertEquals(result.get().longValue(), 1);
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services