You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2019/04/19 00:40:11 UTC
[bookkeeper] branch master updated: k
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 44ee320 k
44ee320 is described below
commit 44ee320b64ee1d47ee8b278a843bd033660b7aa0
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Fri Apr 19 02:40:07 2019 +0200
k
Cancel the last-add-confirmed (LAC) watch when a long-poll read request
times out. Otherwise the watch objects will accumulate and eventually
cause an OOM on the bookie if the LAC doesn't progress.
Reviewers: Enrico Olivelli <eo...@gmail.com>, Matteo Merli <mm...@apache.org>, Venkateswararao Jujjuri (JV) <None>
This closes #2051 from ivankelly/tme-oom
---
.../java/org/apache/bookkeeper/bookie/Bookie.java | 7 ++
.../org/apache/bookkeeper/bookie/FileInfo.java | 4 +
.../bookkeeper/bookie/IndexPersistenceMgr.java | 13 +++
.../bookie/InterleavedLedgerStorage.java | 6 ++
.../InterleavedStorageRegenerateIndexOp.java | 6 ++
.../org/apache/bookkeeper/bookie/LedgerCache.java | 2 +
.../apache/bookkeeper/bookie/LedgerCacheImpl.java | 7 ++
.../apache/bookkeeper/bookie/LedgerDescriptor.java | 2 +
.../bookkeeper/bookie/LedgerDescriptorImpl.java | 5 +
.../apache/bookkeeper/bookie/LedgerStorage.java | 10 ++
.../bookkeeper/bookie/SortedLedgerStorage.java | 7 ++
.../bookie/storage/ldb/DbLedgerStorage.java | 7 ++
.../ldb/SingleDirectoryDbLedgerStorage.java | 7 ++
.../bookie/storage/ldb/TransientLedgerInfo.java | 5 +
.../proto/LongPollReadEntryProcessorV3.java | 9 +-
.../bookkeeper/proto/ReadEntryProcessorV3.java | 4 +-
.../apache/bookkeeper/bookie/TestSyncThread.java | 6 ++
.../org/apache/bookkeeper/meta/GcLedgersTest.java | 6 ++
.../bookkeeper/meta/LedgerManagerTestCase.java | 6 ++
.../proto/LongPollReadEntryProcessorV3Test.java | 114 +++++++++++++++++++++
20 files changed, 227 insertions(+), 6 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 06ccd58..e253fdc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -1409,6 +1409,13 @@ public class Bookie extends BookieCriticalThread {
return handle.waitForLastAddConfirmedUpdate(previousLAC, watcher);
}
+ public void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
+ Watcher<LastAddConfirmedUpdateNotification> watcher)
+ throws IOException {
+ LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
+ handle.cancelWaitForLastAddConfirmedUpdate(watcher);
+ }
+
@VisibleForTesting
public LedgerStorage getLedgerStorage() {
return ledgerStorage;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
index f9bcd28..e8571be 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
@@ -150,6 +150,10 @@ class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> {
return true;
}
+ synchronized void cancelWaitForLastAddConfirmedUpdate(Watcher<LastAddConfirmedUpdateNotification> watcher) {
+ deleteWatcher(watcher);
+ }
+
public boolean isClosed() {
return isClosed;
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index 8d8f5cf..b728df3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -374,6 +374,19 @@ public class IndexPersistenceMgr {
}
}
+ void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
+ Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
+ CachedFileInfo fi = null;
+ try {
+ fi = getFileInfo(ledgerId, null);
+ fi.cancelWaitForLastAddConfirmedUpdate(watcher);
+ } finally {
+ if (null != fi) {
+ fi.release();
+ }
+ }
+ }
+
long updateLastAddConfirmed(long ledgerId, long lac) throws IOException {
CachedFileInfo fi = null;
try {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index 59ea9ec..becb16b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -352,6 +352,12 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
return ledgerCache.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, watcher);
}
+ @Override
+ public void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
+ Watcher<LastAddConfirmedUpdateNotification> watcher)
+ throws IOException {
+ ledgerCache.cancelWaitForLastAddConfirmedUpdate(ledgerId, watcher);
+ }
@Override
public long addEntry(ByteBuf entry) throws IOException {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java
index f5e7c17..880d3c1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java
@@ -217,6 +217,12 @@ public class InterleavedStorageRegenerateIndexOp {
throw new UnsupportedOperationException();
}
@Override
+ public void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
+ Watcher<LastAddConfirmedUpdateNotification> watcher)
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ @Override
public void deleteLedger(long ledgerId) throws IOException {
}
@Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
index 993bd0c..cae8bb4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
@@ -53,6 +53,8 @@ public interface LedgerCache extends Closeable {
boolean waitForLastAddConfirmedUpdate(long ledgerId,
long previousLAC,
Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException;
+ void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
+ Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException;
void deleteLedger(long ledgerId) throws IOException;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
index 119a4f4..e6de2f9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
@@ -92,6 +92,13 @@ public class LedgerCacheImpl implements LedgerCache {
}
@Override
+ public void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
+ Watcher<LastAddConfirmedUpdateNotification> watcher)
+ throws IOException {
+ indexPersistenceManager.cancelWaitForLastAddConfirmedUpdate(ledgerId, watcher);
+ }
+
+ @Override
public void putEntryOffset(long ledger, long entry, long offset) throws IOException {
indexPageManager.putEntryOffset(ledger, entry, offset);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
index 23840be..74bc8b5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
@@ -80,6 +80,8 @@ public abstract class LedgerDescriptor {
abstract boolean waitForLastAddConfirmedUpdate(long previousLAC,
Watcher<LastAddConfirmedUpdateNotification> watcher)
throws IOException;
+ abstract void cancelWaitForLastAddConfirmedUpdate(Watcher<LastAddConfirmedUpdateNotification> watcher)
+ throws IOException;
abstract void setExplicitLac(ByteBuf entry) throws IOException;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
index ee91ed0..563494e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
@@ -168,4 +168,9 @@ public class LedgerDescriptorImpl extends LedgerDescriptor {
Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
return ledgerStorage.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, watcher);
}
+
+ @Override
+ void cancelWaitForLastAddConfirmedUpdate(Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
+ ledgerStorage.cancelWaitForLastAddConfirmedUpdate(ledgerId, watcher);
+ }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index 1353e8b..2c47b28 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -137,6 +137,16 @@ public interface LedgerStorage {
Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException;
/**
+ * Cancel a previous wait for last add confirmed update.
+ *
+ * @param ledgerId The ledger being watched.
+ * @param watcher The watcher to cancel.
+ * @throws IOException
+ */
+ void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
+ Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException;
+
+ /**
* Flushes all data in the storage. Once this is called,
* add data written to the LedgerStorage up until this point
* has been persisted to perminant storage
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index 77653db..23e0716 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -228,6 +228,13 @@ public class SortedLedgerStorage
}
@Override
+ public void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
+ Watcher<LastAddConfirmedUpdateNotification> watcher)
+ throws IOException {
+ interleavedLedgerStorage.cancelWaitForLastAddConfirmedUpdate(ledgerId, watcher);
+ }
+
+ @Override
public void checkpoint(final Checkpoint checkpoint) throws IOException {
long numBytesFlushed = memTable.flush(this, checkpoint);
interleavedLedgerStorage.getEntryLogger().prepareSortedLedgerStorageCheckpoint(numBytesFlushed);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index e118349..a68cd1c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -210,6 +210,13 @@ public class DbLedgerStorage implements LedgerStorage {
}
@Override
+ public void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
+ Watcher<LastAddConfirmedUpdateNotification> watcher)
+ throws IOException {
+ getLedgerSorage(ledgerId).cancelWaitForLastAddConfirmedUpdate(ledgerId, watcher);
+ }
+
+ @Override
public void flush() throws IOException {
for (LedgerStorage ls : ledgerStorageList) {
ls.flush();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index 3289b3d..4c41235 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -757,6 +757,13 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
}
@Override
+ public void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
+ Watcher<LastAddConfirmedUpdateNotification> watcher)
+ throws IOException {
+ getOrAddLedgerInfo(ledgerId).cancelWaitForLastAddConfirmedUpdate(watcher);
+ }
+
+ @Override
public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
getOrAddLedgerInfo(ledgerId).setExplicitLac(lac);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/TransientLedgerInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/TransientLedgerInfo.java
index 91f3fbb..17a8462 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/TransientLedgerInfo.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/TransientLedgerInfo.java
@@ -99,6 +99,11 @@ class TransientLedgerInfo extends Watchable<LastAddConfirmedUpdateNotification>
return true;
}
+ synchronized void cancelWaitForLastAddConfirmedUpdate(Watcher<LastAddConfirmedUpdateNotification> watcher)
+ throws IOException {
+ deleteWatcher(watcher);
+ }
+
public ByteBuf getExplicitLac() {
ByteBuf retLac = null;
synchronized (this) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
index 6f25d68..c04ecc9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
@@ -142,7 +142,7 @@ class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Watch
final boolean watched;
try {
- watched = requestProcessor.bookie.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, this);
+ watched = requestProcessor.getBookie().waitForLastAddConfirmedUpdate(ledgerId, previousLAC, this);
} catch (Bookie.NoLedgerException e) {
logger.info("No ledger found while longpoll reading ledger {}, previous lac = {}.",
ledgerId, previousLAC);
@@ -163,9 +163,10 @@ class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Watch
}
synchronized (this) {
expirationTimerTask = requestTimer.newTimeout(timeout -> {
- // When the timeout expires just get whatever is the current
- // readLastConfirmed
- LongPollReadEntryProcessorV3.this.scheduleDeferredRead(true);
+ requestProcessor.getBookie().cancelWaitForLastAddConfirmedUpdate(ledgerId, this);
+ // When the timeout expires just get whatever is the current
+ // readLastConfirmed
+ LongPollReadEntryProcessorV3.this.scheduleDeferredRead(true);
}, readRequest.getTimeOut(), TimeUnit.MILLISECONDS);
}
return null;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
index 88b7662..a8ecc11 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
@@ -174,7 +174,7 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
boolean readLACPiggyBack,
Stopwatch startTimeSw)
throws IOException {
- ByteBuf entryBody = requestProcessor.bookie.readEntry(ledgerId, entryId);
+ ByteBuf entryBody = requestProcessor.getBookie().readEntry(ledgerId, entryId);
if (null != fenceResult) {
handleReadResultForFenceRead(entryBody, readResponseBuilder, entryId, startTimeSw);
return null;
@@ -184,7 +184,7 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
if (readLACPiggyBack) {
readResponseBuilder.setEntryId(entryId);
} else {
- long knownLAC = requestProcessor.bookie.readLastAddConfirmed(ledgerId);
+ long knownLAC = requestProcessor.getBookie().readLastAddConfirmed(ledgerId);
readResponseBuilder.setMaxLAC(knownLAC);
}
registerSuccessfulEvent(readStats, startTimeSw);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
index 707eb81..f57d388 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
@@ -356,6 +356,12 @@ public class TestSyncThread {
}
@Override
+ public void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
+ Watcher<LastAddConfirmedUpdateNotification> watcher)
+ throws IOException {
+ }
+
+ @Override
public void checkpoint(Checkpoint checkpoint)
throws IOException {
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index abc6614..37eacf4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -678,5 +678,11 @@ public class GcLedgersTest extends LedgerManagerTestCase {
throws IOException {
return false;
}
+
+ @Override
+ public void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
+ Watcher<LastAddConfirmedUpdateNotification> watcher)
+ throws IOException {
+ }
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index cf9b3dd..a673d8c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -275,6 +275,12 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
}
@Override
+ public void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
+ Watcher<LastAddConfirmedUpdateNotification> watcher)
+ throws IOException {
+ }
+
+ @Override
public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java
new file mode 100644
index 0000000..393d5dd
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.protobuf.ByteString;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.util.HashedWheelTimer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+
+
+/**
+ * Unit test {@link LongPollReadEntryProcessorV3}.
+ */
+public class LongPollReadEntryProcessorV3Test {
+ ExecutorService executor;
+ HashedWheelTimer timer;
+
+ @Before
+ public void setup() {
+ executor = Executors.newSingleThreadExecutor();
+ timer = new HashedWheelTimer();
+ }
+
+ @After
+ public void teardown() {
+ timer.stop();
+ executor.shutdownNow();
+ }
+
+ @Test
+ public void testWatchIsCancelledOnTimeout() throws Exception {
+ Request request = Request.newBuilder()
+ .setHeader(BKPacketHeader.newBuilder()
+ .setTxnId(System.currentTimeMillis())
+ .setVersion(ProtocolVersion.VERSION_THREE)
+ .setOperation(OperationType.READ_ENTRY)
+ .build())
+ .setReadRequest(ReadRequest.newBuilder()
+ .setLedgerId(10)
+ .setEntryId(1)
+ .setMasterKey(ByteString.copyFrom(new byte[0]))
+ .setPreviousLAC(0)
+ .setTimeOut(1)
+ .build())
+ .build();
+
+ Channel channel = mock(Channel.class);
+ Bookie bookie = mock(Bookie.class);
+
+ BookieRequestProcessor requestProcessor = mock(BookieRequestProcessor.class);
+ when(requestProcessor.getBookie()).thenReturn(bookie);
+ when(requestProcessor.getRequestStats()).thenReturn(new RequestStats(NullStatsLogger.INSTANCE));
+
+ when(bookie.waitForLastAddConfirmedUpdate(anyLong(), anyLong(), any()))
+ .thenReturn(true);
+ when(bookie.readEntry(anyLong(), anyLong())).thenReturn(Unpooled.buffer());
+ when(bookie.readLastAddConfirmed(anyLong())).thenReturn(Long.valueOf(1));
+
+ CompletableFuture<Void> cancelFuture = new CompletableFuture<>();
+
+ doAnswer(invocationOnMock -> {
+ cancelFuture.complete(null);
+ return null;
+ }).when(bookie).cancelWaitForLastAddConfirmedUpdate(anyLong(), any());
+
+ LongPollReadEntryProcessorV3 processor = new LongPollReadEntryProcessorV3(
+ request,
+ channel,
+ requestProcessor,
+ executor, executor, timer);
+
+ processor.run();
+
+ cancelFuture.get(10, TimeUnit.SECONDS);
+ }
+}