You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2019/04/19 00:40:11 UTC

[bookkeeper] branch master updated: k

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 44ee320  k
44ee320 is described below

commit 44ee320b64ee1d47ee8b278a843bd033660b7aa0
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Fri Apr 19 02:40:07 2019 +0200

    k
    
    Otherwise the watch objects will accumulate and eventually cause an
    OOM on the bookie, if LAC doesn't progress.
    
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Matteo Merli <mm...@apache.org>, Venkateswararao Jujjuri (JV) <None>
    
    This closes #2051 from ivankelly/tme-oom
---
 .../java/org/apache/bookkeeper/bookie/Bookie.java  |   7 ++
 .../org/apache/bookkeeper/bookie/FileInfo.java     |   4 +
 .../bookkeeper/bookie/IndexPersistenceMgr.java     |  13 +++
 .../bookie/InterleavedLedgerStorage.java           |   6 ++
 .../InterleavedStorageRegenerateIndexOp.java       |   6 ++
 .../org/apache/bookkeeper/bookie/LedgerCache.java  |   2 +
 .../apache/bookkeeper/bookie/LedgerCacheImpl.java  |   7 ++
 .../apache/bookkeeper/bookie/LedgerDescriptor.java |   2 +
 .../bookkeeper/bookie/LedgerDescriptorImpl.java    |   5 +
 .../apache/bookkeeper/bookie/LedgerStorage.java    |  10 ++
 .../bookkeeper/bookie/SortedLedgerStorage.java     |   7 ++
 .../bookie/storage/ldb/DbLedgerStorage.java        |   7 ++
 .../ldb/SingleDirectoryDbLedgerStorage.java        |   7 ++
 .../bookie/storage/ldb/TransientLedgerInfo.java    |   5 +
 .../proto/LongPollReadEntryProcessorV3.java        |   9 +-
 .../bookkeeper/proto/ReadEntryProcessorV3.java     |   4 +-
 .../apache/bookkeeper/bookie/TestSyncThread.java   |   6 ++
 .../org/apache/bookkeeper/meta/GcLedgersTest.java  |   6 ++
 .../bookkeeper/meta/LedgerManagerTestCase.java     |   6 ++
 .../proto/LongPollReadEntryProcessorV3Test.java    | 114 +++++++++++++++++++++
 20 files changed, 227 insertions(+), 6 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 06ccd58..e253fdc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -1409,6 +1409,13 @@ public class Bookie extends BookieCriticalThread {
         return handle.waitForLastAddConfirmedUpdate(previousLAC, watcher);
     }
 
+    public void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
+                                                    Watcher<LastAddConfirmedUpdateNotification> watcher)
+            throws IOException {
+        LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
+        handle.cancelWaitForLastAddConfirmedUpdate(watcher);
+    }
+
     @VisibleForTesting
     public LedgerStorage getLedgerStorage() {
         return ledgerStorage;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
index f9bcd28..e8571be 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
@@ -150,6 +150,10 @@ class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> {
         return true;
     }
 
+    synchronized void cancelWaitForLastAddConfirmedUpdate(Watcher<LastAddConfirmedUpdateNotification> watcher) {
+        deleteWatcher(watcher);
+    }
+
     public boolean isClosed() {
         return isClosed;
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index 8d8f5cf..b728df3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -374,6 +374,19 @@ public class IndexPersistenceMgr {
         }
     }
 
+    void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
+                                          Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
+        CachedFileInfo fi = null;
+        try {
+            fi = getFileInfo(ledgerId, null);
+            fi.cancelWaitForLastAddConfirmedUpdate(watcher);
+        } finally {
+            if (null != fi) {
+                fi.release();
+            }
+        }
+    }
+
     long updateLastAddConfirmed(long ledgerId, long lac) throws IOException {
         CachedFileInfo fi = null;
         try {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index 59ea9ec..becb16b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -352,6 +352,12 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
         return ledgerCache.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, watcher);
     }
 
+    @Override
+    public void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
+                                                    Watcher<LastAddConfirmedUpdateNotification> watcher)
+            throws IOException {
+        ledgerCache.cancelWaitForLastAddConfirmedUpdate(ledgerId, watcher);
+    }
 
     @Override
     public long addEntry(ByteBuf entry) throws IOException {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java
index f5e7c17..880d3c1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java
@@ -217,6 +217,12 @@ public class InterleavedStorageRegenerateIndexOp {
             throw new UnsupportedOperationException();
         }
         @Override
+        public void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
+                                                        Watcher<LastAddConfirmedUpdateNotification> watcher)
+                throws IOException {
+            throw new UnsupportedOperationException();
+        }
+        @Override
         public void deleteLedger(long ledgerId) throws IOException {
         }
         @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
index 993bd0c..cae8bb4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
@@ -53,6 +53,8 @@ public interface LedgerCache extends Closeable {
     boolean waitForLastAddConfirmedUpdate(long ledgerId,
                                           long previousLAC,
                                           Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException;
+    void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
+                                             Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException;
 
     void deleteLedger(long ledgerId) throws IOException;
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
index 119a4f4..e6de2f9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
@@ -92,6 +92,13 @@ public class LedgerCacheImpl implements LedgerCache {
     }
 
     @Override
+    public void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
+                                                    Watcher<LastAddConfirmedUpdateNotification> watcher)
+            throws IOException {
+        indexPersistenceManager.cancelWaitForLastAddConfirmedUpdate(ledgerId, watcher);
+    }
+
+    @Override
     public void putEntryOffset(long ledger, long entry, long offset) throws IOException {
         indexPageManager.putEntryOffset(ledger, entry, offset);
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
index 23840be..74bc8b5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
@@ -80,6 +80,8 @@ public abstract class LedgerDescriptor {
     abstract boolean waitForLastAddConfirmedUpdate(long previousLAC,
                                                    Watcher<LastAddConfirmedUpdateNotification> watcher)
         throws IOException;
+    abstract void cancelWaitForLastAddConfirmedUpdate(Watcher<LastAddConfirmedUpdateNotification> watcher)
+            throws IOException;
 
     abstract void setExplicitLac(ByteBuf entry) throws IOException;
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
index ee91ed0..563494e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
@@ -168,4 +168,9 @@ public class LedgerDescriptorImpl extends LedgerDescriptor {
                                           Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
         return ledgerStorage.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, watcher);
     }
+
+    @Override
+    void cancelWaitForLastAddConfirmedUpdate(Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
+        ledgerStorage.cancelWaitForLastAddConfirmedUpdate(ledgerId, watcher);
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index 1353e8b..2c47b28 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -137,6 +137,16 @@ public interface LedgerStorage {
                                           Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException;
 
     /**
+     * Cancel a previous wait for last add confirmed update.
+     *
+     * @param ledgerId The ledger being watched.
+     * @param watcher The watcher to cancel.
+     * @throws IOException
+     */
+    void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
+                                                Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException;
+
+    /**
      * Flushes all data in the storage. Once this is called,
      * add data written to the LedgerStorage up until this point
      * has been persisted to perminant storage
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index 77653db..23e0716 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -228,6 +228,13 @@ public class SortedLedgerStorage
     }
 
     @Override
+    public void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
+                                                    Watcher<LastAddConfirmedUpdateNotification> watcher)
+            throws IOException {
+        interleavedLedgerStorage.cancelWaitForLastAddConfirmedUpdate(ledgerId, watcher);
+    }
+
+    @Override
     public void checkpoint(final Checkpoint checkpoint) throws IOException {
         long numBytesFlushed = memTable.flush(this, checkpoint);
         interleavedLedgerStorage.getEntryLogger().prepareSortedLedgerStorageCheckpoint(numBytesFlushed);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index e118349..a68cd1c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -210,6 +210,13 @@ public class DbLedgerStorage implements LedgerStorage {
     }
 
     @Override
+    public void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
+                                                    Watcher<LastAddConfirmedUpdateNotification> watcher)
+            throws IOException {
+        getLedgerSorage(ledgerId).cancelWaitForLastAddConfirmedUpdate(ledgerId, watcher);
+    }
+
+    @Override
     public void flush() throws IOException {
         for (LedgerStorage ls : ledgerStorageList) {
             ls.flush();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index 3289b3d..4c41235 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -757,6 +757,13 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
     }
 
     @Override
+    public void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
+                                                    Watcher<LastAddConfirmedUpdateNotification> watcher)
+            throws IOException {
+        getOrAddLedgerInfo(ledgerId).cancelWaitForLastAddConfirmedUpdate(watcher);
+    }
+
+    @Override
     public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
         getOrAddLedgerInfo(ledgerId).setExplicitLac(lac);
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/TransientLedgerInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/TransientLedgerInfo.java
index 91f3fbb..17a8462 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/TransientLedgerInfo.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/TransientLedgerInfo.java
@@ -99,6 +99,11 @@ class TransientLedgerInfo extends Watchable<LastAddConfirmedUpdateNotification>
         return true;
     }
 
+    synchronized void cancelWaitForLastAddConfirmedUpdate(Watcher<LastAddConfirmedUpdateNotification> watcher)
+            throws IOException {
+        deleteWatcher(watcher);
+    }
+
     public ByteBuf getExplicitLac() {
         ByteBuf retLac = null;
         synchronized (this) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
index 6f25d68..c04ecc9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
@@ -142,7 +142,7 @@ class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Watch
 
             final boolean watched;
             try {
-                watched = requestProcessor.bookie.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, this);
+                watched = requestProcessor.getBookie().waitForLastAddConfirmedUpdate(ledgerId, previousLAC, this);
             } catch (Bookie.NoLedgerException e) {
                 logger.info("No ledger found while longpoll reading ledger {}, previous lac = {}.",
                         ledgerId, previousLAC);
@@ -163,9 +163,10 @@ class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Watch
                 }
                 synchronized (this) {
                     expirationTimerTask = requestTimer.newTimeout(timeout -> {
-                        // When the timeout expires just get whatever is the current
-                        // readLastConfirmed
-                        LongPollReadEntryProcessorV3.this.scheduleDeferredRead(true);
+                            requestProcessor.getBookie().cancelWaitForLastAddConfirmedUpdate(ledgerId, this);
+                            // When the timeout expires just get whatever is the current
+                            // readLastConfirmed
+                            LongPollReadEntryProcessorV3.this.scheduleDeferredRead(true);
                     }, readRequest.getTimeOut(), TimeUnit.MILLISECONDS);
                 }
                 return null;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
index 88b7662..a8ecc11 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
@@ -174,7 +174,7 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
                                      boolean readLACPiggyBack,
                                      Stopwatch startTimeSw)
         throws IOException {
-        ByteBuf entryBody = requestProcessor.bookie.readEntry(ledgerId, entryId);
+        ByteBuf entryBody = requestProcessor.getBookie().readEntry(ledgerId, entryId);
         if (null != fenceResult) {
             handleReadResultForFenceRead(entryBody, readResponseBuilder, entryId, startTimeSw);
             return null;
@@ -184,7 +184,7 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
                 if (readLACPiggyBack) {
                     readResponseBuilder.setEntryId(entryId);
                 } else {
-                    long knownLAC = requestProcessor.bookie.readLastAddConfirmed(ledgerId);
+                    long knownLAC = requestProcessor.getBookie().readLastAddConfirmed(ledgerId);
                     readResponseBuilder.setMaxLAC(knownLAC);
                 }
                 registerSuccessfulEvent(readStats, startTimeSw);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
index 707eb81..f57d388 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
@@ -356,6 +356,12 @@ public class TestSyncThread {
         }
 
         @Override
+        public void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
+                                                        Watcher<LastAddConfirmedUpdateNotification> watcher)
+                throws IOException {
+        }
+
+        @Override
         public void checkpoint(Checkpoint checkpoint)
                 throws IOException {
         }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index abc6614..37eacf4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -678,5 +678,11 @@ public class GcLedgersTest extends LedgerManagerTestCase {
                 throws IOException {
             return false;
         }
+
+        @Override
+        public void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
+                                                        Watcher<LastAddConfirmedUpdateNotification> watcher)
+                throws IOException {
+        }
     }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index cf9b3dd..a673d8c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -275,6 +275,12 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
         }
 
         @Override
+        public void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
+                                                        Watcher<LastAddConfirmedUpdateNotification> watcher)
+                throws IOException {
+        }
+
+        @Override
         public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
         }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java
new file mode 100644
index 0000000..393d5dd
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.protobuf.ByteString;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.util.HashedWheelTimer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+
+
+/**
+ * Unit test {@link LongPollReadEntryProcessorV3}.
+ */
+public class LongPollReadEntryProcessorV3Test {
+    ExecutorService executor;
+    HashedWheelTimer timer;
+
+    @Before
+    public void setup() {
+        executor = Executors.newSingleThreadExecutor();
+        timer = new HashedWheelTimer();
+    }
+
+    @After
+    public void teardown() {
+        timer.stop();
+        executor.shutdownNow();
+    }
+
+    @Test
+    public void testWatchIsCancelledOnTimeout() throws Exception {
+        Request request = Request.newBuilder()
+            .setHeader(BKPacketHeader.newBuilder()
+                       .setTxnId(System.currentTimeMillis())
+                       .setVersion(ProtocolVersion.VERSION_THREE)
+                       .setOperation(OperationType.READ_ENTRY)
+                       .build())
+            .setReadRequest(ReadRequest.newBuilder()
+                            .setLedgerId(10)
+                            .setEntryId(1)
+                            .setMasterKey(ByteString.copyFrom(new byte[0]))
+                            .setPreviousLAC(0)
+                            .setTimeOut(1)
+                            .build())
+            .build();
+
+        Channel channel = mock(Channel.class);
+        Bookie bookie = mock(Bookie.class);
+
+        BookieRequestProcessor requestProcessor = mock(BookieRequestProcessor.class);
+        when(requestProcessor.getBookie()).thenReturn(bookie);
+        when(requestProcessor.getRequestStats()).thenReturn(new RequestStats(NullStatsLogger.INSTANCE));
+
+        when(bookie.waitForLastAddConfirmedUpdate(anyLong(), anyLong(), any()))
+            .thenReturn(true);
+        when(bookie.readEntry(anyLong(), anyLong())).thenReturn(Unpooled.buffer());
+        when(bookie.readLastAddConfirmed(anyLong())).thenReturn(Long.valueOf(1));
+
+        CompletableFuture<Void> cancelFuture = new CompletableFuture<>();
+
+        doAnswer(invocationOnMock -> {
+                cancelFuture.complete(null);
+                return null;
+            }).when(bookie).cancelWaitForLastAddConfirmedUpdate(anyLong(), any());
+
+        LongPollReadEntryProcessorV3 processor = new LongPollReadEntryProcessorV3(
+            request,
+            channel,
+            requestProcessor,
+            executor, executor, timer);
+
+        processor.run();
+
+        cancelFuture.get(10, TimeUnit.SECONDS);
+    }
+}