You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/06/14 09:13:48 UTC

[bookkeeper] branch branch-4.7 updated: ISSUE #1476: LedgerEntry is recycled twice at ReadLastConfirmedAndEntryOp

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.7 by this push:
     new c7b1610  ISSUE #1476: LedgerEntry is recycled twice at ReadLastConfirmedAndEntryOp
c7b1610 is described below

commit c7b1610bb0dac9d46b80e5f607248b9b391ec267
Author: Sijie Guo <si...@apache.org>
AuthorDate: Thu Jun 14 02:13:22 2018 -0700

    ISSUE #1476: LedgerEntry is recycled twice at ReadLastConfirmedAndEntryOp
    
    Descriptions of the changes in this PR:
    
    Issue #1476 is caused by speculative reads combined with object recycling: the same request object is added to the CompletionObjects multiple times with different txnids. The request-processing logic already takes this into account; only one place, inside `ReadLastConfirmedAndEntryOp.requestComplete`, forgets to check `requestComplete` before calling `submitCallback`, which in turn calls `request.close`.
    
    ### Motivation
    
    Fix #1476.
    
    ### Changes
    
    Check `requestComplete` before calling `submitCallback` in `ReadLastConfirmedAndEntryOp.requestComplete`.
    
    Master Issue: #1476
    
    Author: Sijie Guo <si...@apache.org>
    Author: infodog <in...@hotmail.com>
    Author: zhengxiangyang <zx...@xinshi.net>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>
    
    This closes #1509 from infodog/issue1476, closes #1476
    
    (cherry picked from commit 6476fc323fffd328ed9620d9aa39c4c232f8d2be)
    Signed-off-by: Sijie Guo <si...@apache.org>
---
 .../org/apache/bookkeeper/client/BookKeeper.java   |   4 +
 .../client/ReadLastConfirmedAndEntryOp.java        |  52 +++--
 .../client/impl/LastConfirmedAndEntryImpl.java     |  14 +-
 .../client/ReadLastConfirmedAndEntryOpTest.java    | 252 +++++++++++++++++++++
 4 files changed, 293 insertions(+), 29 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index bd0a12d..1ee0595 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -742,6 +742,10 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
         }
     }
 
+    boolean shouldReorderReadSequence() {
+        return reorderReadSequence;
+    }
+
     ZooKeeper getZkHandle() {
         return ((ZKMetadataClientDriver) metadataDriver).getZk();
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index b9888ba..1b3fa70 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -81,9 +81,9 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
         ReadLACAndEntryRequest(ArrayList<BookieSocketAddress> ensemble, long lId, long eId) {
             this.entryImpl = LedgerEntryImpl.create(lId, eId);
             this.ensemble = ensemble;
-            this.writeSet = lh.distributionSchedule.getWriteSetForLongPoll(eId);
-            if (lh.bk.reorderReadSequence) {
-                this.orderedEnsemble = lh.bk.placementPolicy.reorderReadLACSequence(ensemble,
+            this.writeSet = lh.getDistributionSchedule().getWriteSetForLongPoll(eId);
+            if (lh.getBk().shouldReorderReadSequence()) {
+                this.orderedEnsemble = lh.getBk().getPlacementPolicy().reorderReadLACSequence(ensemble,
                         lh.getBookiesHealthInfo(), writeSet.copy());
             } else {
                 this.orderedEnsemble = writeSet.copy();
@@ -118,7 +118,7 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
         boolean complete(int bookieIndex, BookieSocketAddress host, final ByteBuf buffer, long entryId) {
             ByteBuf content;
             try {
-                content = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
+                content = lh.getDigestManager().verifyDigestAndReturnData(entryId, buffer);
             } catch (BKException.BKDigestMatchException e) {
                 logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", BKException.Code.DigestMatchException);
                 return false;
@@ -201,7 +201,7 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug("{} while reading entry: {} ledgerId: {} from bookie: {}", errMsg, entryImpl.getEntryId(),
-                        lh.ledgerId, host);
+                        lh.getId(), host);
             }
         }
 
@@ -417,7 +417,7 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
                 for (int i = 0; i < numReplicasTried; i++) {
                     int slowBookieIndex = orderedEnsemble.get(i);
                     BookieSocketAddress slowBookieSocketAddress = ensemble.get(slowBookieIndex);
-                    lh.bk.placementPolicy.registerSlowBookie(slowBookieSocketAddress, entryId);
+                    lh.getBk().getPlacementPolicy().registerSlowBookie(slowBookieSocketAddress, entryId);
                 }
             }
             return completed;
@@ -449,7 +449,7 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
     }
 
     protected LedgerMetadata getLedgerMetadata() {
-        return lh.metadata;
+        return lh.getLedgerMetadata();
     }
 
     ReadLastConfirmedAndEntryOp parallelRead(boolean enabled) {
@@ -462,7 +462,7 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
      */
     @Override
     public ListenableFuture<Boolean> issueSpeculativeRequest() {
-        return lh.bk.getMainWorkerPool().submitOrdered(lh.getId(), new Callable<Boolean>() {
+        return lh.getBk().getMainWorkerPool().submitOrdered(lh.getId(), new Callable<Boolean>() {
             @Override
             public Boolean call() throws Exception {
                 if (!requestComplete.get() && !request.isComplete()
@@ -480,14 +480,14 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
 
     public void initiate() {
         if (parallelRead) {
-            request = new ParallelReadRequest(lh.metadata.currentEnsemble, lh.ledgerId, prevEntryId + 1);
+            request = new ParallelReadRequest(lh.getLedgerMetadata().currentEnsemble, lh.getId(), prevEntryId + 1);
         } else {
-            request = new SequenceReadRequest(lh.metadata.currentEnsemble, lh.ledgerId, prevEntryId + 1);
+            request = new SequenceReadRequest(lh.getLedgerMetadata().currentEnsemble, lh.getId(), prevEntryId + 1);
         }
         request.read();
 
-        if (!parallelRead && lh.bk.getReadLACSpeculativeRequestPolicy().isPresent()) {
-            lh.bk.getReadLACSpeculativeRequestPolicy().get().initiateSpeculativeRequest(scheduler, this);
+        if (!parallelRead && lh.getBk().getReadLACSpeculativeRequestPolicy().isPresent()) {
+            lh.getBk().getReadLACSpeculativeRequestPolicy().get().initiateSpeculativeRequest(scheduler, this);
         }
     }
 
@@ -496,8 +496,8 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
             LOG.debug("Calling Read LAC and Entry with {} and long polling interval {} on Bookie {} - Parallel {}",
                     prevEntryId, timeOutInMillis, to, parallelRead);
         }
-        lh.bk.getBookieClient().readEntryWaitForLACUpdate(to,
-            lh.ledgerId,
+        lh.getBk().getBookieClient().readEntryWaitForLACUpdate(to,
+            lh.getId(),
             BookieProtocol.LAST_ADD_CONFIRMED,
             prevEntryId,
             timeOutInMillis,
@@ -517,12 +517,12 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
         long latencyMicros = MathUtils.elapsedMicroSec(requestTimeNano);
         LedgerEntry entry;
         if (BKException.Code.OK != rc) {
-            lh.bk.getReadLacAndEntryOpLogger()
+            lh.getBk().getReadLacAndEntryOpLogger()
                 .registerFailedEvent(latencyMicros, TimeUnit.MICROSECONDS);
             entry = null;
         } else {
             // could received advanced lac, with no entry
-            lh.bk.getReadLacAndEntryOpLogger()
+            lh.getBk().getReadLacAndEntryOpLogger()
                 .registerSuccessfulEvent(latencyMicros, TimeUnit.MICROSECONDS);
             if (request.entryImpl.getEntryBuffer() != null) {
                 entry = new LedgerEntry(request.entryImpl);
@@ -558,18 +558,20 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
 
             if (entryId != BookieProtocol.LAST_ADD_CONFIRMED) {
                 buffer.retain();
-                if (request.complete(rCtx.getBookieIndex(), bookie, buffer, entryId)) {
+                if (!requestComplete.get() && request.complete(rCtx.getBookieIndex(), bookie, buffer, entryId)) {
                     // callback immediately
                     if (rCtx.getLacUpdateTimestamp().isPresent()) {
                         long elapsedMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()
                                 - rCtx.getLacUpdateTimestamp().get());
                         elapsedMicros = Math.max(elapsedMicros, 0);
-                        lh.bk.getReadLacAndEntryRespLogger()
-                            .registerSuccessfulEvent(elapsedMicros, TimeUnit.MICROSECONDS);
+                        lh.getBk().getReadLacAndEntryRespLogger()
+                                .registerSuccessfulEvent(elapsedMicros, TimeUnit.MICROSECONDS);
                     }
 
-                    submitCallback(BKException.Code.OK);
-                    requestComplete.set(true);
+                    // if the request has already completed, the buffer is not going to be used anymore, release it.
+                    if (!completeRequest()) {
+                        buffer.release();
+                    }
                     heardFromHostsBitSet.set(rCtx.getBookieIndex(), true);
                 } else {
                     buffer.release();
@@ -611,8 +613,9 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
         }
     }
 
-    private void completeRequest() {
-        if (requestComplete.compareAndSet(false, true)) {
+    private boolean completeRequest() {
+        boolean requestCompleted = requestComplete.compareAndSet(false, true);
+        if (requestCompleted) {
             if (!hasValidResponse) {
                 // no success called
                 submitCallback(request.getFirstError());
@@ -621,11 +624,12 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
                 submitCallback(BKException.Code.OK);
             }
         }
+        return requestCompleted;
     }
 
     @Override
     public String toString() {
-        return String.format("ReadLastConfirmedAndEntryOp(lid=%d, prevEntryId=%d])", lh.ledgerId, prevEntryId);
+        return String.format("ReadLastConfirmedAndEntryOp(lid=%d, prevEntryId=%d])", lh.getId(), prevEntryId);
     }
 
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
index 8f1924a..8090214 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
@@ -41,11 +41,15 @@ public class LastConfirmedAndEntryImpl implements LastConfirmedAndEntry {
     public static LastConfirmedAndEntryImpl create(long lac, org.apache.bookkeeper.client.LedgerEntry entry) {
         LastConfirmedAndEntryImpl entryImpl = RECYCLER.get();
         entryImpl.lac = lac;
-        entryImpl.entry = LedgerEntryImpl.create(
-            entry.getLedgerId(),
-            entry.getEntryId(),
-            entry.getLength(),
-            entry.getEntryBuffer());
+        if (null == entry) {
+            entryImpl.entry = null;
+        } else {
+            entryImpl.entry = LedgerEntryImpl.create(
+                entry.getLedgerId(),
+                entry.getEntryId(),
+                entry.getLength(),
+                entry.getEntryBuffer());
+        }
         return entryImpl;
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
new file mode 100644
index 0000000..f0a03b0
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.client;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.base.Optional;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException.Code;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.impl.LastConfirmedAndEntryImpl;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.proto.ReadLastConfirmedAndEntryContext;
+import org.apache.bookkeeper.proto.checksum.DigestManager;
+import org.apache.bookkeeper.proto.checksum.DummyDigestManager;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.apache.bookkeeper.util.ByteBufList;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link ReadLastConfirmedAndEntryOp} with mocks.
+ */
+@Slf4j
+public class ReadLastConfirmedAndEntryOpTest {
+
+    private static final long LEDGERID = System.currentTimeMillis();
+
+    private final TestStatsProvider testStatsProvider = new TestStatsProvider();
+    private OpStatsLogger readLacAndEntryOpLogger;
+    private BookieClient mockBookieClient;
+    private BookKeeper mockBk;
+    private LedgerHandle mockLh;
+    private ScheduledExecutorService scheduler;
+    private OrderedScheduler orderedScheduler;
+    private SpeculativeRequestExecutionPolicy speculativePolicy;
+    private LedgerMetadata ledgerMetadata;
+    private DistributionSchedule distributionSchedule;
+    private DigestManager digestManager;
+
+    @Before
+    public void setup() throws Exception {
+        // stats
+        this.readLacAndEntryOpLogger = testStatsProvider
+            .getStatsLogger("").getOpStatsLogger("readLacAndEntry");
+        // policy
+        this.speculativePolicy = new DefaultSpeculativeRequestExecutionPolicy(
+            100, 200, 2);
+        // metadata
+        this.ledgerMetadata =
+            new LedgerMetadata(3, 3, 2, DigestType.CRC32, new byte[0]);
+        ArrayList<BookieSocketAddress> ensemble = new ArrayList<>(3);
+        for (int i = 0; i < 3; i++) {
+            ensemble.add(new BookieSocketAddress("127.0.0.1", 3181 + i));
+        }
+        this.ledgerMetadata.addEnsemble(0L, ensemble);
+        this.distributionSchedule = new RoundRobinDistributionSchedule(3, 2, 3);
+        // schedulers
+        this.scheduler = Executors.newSingleThreadScheduledExecutor();
+        this.orderedScheduler = OrderedScheduler.newSchedulerBuilder()
+            .name("test-ordered-scheduler")
+            .numThreads(1)
+            .build();
+
+        this.mockBookieClient = mock(BookieClient.class);
+
+        this.mockBk = mock(BookKeeper.class);
+        when(mockBk.getReadLACSpeculativeRequestPolicy()).thenReturn(Optional.of(speculativePolicy));
+        when(mockBk.getBookieClient()).thenReturn(mockBookieClient);
+        when(mockBk.getReadLacAndEntryOpLogger()).thenReturn(readLacAndEntryOpLogger);
+        when(mockBk.getMainWorkerPool()).thenReturn(orderedScheduler);
+        EnsemblePlacementPolicy mockPlacementPolicy = mock(EnsemblePlacementPolicy.class);
+        when(mockBk.getPlacementPolicy()).thenReturn(mockPlacementPolicy);
+        this.mockLh = mock(LedgerHandle.class);
+        when(mockLh.getBk()).thenReturn(mockBk);
+        when(mockLh.getId()).thenReturn(LEDGERID);
+        when(mockLh.getLedgerMetadata()).thenReturn(ledgerMetadata);
+        when(mockLh.getDistributionSchedule()).thenReturn(distributionSchedule);
+        digestManager = new DummyDigestManager(LEDGERID, false);
+        when(mockLh.getDigestManager()).thenReturn(digestManager);
+    }
+
+    @After
+    public void teardown() {
+        this.scheduler.shutdown();
+        this.orderedScheduler.shutdown();
+    }
+
+    @Data
+    static class ReadLastConfirmedAndEntryHolder {
+
+        private final BookieSocketAddress address;
+        private final ReadEntryCallback callback;
+        private final ReadLastConfirmedAndEntryContext context;
+
+    }
+
+    /**
+     * Test case: handling different speculative responses. one speculative response might return a valid response
+     * with a read entry, while the other speculative response might return a valid response without an entry.
+     * {@link ReadLastConfirmedAndEntryOp} should handle both responses well.
+     *
+     * <p>This test case covers {@link https://github.com/apache/bookkeeper/issues/1476}.
+     */
+    @Test
+    public void testSpeculativeResponses() throws Exception {
+        final long entryId = 2L;
+        final long lac = 1L;
+
+        ByteBuf data = Unpooled.copiedBuffer("test-speculative-responses", UTF_8);
+        ByteBufList dataWithDigest = digestManager.computeDigestAndPackageForSending(
+            entryId, lac, data.readableBytes(), data);
+        byte[] bytesWithDigest = new byte[dataWithDigest.readableBytes()];
+        assertEquals(bytesWithDigest.length, dataWithDigest.getBytes(bytesWithDigest));
+
+        final Map<BookieSocketAddress, ReadLastConfirmedAndEntryHolder> callbacks =
+            Collections.synchronizedMap(new HashMap<>());
+        doAnswer(invocationOnMock -> {
+            BookieSocketAddress address = invocationOnMock.getArgument(0);
+            ReadEntryCallback callback = invocationOnMock.getArgument(6);
+            ReadLastConfirmedAndEntryContext context = invocationOnMock.getArgument(7);
+
+            ReadLastConfirmedAndEntryHolder holder = new ReadLastConfirmedAndEntryHolder(address, callback, context);
+
+            log.info("Received read request to bookie {}", address);
+
+            callbacks.put(address, holder);
+            return null;
+        }).when(mockBookieClient).readEntryWaitForLACUpdate(
+            any(BookieSocketAddress.class),
+            anyLong(),
+            anyLong(),
+            anyLong(),
+            anyLong(),
+            anyBoolean(),
+            any(ReadEntryCallback.class),
+            any()
+        );
+
+        CompletableFuture<LastConfirmedAndEntry> resultFuture = new CompletableFuture<>();
+        LastConfirmedAndEntryCallback resultCallback = (rc, lastAddConfirmed, entry) -> {
+            if (Code.OK != rc) {
+                FutureUtils.completeExceptionally(resultFuture, BKException.create(rc));
+            } else {
+                FutureUtils.complete(resultFuture, LastConfirmedAndEntryImpl.create(lastAddConfirmed, entry));
+            }
+        };
+
+        ReadLastConfirmedAndEntryOp op = new ReadLastConfirmedAndEntryOp(
+            mockLh,
+            resultCallback,
+            1L,
+            10000,
+            scheduler
+        );
+        op.initiate();
+
+        // wait until all speculative requests are sent
+        while (callbacks.size() < 3) {
+            log.info("Received {} read requests", callbacks.size());
+            Thread.sleep(100);
+        }
+
+        log.info("All speculative reads are outstanding now.");
+
+        // once all the speculative reads are outstanding. complete the requests in following sequence:
+
+        // 1) complete one bookie with empty response (OK, entryId = INVALID_ENTRY_ID)
+        // 2) complete second bookie with valid entry response. this will trigger double-release bug described in
+        //    {@link https://github.com/apache/bookkeeper/issues/1476}
+
+        Iterator<Entry<BookieSocketAddress, ReadLastConfirmedAndEntryHolder>> iter = callbacks.entrySet().iterator();
+        assertTrue(iter.hasNext());
+        Entry<BookieSocketAddress, ReadLastConfirmedAndEntryHolder> firstBookieEntry = iter.next();
+        ReadLastConfirmedAndEntryHolder firstBookieHolder = firstBookieEntry.getValue();
+        ReadLastConfirmedAndEntryContext firstContext = firstBookieHolder.context;
+        firstContext.setLastAddConfirmed(entryId);
+        firstBookieHolder.getCallback()
+            .readEntryComplete(Code.OK, LEDGERID, BookieProtocol.INVALID_ENTRY_ID, null, firstContext);
+
+        // readEntryComplete above will release the entry impl back to the object pools.
+        // we want to make sure after the entry is recycled, it will not be mutated by any future callbacks.
+        LedgerEntryImpl entry = LedgerEntryImpl.create(LEDGERID, Long.MAX_VALUE);
+
+        assertTrue(iter.hasNext());
+        Entry<BookieSocketAddress, ReadLastConfirmedAndEntryHolder> secondBookieEntry = iter.next();
+        ReadLastConfirmedAndEntryHolder secondBookieHolder = secondBookieEntry.getValue();
+        ReadLastConfirmedAndEntryContext secondContext = secondBookieHolder.context;
+        secondContext.setLastAddConfirmed(entryId);
+        secondBookieHolder.getCallback().readEntryComplete(
+            Code.OK, LEDGERID, entryId, Unpooled.wrappedBuffer(bytesWithDigest), secondContext);
+
+        // the recycled entry shouldn't be updated by any future callbacks.
+        assertNull(entry.getEntryBuffer());
+        entry.close();
+
+        // wait for results
+        try (LastConfirmedAndEntry lacAndEntry = FutureUtils.result(resultFuture)) {
+            assertEquals(entryId, lacAndEntry.getLastAddConfirmed());
+            assertNull(lacAndEntry.getEntry());
+        }
+    }
+
+}

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.