You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2023/03/05 17:43:45 UTC
[bookkeeper] branch master updated: Pass BookieRequestHandler instead of Channel to the request processors (#3835)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new dfde3d6836 Pass BookieRequestHandler instead of Channel to the request processors (#3835)
dfde3d6836 is described below
commit dfde3d6836094cc39d8a3603cb268ca0f633350b
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sun Mar 5 09:43:37 2023 -0800
Pass BookieRequestHandler instead of Channel to the request processors (#3835)
* Pass BookieRequestHandler instead of Channel to the request processors
* Fixed checkstyle
* Fixed ForceLedgerProcessorV3Test
* Fixed TestBookieRequestProcessor
* Fixed line length
---
.../bookkeeper/processor/RequestProcessor.java | 4 +-
.../bookkeeper/proto/BookieRequestHandler.java | 11 ++-
.../bookkeeper/proto/BookieRequestProcessor.java | 94 ++++++++++++----------
.../bookkeeper/proto/ForceLedgerProcessorV3.java | 7 +-
.../bookkeeper/proto/GetBookieInfoProcessorV3.java | 5 +-
.../proto/GetListOfEntriesOfLedgerProcessorV3.java | 5 +-
.../proto/LongPollReadEntryProcessorV3.java | 5 +-
.../bookkeeper/proto/PacketProcessorBase.java | 28 ++++---
.../bookkeeper/proto/PacketProcessorBaseV3.java | 7 +-
.../bookkeeper/proto/ReadEntryProcessor.java | 14 ++--
.../bookkeeper/proto/ReadEntryProcessorV3.java | 11 +--
.../bookkeeper/proto/ReadLacProcessorV3.java | 5 +-
.../bookkeeper/proto/WriteEntryProcessor.java | 12 +--
.../bookkeeper/proto/WriteEntryProcessorV3.java | 13 +--
.../bookkeeper/proto/WriteLacProcessorV3.java | 8 +-
.../proto/ForceLedgerProcessorV3Test.java | 19 ++++-
.../proto/GetBookieInfoProcessorV3Test.java | 12 ++-
.../proto/LongPollReadEntryProcessorV3Test.java | 9 ++-
.../bookkeeper/proto/ReadEntryProcessorTest.java | 15 +++-
.../proto/TestBookieRequestProcessor.java | 14 +++-
.../bookkeeper/proto/WriteEntryProcessorTest.java | 45 ++++++-----
.../proto/WriteEntryProcessorV3Test.java | 13 ++-
22 files changed, 223 insertions(+), 133 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
index 50b97e906b..5a4238e64d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
@@ -20,7 +20,7 @@
*/
package org.apache.bookkeeper.processor;
-import io.netty.channel.Channel;
+import org.apache.bookkeeper.proto.BookieRequestHandler;
/**
* A request processor that is used for processing requests at bookie side.
@@ -41,5 +41,5 @@ public interface RequestProcessor extends AutoCloseable {
* @param channel
* channel received the given request <i>r</i>
*/
- void processRequest(Object r, Channel channel);
+ void processRequest(Object r, BookieRequestHandler channel);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
index 93be69cd37..c9d65a7317 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
@@ -32,20 +32,27 @@ import org.slf4j.LoggerFactory;
/**
* Serverside handler for bookkeeper requests.
*/
-class BookieRequestHandler extends ChannelInboundHandlerAdapter {
+public class BookieRequestHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOG = LoggerFactory.getLogger(BookieRequestHandler.class);
private final RequestProcessor requestProcessor;
private final ChannelGroup allChannels;
+ private ChannelHandlerContext ctx;
+
BookieRequestHandler(ServerConfiguration conf, RequestProcessor processor, ChannelGroup allChannels) {
this.requestProcessor = processor;
this.allChannels = allChannels;
}
+ public ChannelHandlerContext ctx() {
+ return ctx;
+ }
+
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
LOG.info("Channel connected {}", ctx.channel());
+ this.ctx = ctx;
super.channelActive(ctx);
}
@@ -75,6 +82,6 @@ class BookieRequestHandler extends ChannelInboundHandlerAdapter {
ctx.fireChannelRead(msg);
return;
}
- requestProcessor.processRequest(msg, ctx.channel());
+ requestProcessor.processRequest(msg, this);
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index f7f4eceda3..9237c451ed 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -299,7 +299,8 @@ public class BookieRequestProcessor implements RequestProcessor {
}
@Override
- public void processRequest(Object msg, Channel c) {
+ public void processRequest(Object msg, BookieRequestHandler requestHandler) {
+ Channel channel = requestHandler.ctx().channel();
// If we can decode this packet as a Request protobuf packet, process
// it as a version 3 packet. Else, just use the old protocol.
if (msg instanceof BookkeeperProtocol.Request) {
@@ -309,16 +310,16 @@ public class BookieRequestProcessor implements RequestProcessor {
BookkeeperProtocol.BKPacketHeader header = r.getHeader();
switch (header.getOperation()) {
case ADD_ENTRY:
- processAddRequestV3(r, c);
+ processAddRequestV3(r, requestHandler);
break;
case READ_ENTRY:
- processReadRequestV3(r, c);
+ processReadRequestV3(r, requestHandler);
break;
case FORCE_LEDGER:
- processForceLedgerRequestV3(r, c);
+ processForceLedgerRequestV3(r, requestHandler);
break;
case AUTH:
- LOG.info("Ignoring auth operation from client {}", c.remoteAddress());
+ LOG.info("Ignoring auth operation from client {}", channel.remoteAddress());
BookkeeperProtocol.AuthMessage message = BookkeeperProtocol.AuthMessage
.newBuilder()
.setAuthPluginName(AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME)
@@ -328,29 +329,29 @@ public class BookieRequestProcessor implements RequestProcessor {
.newBuilder().setHeader(r.getHeader())
.setStatus(BookkeeperProtocol.StatusCode.EOK)
.setAuthResponse(message);
- c.writeAndFlush(authResponse.build());
+ channel.writeAndFlush(authResponse.build());
break;
case WRITE_LAC:
- processWriteLacRequestV3(r, c);
+ processWriteLacRequestV3(r, requestHandler);
break;
case READ_LAC:
- processReadLacRequestV3(r, c);
+ processReadLacRequestV3(r, requestHandler);
break;
case GET_BOOKIE_INFO:
- processGetBookieInfoRequestV3(r, c);
+ processGetBookieInfoRequestV3(r, requestHandler);
break;
case START_TLS:
- processStartTLSRequestV3(r, c);
+ processStartTLSRequestV3(r, requestHandler);
break;
case GET_LIST_OF_ENTRIES_OF_LEDGER:
- processGetListOfEntriesOfLedgerProcessorV3(r, c);
+ processGetListOfEntriesOfLedgerProcessorV3(r, requestHandler);
break;
default:
LOG.info("Unknown operation type {}", header.getOperation());
BookkeeperProtocol.Response.Builder response =
BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader())
.setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
- c.writeAndFlush(response.build());
+ channel.writeAndFlush(response.build());
if (statsEnabled) {
bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
}
@@ -365,26 +366,27 @@ public class BookieRequestProcessor implements RequestProcessor {
switch (r.getOpCode()) {
case BookieProtocol.ADDENTRY:
checkArgument(r instanceof BookieProtocol.ParsedAddRequest);
- processAddRequest((BookieProtocol.ParsedAddRequest) r, c);
+ processAddRequest((BookieProtocol.ParsedAddRequest) r, requestHandler);
break;
case BookieProtocol.READENTRY:
checkArgument(r instanceof BookieProtocol.ReadRequest);
- processReadRequest((BookieProtocol.ReadRequest) r, c);
+ processReadRequest((BookieProtocol.ReadRequest) r, requestHandler);
break;
case BookieProtocol.AUTH:
- LOG.info("Ignoring auth operation from client {}", c.remoteAddress());
+ LOG.info("Ignoring auth operation from client {}",
+ requestHandler.ctx().channel().remoteAddress());
BookkeeperProtocol.AuthMessage message = BookkeeperProtocol.AuthMessage
.newBuilder()
.setAuthPluginName(AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME)
.setPayload(ByteString.copyFrom(AuthToken.NULL.getData()))
.build();
- c.writeAndFlush(new BookieProtocol.AuthResponse(
+ channel.writeAndFlush(new BookieProtocol.AuthResponse(
BookieProtocol.CURRENT_PROTOCOL_VERSION, message));
break;
default:
LOG.error("Unknown op type {}, sending error", r.getOpCode());
- c.writeAndFlush(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADREQ, r));
+ channel.writeAndFlush(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADREQ, r));
if (statsEnabled) {
bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
}
@@ -402,8 +404,9 @@ public class BookieRequestProcessor implements RequestProcessor {
}
}
- private void processWriteLacRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
- WriteLacProcessorV3 writeLac = new WriteLacProcessorV3(r, c, this);
+ private void processWriteLacRequestV3(final BookkeeperProtocol.Request r,
+ final BookieRequestHandler requestHandler) {
+ WriteLacProcessorV3 writeLac = new WriteLacProcessorV3(r, requestHandler, this);
if (null == writeThreadPool) {
writeLac.run();
} else {
@@ -411,8 +414,9 @@ public class BookieRequestProcessor implements RequestProcessor {
}
}
- private void processReadLacRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
- ReadLacProcessorV3 readLac = new ReadLacProcessorV3(r, c, this);
+ private void processReadLacRequestV3(final BookkeeperProtocol.Request r,
+ final BookieRequestHandler requestHandler) {
+ ReadLacProcessorV3 readLac = new ReadLacProcessorV3(r, requestHandler, this);
if (null == readThreadPool) {
readLac.run();
} else {
@@ -420,8 +424,8 @@ public class BookieRequestProcessor implements RequestProcessor {
}
}
- private void processAddRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
- WriteEntryProcessorV3 write = new WriteEntryProcessorV3(r, c, this);
+ private void processAddRequestV3(final BookkeeperProtocol.Request r, final BookieRequestHandler requestHandler) {
+ WriteEntryProcessorV3 write = new WriteEntryProcessorV3(r, requestHandler, this);
final OrderedExecutor threadPool;
if (RequestUtils.isHighPriority(r)) {
@@ -455,8 +459,9 @@ public class BookieRequestProcessor implements RequestProcessor {
}
}
- private void processForceLedgerRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
- ForceLedgerProcessorV3 forceLedger = new ForceLedgerProcessorV3(r, c, this);
+ private void processForceLedgerRequestV3(final BookkeeperProtocol.Request r,
+ final BookieRequestHandler requestHandler) {
+ ForceLedgerProcessorV3 forceLedger = new ForceLedgerProcessorV3(r, requestHandler, this);
final OrderedExecutor threadPool;
if (RequestUtils.isHighPriority(r)) {
@@ -492,19 +497,20 @@ public class BookieRequestProcessor implements RequestProcessor {
}
}
- private void processReadRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
- ExecutorService fenceThread = null == highPriorityThreadPool ? null : highPriorityThreadPool.chooseThread(c);
+ private void processReadRequestV3(final BookkeeperProtocol.Request r, final BookieRequestHandler requestHandler) {
+ ExecutorService fenceThread = null == highPriorityThreadPool ? null :
+ highPriorityThreadPool.chooseThread(requestHandler.ctx());
final ReadEntryProcessorV3 read;
final OrderedExecutor threadPool;
if (RequestUtils.isLongPollReadRequest(r.getReadRequest())) {
- ExecutorService lpThread = longPollThreadPool.chooseThread(c);
+ ExecutorService lpThread = longPollThreadPool.chooseThread(requestHandler.ctx());
- read = new LongPollReadEntryProcessorV3(r, c, this, fenceThread,
+ read = new LongPollReadEntryProcessorV3(r, requestHandler, this, fenceThread,
lpThread, requestTimer);
threadPool = longPollThreadPool;
} else {
- read = new ReadEntryProcessorV3(r, c, this, fenceThread);
+ read = new ReadEntryProcessorV3(r, requestHandler, this, fenceThread);
// If it's a high priority read (fencing or as part of recovery process), we want to make sure it
// gets executed as fast as possible, so bypass the normal readThreadPool
@@ -544,13 +550,16 @@ public class BookieRequestProcessor implements RequestProcessor {
}
}
- private void processStartTLSRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
+ private void processStartTLSRequestV3(final BookkeeperProtocol.Request r,
+ final BookieRequestHandler requestHandler) {
BookkeeperProtocol.Response.Builder response = BookkeeperProtocol.Response.newBuilder();
BookkeeperProtocol.BKPacketHeader.Builder header = BookkeeperProtocol.BKPacketHeader.newBuilder();
header.setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE);
header.setOperation(r.getHeader().getOperation());
header.setTxnId(r.getHeader().getTxnId());
response.setHeader(header.build());
+ final Channel c = requestHandler.ctx().channel();
+
if (shFactory == null) {
LOG.error("Got StartTLS request but TLS not configured");
response.setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
@@ -596,8 +605,9 @@ public class BookieRequestProcessor implements RequestProcessor {
}
}
- private void processGetBookieInfoRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
- GetBookieInfoProcessorV3 getBookieInfo = new GetBookieInfoProcessorV3(r, c, this);
+ private void processGetBookieInfoRequestV3(final BookkeeperProtocol.Request r,
+ final BookieRequestHandler requestHandler) {
+ GetBookieInfoProcessorV3 getBookieInfo = new GetBookieInfoProcessorV3(r, requestHandler, this);
if (null == readThreadPool) {
getBookieInfo.run();
} else {
@@ -605,9 +615,10 @@ public class BookieRequestProcessor implements RequestProcessor {
}
}
- private void processGetListOfEntriesOfLedgerProcessorV3(final BookkeeperProtocol.Request r, final Channel c) {
- GetListOfEntriesOfLedgerProcessorV3 getListOfEntriesOfLedger = new GetListOfEntriesOfLedgerProcessorV3(r, c,
- this);
+ private void processGetListOfEntriesOfLedgerProcessorV3(final BookkeeperProtocol.Request r,
+ final BookieRequestHandler requestHandler) {
+ GetListOfEntriesOfLedgerProcessorV3 getListOfEntriesOfLedger =
+ new GetListOfEntriesOfLedgerProcessorV3(r, requestHandler, this);
if (null == readThreadPool) {
getListOfEntriesOfLedger.run();
} else {
@@ -615,8 +626,8 @@ public class BookieRequestProcessor implements RequestProcessor {
}
}
- private void processAddRequest(final BookieProtocol.ParsedAddRequest r, final Channel c) {
- WriteEntryProcessor write = WriteEntryProcessor.create(r, c, this);
+ private void processAddRequest(final BookieProtocol.ParsedAddRequest r, final BookieRequestHandler requestHandler) {
+ WriteEntryProcessor write = WriteEntryProcessor.create(r, requestHandler, this);
// If it's a high priority add (usually as part of recovery process), we want to make sure it gets
// executed as fast as possible, so bypass the normal writeThreadPool and execute in highPriorityThreadPool
@@ -647,10 +658,11 @@ public class BookieRequestProcessor implements RequestProcessor {
}
}
- private void processReadRequest(final BookieProtocol.ReadRequest r, final Channel c) {
+ private void processReadRequest(final BookieProtocol.ReadRequest r, final BookieRequestHandler requestHandler) {
ExecutorService fenceThreadPool =
- null == highPriorityThreadPool ? null : highPriorityThreadPool.chooseThread(c);
- ReadEntryProcessor read = ReadEntryProcessor.create(r, c, this, fenceThreadPool, throttleReadResponses);
+ null == highPriorityThreadPool ? null : highPriorityThreadPool.chooseThread(requestHandler.ctx());
+ ReadEntryProcessor read = ReadEntryProcessor.create(r, requestHandler,
+ this, fenceThreadPool, throttleReadResponses);
// If it's a high priority read (fencing or as part of recovery process), we want to make sure it
// gets executed as fast as possible, so bypass the normal readThreadPool
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java
index de73f95011..c1627579c6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java
@@ -22,7 +22,6 @@ package org.apache.bookkeeper.proto;
import static com.google.common.base.Preconditions.checkArgument;
-import io.netty.channel.Channel;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.net.BookieId;
@@ -39,9 +38,9 @@ import org.slf4j.LoggerFactory;
class ForceLedgerProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(ForceLedgerProcessorV3.class);
- public ForceLedgerProcessorV3(Request request, Channel channel,
+ public ForceLedgerProcessorV3(Request request, BookieRequestHandler requestHandler,
BookieRequestProcessor requestProcessor) {
- super(request, channel, requestProcessor);
+ super(request, requestHandler, requestProcessor);
}
// Returns null if there is no exception thrown
@@ -98,7 +97,7 @@ class ForceLedgerProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
};
StatusCode status = null;
try {
- requestProcessor.getBookie().forceLedger(ledgerId, wcb, channel);
+ requestProcessor.getBookie().forceLedger(ledgerId, wcb, requestHandler);
status = StatusCode.EOK;
} catch (Throwable t) {
logger.error("Unexpected exception while forcing ledger {} : ", ledgerId, t);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
index 6f24255586..8795263a5b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
@@ -20,7 +20,6 @@
*/
package org.apache.bookkeeper.proto;
-import io.netty.channel.Channel;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoRequest;
@@ -38,9 +37,9 @@ import org.slf4j.LoggerFactory;
public class GetBookieInfoProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(GetBookieInfoProcessorV3.class);
- public GetBookieInfoProcessorV3(Request request, Channel channel,
+ public GetBookieInfoProcessorV3(Request request, BookieRequestHandler requestHandler,
BookieRequestProcessor requestProcessor) {
- super(request, channel, requestProcessor);
+ super(request, requestHandler, requestProcessor);
}
private GetBookieInfoResponse getGetBookieInfoResponse() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerProcessorV3.java
index 57f72208d8..90c850841d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerProcessorV3.java
@@ -21,7 +21,6 @@
package org.apache.bookkeeper.proto;
import com.google.protobuf.ByteString;
-import io.netty.channel.Channel;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.Bookie;
@@ -44,9 +43,9 @@ public class GetListOfEntriesOfLedgerProcessorV3 extends PacketProcessorBaseV3 i
protected final GetListOfEntriesOfLedgerRequest getListOfEntriesOfLedgerRequest;
protected final long ledgerId;
- public GetListOfEntriesOfLedgerProcessorV3(Request request, Channel channel,
+ public GetListOfEntriesOfLedgerProcessorV3(Request request, BookieRequestHandler requestHandler,
BookieRequestProcessor requestProcessor) {
- super(request, channel, requestProcessor);
+ super(request, requestHandler, requestProcessor);
this.getListOfEntriesOfLedgerRequest = request.getGetListOfEntriesOfLedgerRequest();
this.ledgerId = getListOfEntriesOfLedgerRequest.getLedgerId();
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
index f61a2688ba..658c37c594 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
@@ -18,7 +18,6 @@
package org.apache.bookkeeper.proto;
import com.google.common.base.Stopwatch;
-import io.netty.channel.Channel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import java.io.IOException;
@@ -55,12 +54,12 @@ class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Watch
private boolean shouldReadEntry = false;
LongPollReadEntryProcessorV3(Request request,
- Channel channel,
+ BookieRequestHandler requestHandler,
BookieRequestProcessor requestProcessor,
ExecutorService fenceThreadPool,
ExecutorService longPollThreadPool,
HashedWheelTimer requestTimer) {
- super(request, channel, requestProcessor, fenceThreadPool);
+ super(request, requestHandler, requestProcessor, fenceThreadPool);
this.previousLAC = readRequest.getPreviousLAC();
this.longPollThreadPool = longPollThreadPool;
this.requestTimer = requestTimer;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
index 8d079504b1..c9798156c2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
@@ -35,20 +35,20 @@ import org.slf4j.LoggerFactory;
abstract class PacketProcessorBase<T extends Request> implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(PacketProcessorBase.class);
T request;
- Channel channel;
+ BookieRequestHandler requestHandler;
BookieRequestProcessor requestProcessor;
long enqueueNanos;
- protected void init(T request, Channel channel, BookieRequestProcessor requestProcessor) {
+ protected void init(T request, BookieRequestHandler requestHandler, BookieRequestProcessor requestProcessor) {
this.request = request;
- this.channel = channel;
+ this.requestHandler = requestHandler;
this.requestProcessor = requestProcessor;
this.enqueueNanos = MathUtils.nowInNano();
}
protected void reset() {
request = null;
- channel = null;
+ requestHandler = null;
requestProcessor = null;
enqueueNanos = -1;
}
@@ -82,8 +82,10 @@ abstract class PacketProcessorBase<T extends Request> implements Runnable {
protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger) {
final long writeNanos = MathUtils.nowInNano();
-
final long timeOut = requestProcessor.getWaitTimeoutOnBackpressureMillis();
+
+ Channel channel = requestHandler.ctx().channel();
+
if (timeOut >= 0 && !channel.isWritable()) {
if (!requestProcessor.isBlacklisted(channel)) {
synchronized (channel) {
@@ -120,18 +122,23 @@ abstract class PacketProcessorBase<T extends Request> implements Runnable {
}
if (channel.isActive()) {
- ChannelPromise promise = channel.newPromise().addListener(future -> {
- if (!future.isSuccess()) {
- logger.debug("Netty channel write exception. ", future.cause());
- }
- });
+ ChannelPromise promise = channel.voidPromise();
+ if (logger.isDebugEnabled()) {
+ promise = channel.newPromise().addListener(future -> {
+ if (!future.isSuccess()) {
+ logger.debug("Netty channel write exception. ", future.cause());
+ }
+ });
+ }
channel.writeAndFlush(response, promise);
} else {
if (response instanceof BookieProtocol.Response) {
((BookieProtocol.Response) response).release();
}
+ if (logger.isDebugEnabled()) {
logger.debug("Netty channel {} is inactive, "
+ "hence bypassing netty channel writeAndFlush during sendResponse", channel);
+ }
}
if (BookieProtocol.EOK == rc) {
statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
@@ -149,6 +156,7 @@ abstract class PacketProcessorBase<T extends Request> implements Runnable {
*/
protected void sendResponseAndWait(int rc, Object response, OpStatsLogger statsLogger) {
try {
+ Channel channel = requestHandler.ctx().channel();
ChannelFuture future = channel.writeAndFlush(response);
if (!channel.eventLoop().inEventLoop()) {
future.get();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
index ccc452ae60..dac454933c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
@@ -40,14 +40,14 @@ import org.apache.bookkeeper.util.StringUtils;
public abstract class PacketProcessorBaseV3 implements Runnable {
final Request request;
- final Channel channel;
+ final BookieRequestHandler requestHandler;
final BookieRequestProcessor requestProcessor;
final long enqueueNanos;
- public PacketProcessorBaseV3(Request request, Channel channel,
+ public PacketProcessorBaseV3(Request request, BookieRequestHandler requestHandler,
BookieRequestProcessor requestProcessor) {
this.request = request;
- this.channel = channel;
+ this.requestHandler = requestHandler;
this.requestProcessor = requestProcessor;
this.enqueueNanos = MathUtils.nowInNano();
}
@@ -55,6 +55,7 @@ public abstract class PacketProcessorBaseV3 implements Runnable {
protected void sendResponse(StatusCode code, Object response, OpStatsLogger statsLogger) {
final long writeNanos = MathUtils.nowInNano();
+ Channel channel = requestHandler.ctx().channel();
final long timeOut = requestProcessor.getWaitTimeoutOnBackpressureMillis();
if (timeOut >= 0 && !channel.isWritable()) {
if (!requestProcessor.isBlacklisted(channel)) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
index 71ee51f3fa..6935ca8be6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
@@ -18,7 +18,6 @@
package org.apache.bookkeeper.proto;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
@@ -44,15 +43,15 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
private boolean throttleReadResponses;
public static ReadEntryProcessor create(ReadRequest request,
- Channel channel,
+ BookieRequestHandler requestHandler,
BookieRequestProcessor requestProcessor,
ExecutorService fenceThreadPool,
boolean throttleReadResponses) {
ReadEntryProcessor rep = RECYCLER.get();
- rep.init(request, channel, requestProcessor);
+ rep.init(request, requestHandler, requestProcessor);
rep.fenceThreadPool = fenceThreadPool;
rep.throttleReadResponses = throttleReadResponses;
- requestProcessor.onReadRequestStart(channel);
+ requestProcessor.onReadRequestStart(requestHandler.ctx().channel());
return rep;
}
@@ -61,9 +60,9 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
if (LOG.isDebugEnabled()) {
LOG.debug("Received new read request: {}", request);
}
- if (!channel.isOpen()) {
+ if (!requestHandler.ctx().channel().isOpen()) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Dropping read request for closed channel: {}", channel);
+ LOG.debug("Dropping read request for closed channel: {}", requestHandler.ctx().channel());
}
requestProcessor.onReadRequestFinish();
return;
@@ -74,7 +73,8 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
try {
CompletableFuture<Boolean> fenceResult = null;
if (request.isFencing()) {
- LOG.warn("Ledger: {} fenced by: {}", request.getLedgerId(), channel.remoteAddress());
+ LOG.warn("Ledger: {} fenced by: {}", request.getLedgerId(),
+ requestHandler.ctx().channel().remoteAddress());
if (request.hasMasterKey()) {
fenceResult = requestProcessor.getBookie().fenceLedger(request.getLedgerId(),
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
index 4672d592d8..999b8095db 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
@@ -57,11 +57,11 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
protected final OpStatsLogger reqStats;
public ReadEntryProcessorV3(Request request,
- Channel channel,
+ BookieRequestHandler requestHandler,
BookieRequestProcessor requestProcessor,
ExecutorService fenceThreadPool) {
- super(request, channel, requestProcessor);
- requestProcessor.onReadRequestStart(channel);
+ super(request, requestHandler, requestProcessor);
+ requestProcessor.onReadRequestStart(requestHandler.ctx().channel());
this.readRequest = request.getReadRequest();
this.ledgerId = readRequest.getLedgerId();
@@ -194,6 +194,7 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
protected ReadResponse getReadResponse() {
final Stopwatch startTimeSw = Stopwatch.createStarted();
+ final Channel channel = requestHandler.ctx().channel();
final ReadResponse.Builder readResponse = ReadResponse.newBuilder()
.setLedgerId(ledgerId)
@@ -249,9 +250,9 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
public void run() {
requestProcessor.getRequestStats().getReadEntrySchedulingDelayStats().registerSuccessfulEvent(
MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
- if (!channel.isOpen()) {
+ if (!requestHandler.ctx().channel().isOpen()) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Dropping read request for closed channel: {}", channel);
+ LOG.debug("Dropping read request for closed channel: {}", requestHandler.ctx().channel());
}
requestProcessor.onReadRequestFinish();
return;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
index abb7d61646..25fe9530ad 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
@@ -22,7 +22,6 @@ package org.apache.bookkeeper.proto;
import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
@@ -43,9 +42,9 @@ import org.slf4j.LoggerFactory;
class ReadLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(ReadLacProcessorV3.class);
- public ReadLacProcessorV3(Request request, Channel channel,
+ public ReadLacProcessorV3(Request request, BookieRequestHandler requestHandler,
BookieRequestProcessor requestProcessor) {
- super(request, channel, requestProcessor);
+ super(request, requestHandler, requestProcessor);
}
// Returns null if there is no exception thrown
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index 531dfecccc..7e8f9fa768 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -19,7 +19,6 @@ package org.apache.bookkeeper.proto;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
import io.netty.util.Recycler;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
@@ -47,11 +46,11 @@ class WriteEntryProcessor extends PacketProcessorBase<ParsedAddRequest> implemen
startTimeNanos = -1L;
}
- public static WriteEntryProcessor create(ParsedAddRequest request, Channel channel,
+ public static WriteEntryProcessor create(ParsedAddRequest request, BookieRequestHandler requestHandler,
BookieRequestProcessor requestProcessor) {
WriteEntryProcessor wep = RECYCLER.get();
- wep.init(request, channel, requestProcessor);
- requestProcessor.onAddRequestStart(channel);
+ wep.init(request, requestHandler, requestProcessor);
+ requestProcessor.onAddRequestStart(requestHandler.ctx().channel());
return wep;
}
@@ -74,9 +73,10 @@ class WriteEntryProcessor extends PacketProcessorBase<ParsedAddRequest> implemen
ByteBuf addData = request.getData();
try {
if (request.isRecoveryAdd()) {
- requestProcessor.getBookie().recoveryAddEntry(addData, this, channel, request.getMasterKey());
+ requestProcessor.getBookie().recoveryAddEntry(addData, this, requestHandler, request.getMasterKey());
} else {
- requestProcessor.getBookie().addEntry(addData, false, this, channel, request.getMasterKey());
+ requestProcessor.getBookie().addEntry(addData, false, this,
+ requestHandler, request.getMasterKey());
}
} catch (OperationRejectedException e) {
requestProcessor.getRequestStats().getAddEntryRejectedCounter().inc();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index 7d59858732..36aff7ad92 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -22,7 +22,6 @@ package org.apache.bookkeeper.proto;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
import java.io.IOException;
import java.util.EnumSet;
import java.util.concurrent.TimeUnit;
@@ -43,10 +42,10 @@ import org.slf4j.LoggerFactory;
class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
private static final Logger logger = LoggerFactory.getLogger(WriteEntryProcessorV3.class);
- public WriteEntryProcessorV3(Request request, Channel channel,
+ public WriteEntryProcessorV3(Request request, BookieRequestHandler requestHandler,
BookieRequestProcessor requestProcessor) {
- super(request, channel, requestProcessor);
- requestProcessor.onAddRequestStart(channel);
+ super(request, requestHandler, requestProcessor);
+ requestProcessor.onAddRequestStart(requestHandler.ctx().channel());
}
// Returns null if there is no exception thrown
@@ -118,9 +117,11 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
ByteBuf entryToAdd = Unpooled.wrappedBuffer(addRequest.getBody().asReadOnlyByteBuffer());
try {
if (RequestUtils.hasFlag(addRequest, AddRequest.Flag.RECOVERY_ADD)) {
- requestProcessor.getBookie().recoveryAddEntry(entryToAdd, wcb, channel, masterKey);
+ requestProcessor.getBookie().recoveryAddEntry(entryToAdd, wcb,
+ requestHandler.ctx().channel(), masterKey);
} else {
- requestProcessor.getBookie().addEntry(entryToAdd, ackBeforeSync, wcb, channel, masterKey);
+ requestProcessor.getBookie().addEntry(entryToAdd, ackBeforeSync, wcb,
+ requestHandler.ctx().channel(), masterKey);
}
status = StatusCode.EOK;
} catch (OperationRejectedException e) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
index d8a427c905..293cea3bb0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
@@ -21,7 +21,6 @@
package org.apache.bookkeeper.proto;
import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
@@ -40,9 +39,9 @@ import org.slf4j.LoggerFactory;
class WriteLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(WriteLacProcessorV3.class);
- public WriteLacProcessorV3(Request request, Channel channel,
+ public WriteLacProcessorV3(Request request, BookieRequestHandler requestHandler,
BookieRequestProcessor requestProcessor) {
- super(request, channel, requestProcessor);
+ super(request, requestHandler, requestProcessor);
}
// Returns null if there is no exception thrown
@@ -103,7 +102,8 @@ class WriteLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
byte[] masterKey = writeLacRequest.getMasterKey().toByteArray();
try {
- requestProcessor.bookie.setExplicitLac(Unpooled.wrappedBuffer(lacToAdd), writeCallback, channel, masterKey);
+ requestProcessor.bookie.setExplicitLac(Unpooled.wrappedBuffer(lacToAdd),
+ writeCallback, requestHandler, masterKey);
status = StatusCode.EOK;
} catch (IOException e) {
logger.error("Error saving lac {} for ledger:{}",
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
index 54460770f1..3bc9cbee42 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
@@ -30,6 +30,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import java.util.concurrent.CountDownLatch;
@@ -55,6 +56,8 @@ public class ForceLedgerProcessorV3Test {
private Request request;
private ForceLedgerProcessorV3 processor;
+
+ private BookieRequestHandler requestHandler;
private Channel channel;
private BookieRequestProcessor requestProcessor;
private Bookie bookie;
@@ -71,17 +74,25 @@ public class ForceLedgerProcessorV3Test {
.setLedgerId(System.currentTimeMillis())
.build())
.build();
+
+
channel = mock(Channel.class);
when(channel.isOpen()).thenReturn(true);
+ when(channel.isActive()).thenReturn(true);
+
+ requestHandler = mock(BookieRequestHandler.class);
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ when(ctx.channel()).thenReturn(channel);
+ when(requestHandler.ctx()).thenReturn(ctx);
+
bookie = mock(Bookie.class);
requestProcessor = mock(BookieRequestProcessor.class);
when(requestProcessor.getBookie()).thenReturn(bookie);
when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(-1L);
when(requestProcessor.getRequestStats()).thenReturn(new RequestStats(NullStatsLogger.INSTANCE));
- when(channel.isActive()).thenReturn(true);
processor = new ForceLedgerProcessorV3(
request,
- channel,
+ requestHandler,
requestProcessor);
}
@@ -102,7 +113,7 @@ public class ForceLedgerProcessorV3Test {
}).when(bookie).forceLedger(
eq(request.getForceLedgerRequest().getLedgerId()),
any(WriteCallback.class),
- same(channel));
+ same(requestHandler));
ChannelPromise promise = new DefaultChannelPromise(channel);
AtomicReference<Object> writtenObject = new AtomicReference<>();
@@ -117,7 +128,7 @@ public class ForceLedgerProcessorV3Test {
verify(bookie, times(1))
.forceLedger(eq(request.getForceLedgerRequest().getLedgerId()),
- any(WriteCallback.class), same(channel));
+ any(WriteCallback.class), same(requestHandler));
verify(channel, times(1)).writeAndFlush(any(Response.class));
latch.await();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3Test.java
index e294a55ccf..5e986515e9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3Test.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.Bookie;
@@ -38,6 +39,7 @@ import org.junit.Test;
*/
public class GetBookieInfoProcessorV3Test {
+ private BookieRequestHandler requestHandler;
private Channel channel;
private BookieRequestProcessor requestProcessor;
private Bookie bookie;
@@ -55,9 +57,17 @@ public class GetBookieInfoProcessorV3Test {
requestProcessor = mock(BookieRequestProcessor.class);
bookie = mock(Bookie.class);
when(requestProcessor.getBookie()).thenReturn(bookie);
+
+ requestHandler = mock(BookieRequestHandler.class);
+
channel = mock(Channel.class);
when(channel.isOpen()).thenReturn(true);
when(channel.isActive()).thenReturn(true);
+
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ when(ctx.channel()).thenReturn(channel);
+ when(requestHandler.ctx()).thenReturn(ctx);
+
when(requestProcessor.getRequestStats()).thenReturn(requestStats);
when(requestProcessor.getRequestStats().getGetBookieInfoStats())
.thenReturn(getBookieInfoStats);
@@ -85,7 +95,7 @@ public class GetBookieInfoProcessorV3Test {
.build();
GetBookieInfoProcessorV3 getBookieInfo = new GetBookieInfoProcessorV3(
- getBookieInfoRequest, channel, requestProcessor);
+ getBookieInfoRequest, requestHandler, requestProcessor);
getBookieInfo.run();
// get BookieInfo succeeded.
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java
index 9eae1b9c0d..33a4fdc829 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.when;
import com.google.protobuf.ByteString;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
import io.netty.util.HashedWheelTimer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -84,6 +85,12 @@ public class LongPollReadEntryProcessorV3Test {
Channel channel = mock(Channel.class);
when(channel.isOpen()).thenReturn(true);
+
+ BookieRequestHandler requestHandler = mock(BookieRequestHandler.class);
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ when(ctx.channel()).thenReturn(channel);
+ when(requestHandler.ctx()).thenReturn(ctx);
+
Bookie bookie = mock(Bookie.class);
BookieRequestProcessor requestProcessor = mock(BookieRequestProcessor.class);
@@ -104,7 +111,7 @@ public class LongPollReadEntryProcessorV3Test {
LongPollReadEntryProcessorV3 processor = new LongPollReadEntryProcessorV3(
request,
- channel,
+ requestHandler,
requestProcessor,
executor, executor, timer);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
index f3e9764e71..91e100d809 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.channel.EventLoop;
@@ -53,6 +54,7 @@ import org.junit.Test;
public class ReadEntryProcessorTest {
private Channel channel;
+ private BookieRequestHandler requestHandler;
private BookieRequestProcessor requestProcessor;
private Bookie bookie;
@@ -60,6 +62,12 @@ public class ReadEntryProcessorTest {
public void setup() throws IOException, BookieException {
channel = mock(Channel.class);
when(channel.isOpen()).thenReturn(true);
+
+ requestHandler = mock(BookieRequestHandler.class);
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ when(ctx.channel()).thenReturn(channel);
+ when(requestHandler.ctx()).thenReturn(ctx);
+
bookie = mock(Bookie.class);
requestProcessor = mock(BookieRequestProcessor.class);
when(requestProcessor.getBookie()).thenReturn(bookie);
@@ -101,7 +109,8 @@ public class ReadEntryProcessorTest {
long ledgerId = System.currentTimeMillis();
ReadRequest request = new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
1, BookieProtocol.FLAG_DO_FENCING, new byte[]{});
- ReadEntryProcessor processor = ReadEntryProcessor.create(request, channel, requestProcessor, service, true);
+ ReadEntryProcessor processor = ReadEntryProcessor.create(
+ request, requestHandler, requestProcessor, service, true);
processor.run();
fenceResult.complete(result);
@@ -143,7 +152,7 @@ public class ReadEntryProcessorTest {
long ledgerId = System.currentTimeMillis();
ReadRequest request = new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
1, BookieProtocol.FLAG_DO_FENCING, new byte[]{});
- ReadEntryProcessor processor = ReadEntryProcessor.create(request, channel, requestProcessor, null, true);
+ ReadEntryProcessor processor = ReadEntryProcessor.create(request, requestHandler, requestProcessor, null, true);
fenceResult.complete(result);
processor.run();
@@ -173,7 +182,7 @@ public class ReadEntryProcessorTest {
long ledgerId = System.currentTimeMillis();
ReadRequest request = new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
1, (short) 0, new byte[]{});
- ReadEntryProcessor processor = ReadEntryProcessor.create(request, channel, requestProcessor, null, true);
+ ReadEntryProcessor processor = ReadEntryProcessor.create(request, requestHandler, requestProcessor, null, true);
processor.run();
latch.await();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
index b88f819949..d5ee8f5275 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
@@ -27,9 +27,12 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import com.google.protobuf.ByteString;
import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
@@ -137,7 +140,14 @@ public class TestBookieRequestProcessor {
.setBody(ByteString.copyFrom("entrydata".getBytes())).build();
Request request = Request.newBuilder().setHeader(header).setAddRequest(addRequest).build();
- WriteEntryProcessorV3 writeEntryProcessorV3 = new WriteEntryProcessorV3(request, null, requestProcessor);
+ Channel channel = mock(Channel.class);
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ when(ctx.channel()).thenReturn(channel);
+ BookieRequestHandler requestHandler = mock(BookieRequestHandler.class);
+ when(requestHandler.ctx()).thenReturn(ctx);
+
+ WriteEntryProcessorV3 writeEntryProcessorV3 = new WriteEntryProcessorV3(request, requestHandler,
+ requestProcessor);
String toString = writeEntryProcessorV3.toString();
assertFalse("writeEntryProcessorV3's toString should have filtered out body", toString.contains("body"));
assertFalse("writeEntryProcessorV3's toString should have filtered out masterKey",
@@ -155,7 +165,7 @@ public class TestBookieRequestProcessor {
.setBody(ByteString.copyFrom("entrydata".getBytes())).setFlag(Flag.RECOVERY_ADD).setWriteFlags(0)
.build();
request = Request.newBuilder().setHeader(header).setAddRequest(addRequest).build();
- writeEntryProcessorV3 = new WriteEntryProcessorV3(request, null, requestProcessor);
+ writeEntryProcessorV3 = new WriteEntryProcessorV3(request, requestHandler, requestProcessor);
toString = writeEntryProcessorV3.toString();
assertFalse("writeEntryProcessorV3's toString should have filtered out body", toString.contains("body"));
assertFalse("writeEntryProcessorV3's toString should have filtered out masterKey",
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
index 4479ea4a7a..8fc3a89f00 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
@@ -33,6 +33,7 @@ import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import java.util.concurrent.CountDownLatch;
@@ -53,6 +54,8 @@ public class WriteEntryProcessorTest {
private ParsedAddRequest request;
private WriteEntryProcessor processor;
private Channel channel;
+ private ChannelHandlerContext ctx;
+ private BookieRequestHandler requestHandler;
private BookieRequestProcessor requestProcessor;
private Bookie bookie;
@@ -67,6 +70,12 @@ public class WriteEntryProcessorTest {
Unpooled.wrappedBuffer("test-entry-data".getBytes(UTF_8)));
channel = mock(Channel.class);
when(channel.isOpen()).thenReturn(true);
+
+ requestHandler = mock(BookieRequestHandler.class);
+ ctx = mock(ChannelHandlerContext.class);
+ when(ctx.channel()).thenReturn(channel);
+ when(requestHandler.ctx()).thenReturn(ctx);
+
bookie = mock(Bookie.class);
requestProcessor = mock(BookieRequestProcessor.class);
when(requestProcessor.getBookie()).thenReturn(bookie);
@@ -75,7 +84,7 @@ public class WriteEntryProcessorTest {
when(channel.isWritable()).thenReturn(true);
processor = WriteEntryProcessor.create(
request,
- channel,
+ requestHandler,
requestProcessor);
}
@@ -93,7 +102,7 @@ public class WriteEntryProcessorTest {
Unpooled.wrappedBuffer("test-entry-data".getBytes(UTF_8)));
processor = WriteEntryProcessor.create(
request,
- channel,
+ requestHandler,
requestProcessor);
}
@@ -110,11 +119,11 @@ public class WriteEntryProcessorTest {
writtenObject.set(invocationOnMock.getArgument(0));
latch.countDown();
return null;
- }).when(channel).writeAndFlush(any(), any(ChannelPromise.class));
+ }).when(channel).writeAndFlush(any(), any());
processor.run();
- verify(channel, times(1)).writeAndFlush(any(), any(ChannelPromise.class));
+ verify(channel, times(1)).writeAndFlush(any(), any());
latch.await();
@@ -142,11 +151,11 @@ public class WriteEntryProcessorTest {
writtenObject.set(invocationOnMock.getArgument(0));
latch.countDown();
return null;
- }).when(channel).writeAndFlush(any(), any(ChannelPromise.class));
+ }).when(channel).writeAndFlush(any(), any());
processor.run();
- verify(channel, times(1)).writeAndFlush(any(), any(ChannelPromise.class));
+ verify(channel, times(1)).writeAndFlush(any(), any());
latch.await();
@@ -170,7 +179,7 @@ public class WriteEntryProcessorTest {
doAnswer(invocationOnMock -> {
processor.writeComplete(0, request.ledgerId, request.entryId, null, null);
return null;
- }).when(bookie).addEntry(any(ByteBuf.class), eq(false), same(processor), same(channel), eq(new byte[0]));
+ }).when(bookie).addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0]));
AtomicReference<Object> writtenObject = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
@@ -178,13 +187,13 @@ public class WriteEntryProcessorTest {
writtenObject.set(invocationOnMock.getArgument(0));
latch.countDown();
return null;
- }).when(channel).writeAndFlush(any(), any(ChannelPromise.class));
+ }).when(channel).writeAndFlush(any(), any());
processor.run();
verify(bookie, times(1))
- .addEntry(any(ByteBuf.class), eq(false), same(processor), same(channel), eq(new byte[0]));
- verify(channel, times(1)).writeAndFlush(any(), any(ChannelPromise.class));
+ .addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0]));
+ verify(channel, times(1)).writeAndFlush(any(), any());
latch.await();
@@ -205,7 +214,7 @@ public class WriteEntryProcessorTest {
doAnswer(invocationOnMock -> {
processor.writeComplete(0, request.ledgerId, request.entryId, null, null);
return null;
- }).when(bookie).addEntry(any(ByteBuf.class), eq(false), same(processor), same(channel), eq(new byte[0]));
+ }).when(bookie).addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0]));
AtomicReference<Object> writtenObject = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
@@ -213,13 +222,13 @@ public class WriteEntryProcessorTest {
writtenObject.set(invocationOnMock.getArgument(0));
latch.countDown();
return null;
- }).when(channel).writeAndFlush(any(), any(ChannelPromise.class));
+ }).when(channel).writeAndFlush(any(), any());
processor.run();
verify(bookie, times(1))
- .addEntry(any(ByteBuf.class), eq(false), same(processor), same(channel), eq(new byte[0]));
- verify(channel, times(1)).writeAndFlush(any(), any(ChannelPromise.class));
+ .addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0]));
+ verify(channel, times(1)).writeAndFlush(any(), any());
latch.await();
@@ -241,7 +250,7 @@ public class WriteEntryProcessorTest {
doAnswer(invocationOnMock -> {
throw new BookieException.OperationRejectedException();
}).when(bookie).addEntry(
- any(ByteBuf.class), eq(false), same(processor), same(channel), eq(new byte[0]));
+ any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0]));
ChannelPromise promise = new DefaultChannelPromise(channel);
AtomicReference<Object> writtenObject = new AtomicReference<>();
@@ -250,13 +259,13 @@ public class WriteEntryProcessorTest {
writtenObject.set(invocationOnMock.getArgument(0));
latch.countDown();
return promise;
- }).when(channel).writeAndFlush(any(), any(ChannelPromise.class));
+ }).when(channel).writeAndFlush(any(), any());
processor.run();
verify(bookie, times(1))
- .addEntry(any(ByteBuf.class), eq(false), same(processor), same(channel), eq(new byte[0]));
- verify(channel, times(1)).writeAndFlush(any(Response.class), any(ChannelPromise.class));
+ .addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0]));
+ verify(channel, times(1)).writeAndFlush(any(Response.class), any());
latch.await();
assertTrue(writtenObject.get() instanceof Response);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
index 2175b26a5b..40eb662cc5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
@@ -32,6 +32,7 @@ import static org.mockito.Mockito.when;
import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import java.util.concurrent.CountDownLatch;
@@ -57,7 +58,9 @@ public class WriteEntryProcessorV3Test {
private Request request;
private WriteEntryProcessorV3 processor;
+
private Channel channel;
+ private BookieRequestHandler requestHandler;
private BookieRequestProcessor requestProcessor;
private Bookie bookie;
@@ -78,6 +81,12 @@ public class WriteEntryProcessorV3Test {
.build();
channel = mock(Channel.class);
when(channel.isOpen()).thenReturn(true);
+
+ requestHandler = mock(BookieRequestHandler.class);
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ when(ctx.channel()).thenReturn(channel);
+ when(requestHandler.ctx()).thenReturn(ctx);
+
bookie = mock(Bookie.class);
requestProcessor = mock(BookieRequestProcessor.class);
when(requestProcessor.getBookie()).thenReturn(bookie);
@@ -86,7 +95,7 @@ public class WriteEntryProcessorV3Test {
when(channel.isActive()).thenReturn(true);
processor = new WriteEntryProcessorV3(
request,
- channel,
+ requestHandler,
requestProcessor);
}
@@ -99,7 +108,7 @@ public class WriteEntryProcessorV3Test {
processor = new WriteEntryProcessorV3(
request,
- channel,
+ requestHandler,
requestProcessor);
}