You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2017/11/22 01:11:28 UTC

[GitHub] jiazhai closed pull request #738: Issue 731: refine LedgerEntry interface and implementation

jiazhai closed pull request #738: Issue 731: refine LedgerEntry interface and implementation
URL: https://github.com/apache/bookkeeper/pull/738
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a fork-based (foreign) pull
request once it has been merged, the diff is reproduced below for the
sake of provenance:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
index 4e39e5159..8cb31f391 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
@@ -23,8 +23,8 @@
 import com.google.common.base.Preconditions;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
-
 import java.io.InputStream;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 
 /**
@@ -32,30 +32,28 @@
  * the entry content.
  *
  */
-public class LedgerEntry
-    implements org.apache.bookkeeper.client.api.LedgerEntry {
+public class LedgerEntry {
 
     final long ledgerId;
-    long entryId;
-    long length;
+    final long entryId;
+    final long length;
     ByteBuf data;
 
-    LedgerEntry(long lId, long eId) {
-        this.ledgerId = lId;
-        this.entryId = eId;
+    LedgerEntry(LedgerEntryImpl entry) {
+        this.ledgerId = entry.getLedgerId();
+        this.entryId = entry.getEntryId();
+        this.length = entry.getLength();
+        this.data = entry.getEntryBuffer().retain();
     }
 
-    @Override
     public long getLedgerId() {
         return ledgerId;
     }
 
-    @Override
     public long getEntryId() {
         return entryId;
     }
 
-    @Override
     public long getLength() {
         return length;
     }
@@ -68,7 +66,6 @@ public long getLength() {
      * @return the content of the entry
      * @throws IllegalStateException if this method is called twice
      */
-    @Override
     public byte[] getEntry() {
         Preconditions.checkState(null != data, "entry content can be accessed only once");
         byte[] entry = new byte[data.readableBytes()];
@@ -105,7 +102,6 @@ public InputStream getEntryInputStream() {
      * @throws IllegalStateException if the entry has been retrieved by {@link #getEntry()}
      * or {@link #getEntryInputStream()}.
      */
-    @Override
     public ByteBuf getEntryBuffer() {
         Preconditions.checkState(null != data, "entry content has been retrieved" +
             " by #getEntry or #getEntryInputStream");
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 60ad65f4d..49962a909 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -20,11 +20,14 @@
  */
 package org.apache.bookkeeper.client;
 
+import static org.apache.bookkeeper.client.api.BKException.Code.ClientClosedException;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.RateLimiter;
 import io.netty.buffer.ByteBuf;
@@ -49,16 +52,21 @@
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
+import org.apache.bookkeeper.client.BKException.BKIncorrectParameterException;
+import org.apache.bookkeeper.client.BKException.BKReadException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.SyncCallbackUtils.FutureReadLastConfirmed;
 import org.apache.bookkeeper.client.SyncCallbackUtils.FutureReadLastConfirmedAndEntry;
-import org.apache.bookkeeper.client.SyncCallbackUtils.FutureReadResult;
 import org.apache.bookkeeper.client.SyncCallbackUtils.SyncAddCallback;
 import org.apache.bookkeeper.client.SyncCallbackUtils.SyncCloseCallback;
 import org.apache.bookkeeper.client.SyncCallbackUtils.SyncReadCallback;
 import org.apache.bookkeeper.client.SyncCallbackUtils.SyncReadLastConfirmedCallback;
+import org.apache.bookkeeper.client.api.BKException.Code;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
 import org.apache.bookkeeper.client.api.WriteHandle;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.common.concurrent.FutureEventListener;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
@@ -69,6 +77,7 @@
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
 import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.commons.collections4.IteratorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -626,9 +635,20 @@ public void asyncReadUnconfirmedEntries(long firstEntry, long lastEntry, ReadCal
      */
     @Override
     public CompletableFuture<Iterable<org.apache.bookkeeper.client.api.LedgerEntry>> read(long firstEntry, long lastEntry) {
-        FutureReadResult result = new FutureReadResult();
-        asyncReadEntries(firstEntry, lastEntry, result, null);
-        return result;
+        // Basic sanity check on the requested entry range
+        if (firstEntry < 0 || firstEntry > lastEntry) {
+            LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{} lastEntry:{}",
+                    new Object[] { ledgerId, firstEntry, lastEntry });
+            return FutureUtils.exception(new BKIncorrectParameterException());
+        }
+
+        if (lastEntry > lastAddConfirmed) {
+            LOG.error("ReadException on ledgerId:{} firstEntry:{} lastEntry:{}",
+                    new Object[] { ledgerId, firstEntry, lastEntry });
+            return FutureUtils.exception(new BKReadException());
+        }
+
+        return readEntriesInternalAsync(firstEntry, lastEntry);
     }
 
     /**
@@ -656,14 +676,58 @@ public void asyncReadUnconfirmedEntries(long firstEntry, long lastEntry, ReadCal
      */
     @Override
     public CompletableFuture<Iterable<org.apache.bookkeeper.client.api.LedgerEntry>> readUnconfirmed(long firstEntry, long lastEntry) {
-        FutureReadResult result = new FutureReadResult();
-        asyncReadUnconfirmedEntries(firstEntry, lastEntry, result, null);
-        return result;
+        // Basic sanity check on the requested entry range
+        if (firstEntry < 0 || firstEntry > lastEntry) {
+            LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{} lastEntry:{}",
+                    new Object[] { ledgerId, firstEntry, lastEntry });
+            return FutureUtils.exception(new BKIncorrectParameterException());
+        }
+
+        return readEntriesInternalAsync(firstEntry, lastEntry);
     }
 
     void asyncReadEntriesInternal(long firstEntry, long lastEntry, ReadCallback cb, Object ctx) {
-        new PendingReadOp(this, bk.getScheduler(),
-                          firstEntry, lastEntry, cb, ctx).initiate();
+        if(!bk.isClosed()) {
+            readEntriesInternalAsync(firstEntry, lastEntry)
+                .whenCompleteAsync(new FutureEventListener<Iterable<org.apache.bookkeeper.client.api.LedgerEntry>>() {
+                    @Override
+                    public void onSuccess(Iterable<org.apache.bookkeeper.client.api.LedgerEntry> iterable) {
+                        cb.readComplete(
+                            Code.OK,
+                            LedgerHandle.this,
+                            IteratorUtils.asEnumeration(
+                                Iterators.transform(iterable.iterator(), le -> {
+                                    LedgerEntry entry = new LedgerEntry((LedgerEntryImpl) le);
+                                    le.close();
+                                    return entry;
+                                })),
+                            ctx);
+                    }
+
+                    @Override
+                    public void onFailure(Throwable cause) {
+                        if (cause instanceof BKException) {
+                            BKException bke = (BKException) cause;
+                            cb.readComplete(bke.getCode(), LedgerHandle.this, null, ctx);
+                        } else {
+                            cb.readComplete(Code.UnexpectedConditionException, LedgerHandle.this, null, ctx);
+                        }
+                    }
+                }, bk.getMainWorkerPool().chooseThread(ledgerId));
+        } else {
+            cb.readComplete(Code.ClientClosedException, LedgerHandle.this, null, ctx);
+        }
+    }
+
+    CompletableFuture<Iterable<org.apache.bookkeeper.client.api.LedgerEntry>> readEntriesInternalAsync(long firstEntry,
+                                                                                                       long lastEntry) {
+        PendingReadOp op = new PendingReadOp(this, bk.getScheduler(), firstEntry, lastEntry);
+        if(!bk.isClosed()) {
+            bk.getMainWorkerPool().submitOrdered(ledgerId, op);
+        } else {
+            op.future().completeExceptionally(BKException.create(ClientClosedException));
+        }
+        return op.future();
     }
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
index df7c84ed4..cde52a1d0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
@@ -21,10 +21,8 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
-import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.DigestManager.RecoveryData;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java
index afb21bf14..290e69bf6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java
@@ -20,15 +20,17 @@
  */
 package org.apache.bookkeeper.client;
 
-import java.util.NoSuchElementException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
 import org.apache.bookkeeper.util.MathUtils;
 
+@Slf4j
 class ListenerBasedPendingReadOp extends PendingReadOp {
 
     final ReadEntryListener listener;
+    final Object ctx;
 
     ListenerBasedPendingReadOp(LedgerHandle lh,
                                ScheduledExecutorService scheduler,
@@ -53,38 +55,34 @@
                                ReadEntryListener listener,
                                Object ctx,
                                boolean isRecoveryRead) {
-        super(lh, scheduler, startEntryId, endEntryId, null, ctx, isRecoveryRead);
+        super(lh, scheduler, startEntryId, endEntryId, isRecoveryRead);
         this.listener = listener;
+        this.ctx = ctx;
     }
 
     @Override
     protected void submitCallback(int code) {
         LedgerEntryRequest request;
-        while ((request = seq.peek()) != null) {
+        while (!seq.isEmpty() && (request = seq.get(0)) != null) {
             if (!request.isComplete()) {
                 return;
             }
-            seq.remove();
+            seq.remove(0);
             long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos);
+            LedgerEntry entry;
             if (BKException.Code.OK == request.getRc()) {
                 readOpLogger.registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS);
+                // callback with completed entry
+                entry = new LedgerEntry(request.entryImpl);
             } else {
                 readOpLogger.registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS);
+                entry = null;
             }
-            // callback with completed entry
-            listener.onEntryComplete(request.getRc(), lh, request, ctx);
+            request.close();
+            listener.onEntryComplete(request.getRc(), lh, entry, ctx);
         }
         // if all entries are already completed.
         cancelSpeculativeTask(true);
     }
 
-    @Override
-    public boolean hasMoreElements() {
-        return false;
-    }
-
-    @Override
-    public LedgerEntry nextElement() throws NoSuchElementException {
-        throw new NoSuchElementException();
-    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 40da31c2d..e31c5b72f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -20,26 +20,24 @@
  */
 package org.apache.bookkeeper.client;
 
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.buffer.ByteBuf;
-
 import java.util.ArrayList;
 import java.util.BitSet;
-import java.util.Enumeration;
 import java.util.HashSet;
-import java.util.NoSuchElementException;
-import java.util.Queue;
+import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.common.util.SafeRunnable;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallbackCtx;
@@ -55,16 +53,15 @@
  * application as soon as it arrives rather than waiting for the whole thing.
  *
  */
-class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
+class PendingReadOp implements ReadEntryCallback, SafeRunnable {
     private static final Logger LOG = LoggerFactory.getLogger(PendingReadOp.class);
 
-    final private ScheduledExecutorService scheduler;
+    private final ScheduledExecutorService scheduler;
     private ScheduledFuture<?> speculativeTask = null;
-    Queue<LedgerEntryRequest> seq;
+    protected final List<LedgerEntryRequest> seq;
+    private final CompletableFuture<Iterable<LedgerEntry>> future;
     Set<BookieSocketAddress> heardFromHosts;
     BitSet heardFromHostsBitSet;
-    ReadCallback cb;
-    Object ctx;
     LedgerHandle lh;
     long numPendingEntries;
     long startEntryId;
@@ -77,7 +74,7 @@
     boolean parallelRead = false;
     final AtomicBoolean complete = new AtomicBoolean(false);
 
-    abstract class LedgerEntryRequest extends LedgerEntry implements SpeculativeRequestExecutor {
+    abstract class LedgerEntryRequest implements SpeculativeRequestExecutor, AutoCloseable {
 
         final AtomicBoolean complete = new AtomicBoolean(false);
 
@@ -87,10 +84,10 @@
 
         final ArrayList<BookieSocketAddress> ensemble;
         final DistributionSchedule.WriteSet writeSet;
+        final LedgerEntryImpl entryImpl;
 
         LedgerEntryRequest(ArrayList<BookieSocketAddress> ensemble, long lId, long eId) {
-            super(lId, eId);
-
+            this.entryImpl = LedgerEntryImpl.create(lId, eId);
             this.ensemble = ensemble;
 
             if (lh.bk.isReorderReadSequence()) {
@@ -98,12 +95,16 @@
                     .reorderReadSequence(
                             ensemble,
                             lh.bookieFailureHistory.asMap(),
-                            lh.distributionSchedule.getWriteSet(entryId));
+                            lh.distributionSchedule.getWriteSet(eId));
             } else {
-                writeSet = lh.distributionSchedule.getWriteSet(entryId);
+                writeSet = lh.distributionSchedule.getWriteSet(eId);
             }
         }
 
+        public void close() {
+            entryImpl.close();
+        }
+
         /**
          * Execute the read request.
          */
@@ -124,7 +125,7 @@
         boolean complete(int bookieIndex, BookieSocketAddress host, final ByteBuf buffer) {
             ByteBuf content;
             try {
-                content = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
+                content = lh.macManager.verifyDigestAndReturnData(entryImpl.getEntryId(), buffer);
             } catch (BKDigestMatchException e) {
                 logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", BKException.Code.DigestMatchException);
                 buffer.release();
@@ -137,8 +138,8 @@ boolean complete(int bookieIndex, BookieSocketAddress host, final ByteBuf buffer
                  * The length is a long and it is the last field of the metadata of an entry.
                  * Consequently, we have to subtract 8 from METADATA_LENGTH to get the length.
                  */
-                length = buffer.getLong(DigestManager.METADATA_LENGTH - 8);
-                data = content;
+                entryImpl.setLength(buffer.getLong(DigestManager.METADATA_LENGTH - 8));
+                entryImpl.setEntryBuf(content);
                 writeSet.recycle();
                 return true;
             } else {
@@ -195,12 +196,12 @@ synchronized void logErrorAndReattemptRead(int bookieIndex, BookieSocketAddress
                 ++numMissedEntryReads;
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("No such entry found on bookie.  L{} E{} bookie: {}",
-                        new Object[] { lh.ledgerId, entryId, host });
+                        new Object[] { lh.ledgerId, entryImpl.getEntryId(), host });
                 }
             } else {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(errMsg + " while reading L{} E{} from bookie: {}",
-                        new Object[]{lh.ledgerId, entryId, host});
+                        new Object[]{lh.ledgerId, entryImpl.getEntryId(), host});
                 }
             }
         }
@@ -235,7 +236,7 @@ int getRc() {
 
         @Override
         public String toString() {
-            return String.format("L%d-E%d", ledgerId, entryId);
+            return String.format("L%d-E%d", entryImpl.getLedgerId(), entryImpl.getEntryId());
         }
 
         /**
@@ -423,16 +424,12 @@ synchronized void logErrorAndReattemptRead(int bookieIndex, BookieSocketAddress
     PendingReadOp(LedgerHandle lh,
                   ScheduledExecutorService scheduler,
                   long startEntryId,
-                  long endEntryId,
-                  ReadCallback cb,
-                  Object ctx) {
+                  long endEntryId) {
         this(
             lh,
             scheduler,
             startEntryId,
             endEntryId,
-            cb,
-            ctx,
             false);
     }
 
@@ -440,12 +437,9 @@ synchronized void logErrorAndReattemptRead(int bookieIndex, BookieSocketAddress
                   ScheduledExecutorService scheduler,
                   long startEntryId,
                   long endEntryId,
-                  ReadCallback cb,
-                  Object ctx,
                   boolean isRecoveryRead) {
-        seq = new ArrayBlockingQueue<LedgerEntryRequest>((int) ((endEntryId + 1) - startEntryId));
-        this.cb = cb;
-        this.ctx = ctx;
+        this.seq = new ArrayList<>((int) ((endEntryId + 1) - startEntryId));
+        this.future = new CompletableFuture<>();
         this.lh = lh;
         this.startEntryId = startEntryId;
         this.endEntryId = endEntryId;
@@ -460,6 +454,10 @@ synchronized void logErrorAndReattemptRead(int bookieIndex, BookieSocketAddress
         readOpLogger = lh.bk.getReadOpLogger();
     }
 
+    CompletableFuture<Iterable<LedgerEntry>> future() {
+        return future;
+    }
+
     protected LedgerMetadata getLedgerMetadata() {
         return lh.metadata;
     }
@@ -476,7 +474,11 @@ PendingReadOp parallelRead(boolean enabled) {
         return this;
     }
 
-    public void initiate() {
+    public void submit() {
+        lh.bk.getMainWorkerPool().submitOrdered(lh.ledgerId, this);
+    }
+
+    void initiate() {
         long nextEnsembleChange = startEntryId, i = startEntryId;
         this.requestTimeNanos = MathUtils.nowInNano();
         ArrayList<BookieSocketAddress> ensemble = null;
@@ -503,6 +505,11 @@ public void initiate() {
         }
     }
 
+    @Override
+    public void safeRun() {
+        initiate();
+    }
+
     private static class ReadContext implements ReadEntryCallbackCtx {
         final int bookieIndex;
         final BookieSocketAddress to;
@@ -531,7 +538,7 @@ void sendReadTo(int bookieIndex, BookieSocketAddress to, LedgerEntryRequest entr
             lh.throttler.acquire();
         }
 
-        lh.bk.getBookieClient().readEntry(to, lh.ledgerId, entry.entryId,
+        lh.bk.getBookieClient().readEntry(to, lh.ledgerId, entry.entryImpl.getEntryId(),
                                      this, new ReadContext(bookieIndex, to, entry));
     }
 
@@ -574,37 +581,27 @@ protected void submitCallback(int code) {
             return;
         }
 
+        cancelSpeculativeTask(true);
+
         long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos);
         if (code != BKException.Code.OK) {
             long firstUnread = LedgerHandle.INVALID_ENTRY_ID;
             for (LedgerEntryRequest req : seq) {
                 if (!req.isComplete()) {
-                    firstUnread = req.getEntryId();
+                    firstUnread = req.entryImpl.getEntryId();
                     break;
                 }
             }
             LOG.error("Read of ledger entry failed: L{} E{}-E{}, Heard from {} : bitset = {}. First unread entry is {}",
                     new Object[] { lh.getId(), startEntryId, endEntryId, heardFromHosts, heardFromHostsBitSet, firstUnread });
             readOpLogger.registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS);
+            // release the entries
+            seq.forEach(LedgerEntryRequest::close);
+            future.completeExceptionally(BKException.create(code));
         } else {
             readOpLogger.registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS);
+            future.complete(Lists.transform(seq, input -> input.entryImpl));
         }
-        cancelSpeculativeTask(true);
-        cb.readComplete(code, lh, PendingReadOp.this, PendingReadOp.this.ctx);
-        cb = null;
     }
 
-    @Override
-    public boolean hasMoreElements() {
-        return !seq.isEmpty();
-    }
-
-    @Override
-    public LedgerEntry nextElement() throws NoSuchElementException {
-        return seq.remove();
-    }
-
-    public int size() {
-        return seq.size();
-    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index ae721d502..9ee75d954 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -24,17 +24,16 @@
 import io.netty.buffer.ByteBuf;
 import java.util.ArrayList;
 import java.util.BitSet;
-import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
 import org.apache.bookkeeper.proto.ReadLastConfirmedAndEntryContext;
 import org.apache.bookkeeper.util.MathUtils;
-import org.apache.commons.lang3.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +64,7 @@
     private long lastAddConfirmed;
     private long timeOutInMillis;
 
-    abstract class ReadLACAndEntryRequest extends LedgerEntry {
+    abstract class ReadLACAndEntryRequest implements AutoCloseable {
 
         final AtomicBoolean complete = new AtomicBoolean(false);
 
@@ -76,12 +75,12 @@
         final ArrayList<BookieSocketAddress> ensemble;
         final DistributionSchedule.WriteSet writeSet;
         final DistributionSchedule.WriteSet orderedEnsemble;
+        final LedgerEntryImpl entryImpl;
 
         ReadLACAndEntryRequest(ArrayList<BookieSocketAddress> ensemble, long lId, long eId) {
-            super(lId, eId);
-
+            this.entryImpl = LedgerEntryImpl.create(lId, eId);
             this.ensemble = ensemble;
-            this.writeSet = lh.distributionSchedule.getWriteSet(entryId);
+            this.writeSet = lh.distributionSchedule.getWriteSet(eId);
             if (lh.bk.reorderReadSequence) {
                 this.orderedEnsemble = lh.bk.placementPolicy.reorderReadLACSequence(ensemble,
                         lh.bookieFailureHistory.asMap(), writeSet.copy());
@@ -90,6 +89,10 @@
             }
         }
 
+        public void close() {
+            entryImpl.close();
+        }
+
         synchronized int getFirstError() {
             return firstError;
         }
@@ -125,13 +128,12 @@ boolean complete(int bookieIndex, BookieSocketAddress host, final ByteBuf buffer
                 writeSet.recycle();
                 orderedEnsemble.recycle();
                 rc = BKException.Code.OK;
-                this.entryId = entryId;
                 /*
                  * The length is a long and it is the last field of the metadata of an entry.
                  * Consequently, we have to subtract 8 from METADATA_LENGTH to get the length.
                  */
-                length = buffer.getLong(DigestManager.METADATA_LENGTH - 8);
-                data = content;
+                entryImpl.setLength(buffer.getLong(DigestManager.METADATA_LENGTH - 8));
+                entryImpl.setEntryBuf(content);
                 return true;
             } else {
                 return false;
@@ -193,13 +195,13 @@ synchronized void logErrorAndReattemptRead(int bookieIndex, BookieSocketAddress
                 // treat these errors as failures if the node from which we received this is part of
                 // the writeSet
                 if (this.writeSet.contains(bookieIndex)) {
-                    lh.registerOperationFailureOnBookie(host, entryId);
+                    lh.registerOperationFailureOnBookie(host, entryImpl.getEntryId());
                 }
                 ++numMissedEntryReads;
             }
 
             if (LOG.isDebugEnabled()) {
-                LOG.debug(errMsg + " while reading entry: " + entryId + " ledgerId: " + lh.ledgerId + " from bookie: "
+                LOG.debug(errMsg + " while reading entry: " + entryImpl.getEntryId() + " ledgerId: " + lh.ledgerId + " from bookie: "
                     + host);
             }
         }
@@ -234,7 +236,7 @@ int getRc() {
 
         @Override
         public String toString() {
-            return String.format("L%d-E%d", ledgerId, entryId);
+            return String.format("L%d-E%d", entryImpl.getLedgerId(), entryImpl.getEntryId());
         }
     }
 
@@ -493,15 +495,24 @@ void sendReadTo(int bookieIndex, BookieSocketAddress to, ReadLACAndEntryRequest
         public void readLastConfirmedAndEntryComplete(int rc, long lastAddConfirmed, LedgerEntry entry);
     }
 
-    private void submitCallback(int rc, long lastAddConfirmed, LedgerEntry entry) {
+    private void submitCallback(int rc) {
         long latencyMicros = MathUtils.elapsedMicroSec(requestTimeNano);
+        LedgerEntry entry;
         if (BKException.Code.OK != rc) {
             lh.bk.getReadLacAndEntryOpLogger()
                 .registerFailedEvent(latencyMicros, TimeUnit.MICROSECONDS);
+            entry = null;
         } else {
+            // the response may carry an advanced LAC but no entry
             lh.bk.getReadLacAndEntryOpLogger()
                 .registerSuccessfulEvent(latencyMicros, TimeUnit.MICROSECONDS);
+            if (request.entryImpl.getEntryBuffer() != null) {
+                entry = new LedgerEntry(request.entryImpl);
+            } else {
+                entry = null;
+            }
         }
+        request.close();
         cb.readLastConfirmedAndEntryComplete(rc, lastAddConfirmed, entry);
     }
 
@@ -537,7 +548,7 @@ public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf buffe
                             .registerSuccessfulEvent(elapsedMicros, TimeUnit.MICROSECONDS);
                     }
 
-                    submitCallback(BKException.Code.OK, lastAddConfirmed, request);
+                    submitCallback(BKException.Code.OK);
                     requestComplete.set(true);
                     heardFromHostsBitSet.set(rCtx.getBookieIndex(), true);
                 }
@@ -564,7 +575,7 @@ public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf buffe
                 return;
             }
         } else if (BKException.Code.UnauthorizedAccessException == rc && !requestComplete.get()) {
-            submitCallback(rc, lastAddConfirmed, null);
+            submitCallback(rc);
             requestComplete.set(true);
         } else {
             request.logErrorAndReattemptRead(rCtx.getBookieIndex(), bookie, "Error: " + BKException.getMessage(rc), rc);
@@ -580,10 +591,10 @@ private void completeRequest() {
         if (requestComplete.compareAndSet(false, true)) {
             if (!hasValidResponse) {
                 // no success called
-                submitCallback(request.getFirstError(), lastAddConfirmed, null);
+                submitCallback(request.getFirstError());
             } else {
                 // callback
-                submitCallback(BKException.Code.OK, lastAddConfirmed, null);
+                submitCallback(BKException.Code.OK);
             }
         }
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SyncCallbackUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SyncCallbackUtils.java
index 13bcff87b..5ac30800c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SyncCallbackUtils.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SyncCallbackUtils.java
@@ -17,13 +17,11 @@
  */
 package org.apache.bookkeeper.client;
 
-import com.google.common.collect.Iterators;
 import java.util.Enumeration;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
-import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.client.impl.LastConfirmedAndEntryImpl;
 
 /**
@@ -220,34 +218,6 @@ public void readComplete(int rc, LedgerHandle lh,
         }
     }
 
-    static class FutureReadResult
-        extends CompletableFuture<Iterable<org.apache.bookkeeper.client.api.LedgerEntry>>
-        implements AsyncCallback.ReadCallback {
-
-        /**
-         * Implementation of callback interface for read method of {@link ReadHandle}.
-         *
-         * @param rc
-         *          return code
-         * @param lh
-         *          ledger handle
-         * @param seq
-         *          sequence of entries
-         * @param ctx
-         *          control object
-         */
-        @Override
-        @SuppressWarnings("unchecked")
-        public void readComplete(int rc, LedgerHandle lh,
-                                 Enumeration<LedgerEntry> seq, Object ctx) {
-            if (rc != BKException.Code.OK) {
-                this.completeExceptionally(BKException.create(rc).fillInStackTrace());
-            } else {
-                this.complete((Iterable) () -> Iterators.forEnumeration(seq));
-            }
-        }
-    }
-
     static class SyncAddCallback extends CompletableFuture<Long> implements AsyncCallback.AddCallback {
 
         /**
@@ -320,7 +290,7 @@ public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
 
         @Override
         public void readLastConfirmedAndEntryComplete(int rc, long lastConfirmed, LedgerEntry entry, Object ctx) {
-            LastConfirmedAndEntry result = new LastConfirmedAndEntryImpl(lastConfirmed, entry);
+            LastConfirmedAndEntry result = LastConfirmedAndEntryImpl.create(lastConfirmed, entry);
             finish(rc, result, this);
         }
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LastConfirmedAndEntry.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LastConfirmedAndEntry.java
index 3a10d9630..8bbe58e21 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LastConfirmedAndEntry.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LastConfirmedAndEntry.java
@@ -24,14 +24,14 @@
  * This contains LastAddConfirmed entryId and a LedgerEntry wanted to read.
  * It is used for readLastAddConfirmedAndEntry.
  */
-public interface LastConfirmedAndEntry {
+public interface LastConfirmedAndEntry extends AutoCloseable {
 
     /**
      * Gets LastAddConfirmed entryId.
      *
      * @return the LastAddConfirmed
      */
-    Long getLastAddConfirmed();
+    long getLastAddConfirmed();
 
     /**
      * Whether this entity contains an entry.
@@ -47,4 +47,9 @@
      */
     LedgerEntry getEntry();
 
+    /**
+     * {@inheritDoc}
+     */
+    void close();
+
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerEntry.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerEntry.java
index 97fe3777d..43bdc3443 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerEntry.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerEntry.java
@@ -23,7 +23,6 @@
 import io.netty.buffer.ByteBuf;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
 import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
-import org.apache.bookkeeper.conf.ClientConfiguration;
 
 /**
  * An entry.
@@ -32,7 +31,7 @@
  */
 @Public
 @Unstable
-public interface LedgerEntry {
+public interface LedgerEntry extends AutoCloseable {
 
     /**
      * The id of the ledger which contains the entry.
@@ -56,25 +55,31 @@
     long getLength();
 
     /**
-     * Returns the content of the entry. This method can be called only once. While using v2 wire protocol this method
-     * will automatically release the internal ByteBuf.
+     * Returns the content of the entry.
      *
      * @return the content of the entry
-     * @throws IllegalStateException if this method is called twice
      */
     byte[] getEntry();
 
     /**
      * Return the internal buffer that contains the entry payload.
      *
-     * <p>Note: Using v2 wire protocol it is responsibility of the caller
-     * to ensure to release the buffer after usage.
-     *
      * @return a ByteBuf which contains the data
-     *
-     * @see ClientConfiguration#setNettyUsePooledBuffers(boolean)
-     * @throws IllegalStateException if the entry has been retrieved by {@link #getEntry()}
      */
     ByteBuf getEntryBuffer();
 
+    /**
+     * Returns a duplicate of this entry.
+     *
+     * <p>This call will retain a slice of the underlying byte buffer.
+     *
+     * @return a duplicated ledger entry.
+     */
+    LedgerEntry duplicate();
+
+    /**
+     * {@inheritDoc}
+     */
+    void close();
+
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
index 4ed78f9bc..8f1924ac1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
@@ -20,6 +20,8 @@
  */
 package org.apache.bookkeeper.client.impl;
 
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 
@@ -29,19 +31,37 @@
  */
 public class LastConfirmedAndEntryImpl implements LastConfirmedAndEntry {
 
-    private final Long lac;
-    private final LedgerEntry entry;
+    private static final Recycler<LastConfirmedAndEntryImpl> RECYCLER = new Recycler<LastConfirmedAndEntryImpl>() {
+        @Override
+        protected LastConfirmedAndEntryImpl newObject(Handle<LastConfirmedAndEntryImpl> handle) {
+            return new LastConfirmedAndEntryImpl(handle);
+        }
+    };
 
-    public LastConfirmedAndEntryImpl(Long lac, LedgerEntry entry) {
-        this.lac = lac;
-        this.entry = entry;
+    public static LastConfirmedAndEntryImpl create(long lac, org.apache.bookkeeper.client.LedgerEntry entry) {
+        LastConfirmedAndEntryImpl entryImpl = RECYCLER.get();
+        entryImpl.lac = lac;
+        // entry may be null (e.g. nothing was read); store null, which close() handles, rather than throw an NPE.
+        entryImpl.entry = (null == entry) ? null : LedgerEntryImpl.create(
+            entry.getLedgerId(),
+            entry.getEntryId(),
+            entry.getLength(), entry.getEntryBuffer());
+        return entryImpl;
+    }
+
+    private final Handle<LastConfirmedAndEntryImpl> recycleHandle;
+    private long lac; // primitive, like getLastAddConfirmed()'s return type: no boxing, no null to unbox
+    private LedgerEntry entry; // reset to null by close()
+
+    public LastConfirmedAndEntryImpl(Handle<LastConfirmedAndEntryImpl> handle) {
+        this.recycleHandle = handle;
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public Long getLastAddConfirmed() {
+    public long getLastAddConfirmed() {
         return lac;
     }
 
@@ -60,4 +80,17 @@ public boolean hasEntry() {
     public LedgerEntry getEntry() {
         return entry;
     }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void close() {
+        this.lac = -1L;
+        if (null != entry) {
+            entry.close();
+            entry = null;
+        }
+        recycleHandle.recycle(this);
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LedgerEntryImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LedgerEntryImpl.java
new file mode 100644
index 000000000..b90f299b4
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LedgerEntryImpl.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.client.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+
+/**
+ * Ledger entry. It's a simple tuple containing the ledger id, the entry-id, and
+ * the entry content.
+ */
+public class LedgerEntryImpl implements LedgerEntry {
+
+    private static final Recycler<LedgerEntryImpl> RECYCLER = new Recycler<LedgerEntryImpl>() {
+        @Override
+        protected LedgerEntryImpl newObject(Handle<LedgerEntryImpl> handle) {
+            return new LedgerEntryImpl(handle);
+        }
+    };
+
+    public static LedgerEntryImpl create(long ledgerId,
+                                         long entryId) {
+        LedgerEntryImpl entry = RECYCLER.get();
+        entry.ledgerId = ledgerId;
+        entry.entryId = entryId;
+        return entry;
+    }
+
+    public static LedgerEntryImpl create(long ledgerId,
+                                         long entryId,
+                                         long length,
+                                         ByteBuf buf) {
+        LedgerEntryImpl entry = RECYCLER.get();
+        entry.ledgerId = ledgerId;
+        entry.entryId = entryId;
+        entry.length = length;
+        entry.entryBuf = buf;
+        return entry;
+    }
+
+    public static LedgerEntryImpl duplicate(LedgerEntry entry) {
+        return create(
+            entry.getLedgerId(),
+            entry.getEntryId(),
+            entry.getLength(),
+            entry.getEntryBuffer().retainedSlice());
+    }
+
+    private final Handle<LedgerEntryImpl> recycleHandle;
+    private long ledgerId;
+    private long entryId;
+    private long length;
+    private ByteBuf entryBuf;
+
+    private LedgerEntryImpl(Handle<LedgerEntryImpl> handle) {
+        this.recycleHandle = handle;
+    }
+
+    public void setEntryId(long entryId) {
+        this.entryId = entryId;
+    }
+
+    public void setLength(long length) {
+        this.length = length;
+    }
+
+    public void setEntryBuf(ByteBuf buf) {
+        this.entryBuf = buf;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public long getLedgerId() {
+        return ledgerId;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public long getEntryId() {
+        return entryId;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public long getLength() {
+        return length;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public byte[] getEntry() {
+        return ByteBufUtil.getBytes(entryBuf);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public ByteBuf getEntryBuffer() {
+        return entryBuf;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public LedgerEntryImpl duplicate() {
+        return duplicate(this);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void close() {
+        recycle();
+    }
+
+    private void recycle() {
+        this.ledgerId = -1L;
+        this.entryId = -1L;
+        this.length = -1L;
+        ReferenceCountUtil.release(entryBuf);
+        this.entryBuf = null;
+        recycleHandle.recycle(this);
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index 91ba83042..8fb5c0271 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -53,7 +53,6 @@
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.junit.After;
 import org.junit.Before;
-import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
@@ -336,13 +335,13 @@ protected void setupBookieClientReadEntry() {
                 fencedLedgers.add(ledgerId);
                 MockEntry mockEntry = getMockLedgerEntry(ledgerId, bookieSocketAddress, entryId);
                 if (mockEntry != null) {
-                    LOG.info("readEntryAndFenceLedger - found mock entry {}@{} at {}", ledgerId, entryId, bookieSocketAddress);
+                    LOG.info("readEntryAndFenceLedger - found mock entry {}@{} at {}", entryId, ledgerId, bookieSocketAddress);
                     ByteBuf entry = macManager.computeDigestAndPackageForSending(entryId, mockEntry.lastAddConfirmed,
                         mockEntry.payload.length, Unpooled.wrappedBuffer(mockEntry.payload));
                     callback.readEntryComplete(BKException.Code.OK, ledgerId, entryId, Unpooled.copiedBuffer(entry), args[5]);
                     entry.release();
                 } else {
-                    LOG.info("readEntryAndFenceLedger - no such mock entry {}@{} at {}", ledgerId, entryId, bookieSocketAddress);
+                    LOG.info("readEntryAndFenceLedger - no such mock entry {}@{} at {}", entryId, ledgerId, bookieSocketAddress);
                     callback.readEntryComplete(BKException.Code.NoSuchEntryException, ledgerId, entryId, null, args[5]);
                 }
             });
@@ -361,13 +360,13 @@ protected void setupBookieClientReadEntry() {
                 DigestManager macManager = new CRC32DigestManager(ledgerId);
                 MockEntry mockEntry = getMockLedgerEntry(ledgerId, bookieSocketAddress, entryId);
                 if (mockEntry != null) {
-                    LOG.info("readEntry - found mock entry {}@{} at {}", ledgerId, entryId, bookieSocketAddress);
+                    LOG.info("readEntry - found mock entry {}@{} at {}", entryId, ledgerId, bookieSocketAddress);
                     ByteBuf entry = macManager.computeDigestAndPackageForSending(entryId,
                         mockEntry.lastAddConfirmed, mockEntry.payload.length, Unpooled.wrappedBuffer(mockEntry.payload));
                     callback.readEntryComplete(BKException.Code.OK, ledgerId, entryId, Unpooled.copiedBuffer(entry), args[4]);
                     entry.release();
                 } else {
-                    LOG.info("readEntry - no such mock entry {}@{} at {}", ledgerId, entryId, bookieSocketAddress);
+                    LOG.info("readEntry - no such mock entry {}@{} at {}", entryId, ledgerId, bookieSocketAddress);
                     callback.readEntryComplete(BKException.Code.NoSuchEntryException, ledgerId, entryId, null, args[4]);
                 }
             });
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
index 17ba6f17e..eff8c3a70 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
@@ -20,13 +20,20 @@
  */
 package org.apache.bookkeeper.client;
 
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
-import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
@@ -34,10 +41,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.concurrent.CountDownLatch;
-
 /**
  * Unit tests for parallel reading
  */
@@ -63,39 +66,6 @@ long getLedgerToRead(int ensemble, int writeQuorum, int ackQuorum, int numEntrie
         return lh.getId();
     }
 
-    static class LatchCallback implements ReadCallback {
-
-        final CountDownLatch l = new CountDownLatch(1);
-        int rc = -0x1314;
-        Enumeration<LedgerEntry> entries;
-
-        Enumeration<LedgerEntry> getEntries() {
-            return entries;
-        }
-
-        int getRc() {
-            return rc;
-        }
-
-        @Override
-        public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
-            this.rc = rc;
-            this.entries = seq;
-            l.countDown();
-        }
-
-        void expectSuccess() throws Exception {
-            l.await();
-            assertTrue(BKException.Code.OK == rc);
-        }
-
-        void expectFail() throws Exception {
-            l.await();
-            assertFalse(BKException.Code.OK == rc);
-        }
-
-    }
-
     @Test
     public void testNormalParallelRead() throws Exception {
         int numEntries = 10;
@@ -105,34 +75,34 @@ public void testNormalParallelRead() throws Exception {
 
         // read single entry
         for (int i = 0; i < numEntries; i++) {
-            LatchCallback latch = new LatchCallback();
             PendingReadOp readOp =
-                    new PendingReadOp(lh, lh.bk.scheduler, i, i, latch, null);
-            readOp.parallelRead(true).initiate();
-            latch.expectSuccess();
-            Enumeration<LedgerEntry> entries = latch.getEntries();
-            assertNotNull(entries);
-            assertTrue(entries.hasMoreElements());
-            LedgerEntry entry = entries.nextElement();
+                    new PendingReadOp(lh, lh.bk.scheduler, i, i);
+            readOp.parallelRead(true).submit();
+            Iterable<LedgerEntry> iterable = readOp.future().get();
+            assertNotNull(iterable);
+            Iterator<LedgerEntry> entries = iterable.iterator();
+            assertTrue(entries.hasNext());
+            LedgerEntry entry = entries.next();
             assertNotNull(entry);
             assertEquals(i, Integer.parseInt(new String(entry.getEntry())));
-            assertFalse(entries.hasMoreElements());
+            entry.close();
+            assertFalse(entries.hasNext());
         }
 
         // read multiple entries
-        LatchCallback latch = new LatchCallback();
         PendingReadOp readOp =
-                new PendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1, latch, null);
-        readOp.parallelRead(true).initiate();
-        latch.expectSuccess();
-        Enumeration<LedgerEntry> entries = latch.getEntries();
-        assertNotNull(entries);
+                new PendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1);
+        readOp.parallelRead(true).submit();
+        Iterable<LedgerEntry> iterable = readOp.future().get();
+        assertNotNull(iterable);
+        Iterator<LedgerEntry> iterator = iterable.iterator();
 
         int numReads = 0;
-        while (entries.hasMoreElements()) {
-            LedgerEntry entry = entries.nextElement();
+        while (iterator.hasNext()) {
+            LedgerEntry entry = iterator.next();
             assertNotNull(entry);
             assertEquals(numReads, Integer.parseInt(new String(entry.getEntry())));
+            entry.close();
             ++numReads;
         }
         assertEquals(numEntries, numReads);
@@ -140,6 +110,17 @@ public void testNormalParallelRead() throws Exception {
         lh.close();
     }
 
+    private static <T> void expectFail(CompletableFuture<T> future, int expectedRc) {
+        try {
+            result(future);
+            fail("Expect to fail");
+        } catch (Exception e) {
+            assertTrue(e instanceof BKException);
+            BKException bke = (BKException) e;
+            assertEquals(expectedRc, bke.getCode());
+        }
+    }
+
     @Test
     public void testParallelReadMissingEntries() throws Exception {
         int numEntries = 10;
@@ -148,19 +129,15 @@ public void testParallelReadMissingEntries() throws Exception {
         LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
 
         // read single entry
-        LatchCallback latch = new LatchCallback();
         PendingReadOp readOp =
-                new PendingReadOp(lh, lh.bk.scheduler, 11, 11, latch, null);
-        readOp.parallelRead(true).initiate();
-        latch.expectFail();
-        assertEquals(BKException.Code.NoSuchEntryException, latch.getRc());
+                new PendingReadOp(lh, lh.bk.scheduler, 11, 11);
+        readOp.parallelRead(true).submit();
+        expectFail(readOp.future(), Code.NoSuchEntryException);
 
         // read multiple entries
-        latch = new LatchCallback();
-        readOp = new PendingReadOp(lh, lh.bk.scheduler, 8, 11, latch, null);
-        readOp.parallelRead(true).initiate();
-        latch.expectFail();
-        assertEquals(BKException.Code.NoSuchEntryException, latch.getRc());
+        readOp = new PendingReadOp(lh, lh.bk.scheduler, 8, 11);
+        readOp.parallelRead(true).submit();
+        expectFail(readOp.future(), Code.NoSuchEntryException);
 
         lh.close();
     }
@@ -186,13 +163,11 @@ public void testFailParallelReadMissingEntryImmediately() throws Exception {
         sleepBookie(ensemble.get(0), latch1);
         sleepBookie(ensemble.get(1), latch2);
 
-        LatchCallback latchCallback = new LatchCallback();
         PendingReadOp readOp =
-                new PendingReadOp(lh, lh.bk.scheduler, 10, 10, latchCallback, null);
-        readOp.parallelRead(true).initiate();
+                new PendingReadOp(lh, lh.bk.scheduler, 10, 10);
+        readOp.parallelRead(true).submit();
         // would fail immediately if found missing entries don't cover ack quorum
-        latchCallback.expectFail();
-        assertEquals(BKException.Code.NoSuchEntryException, latchCallback.getRc());
+        expectFail(readOp.future(), Code.NoSuchEntryException);
         latch1.countDown();
         latch2.countDown();
 
@@ -220,17 +195,16 @@ public void testParallelReadWithFailedBookies() throws Exception {
         killBookie(ensemble.get(1));
 
         // read multiple entries
-        LatchCallback latch = new LatchCallback();
         PendingReadOp readOp =
-                new PendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1, latch, null);
-        readOp.parallelRead(true).initiate();
-        latch.expectSuccess();
-        Enumeration<LedgerEntry> entries = latch.getEntries();
-        assertNotNull(entries);
+                new PendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1);
+        readOp.parallelRead(true).submit();
+        Iterable<LedgerEntry> iterable = readOp.future().get();
+        assertNotNull(iterable);
+        Iterator<LedgerEntry> entries = iterable.iterator();
 
         int numReads = 0;
-        while (entries.hasMoreElements()) {
-            LedgerEntry entry = entries.nextElement();
+        while (entries.hasNext()) {
+            LedgerEntry entry = entries.next();
             assertNotNull(entry);
             assertEquals(numReads, Integer.parseInt(new String(entry.getEntry())));
             ++numReads;
@@ -262,12 +236,10 @@ public void testParallelReadFailureWithFailedBookies() throws Exception {
         killBookie(ensemble.get(2));
 
         // read multiple entries
-        LatchCallback latch = new LatchCallback();
         PendingReadOp readOp =
-                new PendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1, latch, null);
-        readOp.parallelRead(true).initiate();
-        latch.expectFail();
-        assertEquals(BKException.Code.BookieHandleNotAvailableException, latch.getRc());
+                new PendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1);
+        readOp.parallelRead(true).submit();
+        expectFail(readOp.future(), Code.BookieHandleNotAvailableException);
 
         lh.close();
         newBk.close();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java
index 11e46e914..fb9a7471b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java
@@ -24,6 +24,10 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
@@ -32,11 +36,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
 /**
  * Unit tests for {@link org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener}.
  */
@@ -87,11 +86,17 @@ long getLedgerToRead(int ensemble, int writeQuorum, int ackQuorum, int numEntrie
 
         @Override
         public void onEntryComplete(int rc, LedgerHandle lh, LedgerEntry entry, Object ctx) {
-            if (nextEntryId != entry.getEntryId()) {
-                inOrder = false;
+            long entryId;
+            if (BKException.Code.OK == rc) {
+                if (nextEntryId != entry.getEntryId()) {
+                    inOrder = false;
+                }
+                entryId = entry.getEntryId();
+            } else {
+                entryId = nextEntryId;
             }
+            resultCodes.put(entryId, new EntryWithRC(rc, entry));
             ++nextEntryId;
-            resultCodes.put(entry.getEntryId(), new EntryWithRC(rc, entry));
             l.countDown();
         }
 
@@ -115,7 +120,7 @@ void basicReadTest(boolean parallelRead) throws Exception {
             LatchListener listener = new LatchListener(i, 1);
             ListenerBasedPendingReadOp readOp =
                     new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, i, i, listener, null);
-            readOp.parallelRead(parallelRead).initiate();
+            readOp.parallelRead(parallelRead).submit();
             listener.expectComplete();
             assertEquals(1, listener.resultCodes.size());
             EntryWithRC entry = listener.resultCodes.get((long) i);
@@ -129,7 +134,7 @@ void basicReadTest(boolean parallelRead) throws Exception {
         LatchListener listener = new LatchListener(0L, numEntries);
         ListenerBasedPendingReadOp readOp =
                 new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1, listener, null);
-        readOp.parallelRead(parallelRead).initiate();
+        readOp.parallelRead(parallelRead).submit();
         listener.expectComplete();
         assertEquals(numEntries, listener.resultCodes.size());
         for (int i = 0; i < numEntries; i++) {
@@ -163,7 +168,7 @@ private void readMissingEntriesTest(boolean parallelRead) throws Exception {
         LatchListener listener = new LatchListener(11L, 1);
         ListenerBasedPendingReadOp readOp =
                 new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 11, 11, listener, null);
-        readOp.parallelRead(parallelRead).initiate();
+        readOp.parallelRead(parallelRead).submit();
         listener.expectComplete();
         assertEquals(1, listener.resultCodes.size());
         EntryWithRC entry = listener.resultCodes.get(11L);
@@ -174,7 +179,7 @@ private void readMissingEntriesTest(boolean parallelRead) throws Exception {
         // read multiple missing entries
         listener = new LatchListener(11L, 3);
         readOp = new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 11, 13, listener, null);
-        readOp.parallelRead(parallelRead).initiate();
+        readOp.parallelRead(parallelRead).submit();
         listener.expectComplete();
         assertEquals(3, listener.resultCodes.size());
         assertTrue(listener.isInOrder());
@@ -188,7 +193,7 @@ private void readMissingEntriesTest(boolean parallelRead) throws Exception {
         // read multiple entries with missing entries
         listener = new LatchListener(5L, 10);
         readOp = new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 5L, 14L, listener, null);
-        readOp.parallelRead(parallelRead).initiate();
+        readOp.parallelRead(parallelRead).submit();
         listener.expectComplete();
         assertEquals(10, listener.resultCodes.size());
         assertTrue(listener.isInOrder());
@@ -234,7 +239,7 @@ private void readWithFailedBookiesTest(boolean parallelRead) throws Exception {
         LatchListener listener = new LatchListener(0L, numEntries);
         ListenerBasedPendingReadOp readOp =
                 new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1, listener, null);
-        readOp.parallelRead(parallelRead).initiate();
+        readOp.parallelRead(parallelRead).submit();
         listener.expectComplete();
         assertEquals(numEntries, listener.resultCodes.size());
         for (int i = 0; i < numEntries; i++) {
@@ -265,7 +270,7 @@ private void readFailureWithFailedBookiesTest(boolean parallelRead) throws Excep
         LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
 
         ArrayList<BookieSocketAddress> ensemble =
-                lh.getLedgerMetadata().getEnsemble(5);
+            lh.getLedgerMetadata().getEnsemble(5);
         // kill bookies
         killBookie(ensemble.get(0));
         killBookie(ensemble.get(1));
@@ -274,8 +279,8 @@ private void readFailureWithFailedBookiesTest(boolean parallelRead) throws Excep
         // read multiple entries
         LatchListener listener = new LatchListener(0L, numEntries);
         ListenerBasedPendingReadOp readOp =
-                new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1, listener, null);
-        readOp.parallelRead(parallelRead).initiate();
+            new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1, listener, null);
+        readOp.parallelRead(parallelRead).submit();
         listener.expectComplete();
         assertEquals(numEntries, listener.resultCodes.size());
         for (int i = 0; i < numEntries; i++) {
@@ -294,11 +299,11 @@ private void readFailureWithFailedBookiesTest(boolean parallelRead) throws Excep
 
     @Test
     public void testReadFailureWithFailedBookiesEnableParallelRead() throws Exception {
-        readWithFailedBookiesTest(true);
+        readFailureWithFailedBookiesTest(true);
     }
 
     @Test
     public void testReadFailureWithFailedBookiesDisableParallelRead() throws Exception {
-        readWithFailedBookiesTest(false);
+        readFailureWithFailedBookiesTest(false);
     }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
index 5ca752a4e..acfeb8cba 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
@@ -20,6 +20,10 @@
  */
 package org.apache.bookkeeper.client;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Enumeration;
@@ -34,8 +38,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.junit.Assert.*;
-
 /**
  * This unit test tests ledger fencing;
  *
@@ -292,9 +294,7 @@ public void testSpeculativeReadScheduling() throws Exception {
         secondHostOnly.set(1, true);
         PendingReadOp.LedgerEntryRequest req0 = null, req2 = null, req4 = null;
         try {
-            LatchCallback latch0 = new LatchCallback();
-            PendingReadOp op = new PendingReadOp(l, bkspec.scheduler,
-                                                 0, 5, latch0, null);
+            PendingReadOp op = new PendingReadOp(l, bkspec.scheduler, 0, 5);
 
             // if we've already heard from all hosts,
             // we only send the initial read
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
index 8a3d68fea..bbaa358b6 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
@@ -207,8 +207,9 @@ public void testOpenLedgerRead() throws Exception {
             // test readLastAddConfirmedAndEntry
             LastConfirmedAndEntry lastConfirmedAndEntry =
                 result(reader.readLastAddConfirmedAndEntry(0, 999, false));
-            assertEquals(2, lastConfirmedAndEntry.getLastAddConfirmed().intValue());
+            assertEquals(2L, lastConfirmedAndEntry.getLastAddConfirmed());
             assertArrayEquals(data, lastConfirmedAndEntry.getEntry().getEntry());
+            lastConfirmedAndEntry.close();
         }
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services