You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by yo...@apache.org on 2022/06/13 07:37:51 UTC
[bookkeeper] 01/02: Revert "Testing the memory limit"
This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch branch-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 4b446619872ead696f01ea092e590a8a0e582134
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Mon Jun 13 15:37:15 2022 +0800
Revert "Testing the memory limit"
This reverts commit dab1e981bc1497341f108e55de2dc3c5f1447615.
---
.../bookkeeper/conf/ClientConfiguration.java | 22 -------
.../apache/bookkeeper/proto/BookieClientImpl.java | 75 ++--------------------
.../bookkeeper/proto/BookieRequestProcessor.java | 18 ------
.../proto/BookkeeperInternalCallbacks.java | 4 --
.../bookkeeper/proto/PerChannelBookieClient.java | 24 ++-----
.../apache/bookkeeper/test/BookieClientTest.java | 16 -----
6 files changed, 11 insertions(+), 148 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index e900388485..80c2ad4b9d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -198,10 +198,6 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
protected static final String CLIENT_CONNECT_BOOKIE_UNAVAILABLE_LOG_THROTTLING =
"clientConnectBookieUnavailableLogThrottling";
- // client memory limit options
- protected static final String CLIENT_MEMORY_LIMIT_ENABLED = "clientMemoryLimitEnabled";
- protected static final String CLIENT_MEMORY_LIMIT_BY_BYTES = "clientMemoryLimitByBytes";
-
/**
* Construct a default client-side configuration.
*/
@@ -2012,24 +2008,6 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
return getLong(CLIENT_CONNECT_BOOKIE_UNAVAILABLE_LOG_THROTTLING, 5_000L);
}
- public ClientConfiguration setClientMemoryLimitEnabled(boolean enabled) {
- setProperty(CLIENT_MEMORY_LIMIT_ENABLED, enabled);
- return this;
- }
-
- public boolean getClientMemoryLimitEnabled() {
- return getBoolean(CLIENT_MEMORY_LIMIT_ENABLED, false);
- }
-
- public ClientConfiguration setClientMemoryLimitByBytes(int bytes) {
- setProperty(CLIENT_MEMORY_LIMIT_BY_BYTES, bytes);
- return this;
- }
-
- public int getClientMemoryLimitByBytes() {
- return getInt(CLIENT_MEMORY_LIMIT_BY_BYTES, 64 * 1024 * 1024);
- }
-
@Override
protected ClientConfiguration getThis() {
return this;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
index b9493f43b2..03e3c068e4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
@@ -38,7 +38,6 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
@@ -53,7 +52,6 @@ import org.apache.bookkeeper.auth.ClientAuthProvider;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
import org.apache.bookkeeper.client.api.WriteFlag;
-import org.apache.bookkeeper.common.util.MemoryLimitController;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -67,7 +65,6 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteAndFlushCallback;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.tls.SecurityException;
@@ -106,7 +103,6 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
private final BookieAddressResolver bookieAddressResolver;
private final long bookieErrorThresholdPerInterval;
- private Optional<MemoryLimitController> memoryLimitController;
public BookieClientImpl(ClientConfiguration conf, EventLoopGroup eventLoopGroup,
ByteBufAllocator allocator,
@@ -141,16 +137,6 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
} else {
this.timeoutFuture = null;
}
-
- if (conf.getClientMemoryLimitEnabled()) {
- memoryLimitController = Optional.of(new MemoryLimitController(conf.getClientMemoryLimitByBytes()));
- } else {
- memoryLimitController = Optional.empty();
- }
- }
-
- public Optional<MemoryLimitController> getMemoryLimitController() {
- return memoryLimitController;
}
private int getRc(int rc) {
@@ -339,33 +325,11 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
// Retain the buffer, since the connection could be obtained after
// the PendingApp might have already failed
toSend.retain();
- Optional<WriteAndFlushCallback> callback = Optional.empty();
- try {
- callback = setMemoryLimit(entryId, toSend.readableBytes());
- } catch (InterruptedException e) {
- completeAdd(getRc(BKException.Code.IllegalOpException), ledgerId, entryId, addr, cb, ctx);
- LOG.error("Failed to set memory limit when adding entry {}:{}", ledgerId, entryId, e);
- return;
- }
- client.obtain(ChannelReadyForAddEntryCallback.create(
- this, toSend, ledgerId, entryId, addr,
- ctx, cb, options, masterKey, allowFastFail, writeFlags, callback),
- ledgerId);
- }
- private Optional<WriteAndFlushCallback> setMemoryLimit(final long entryId, final long entrySize) throws InterruptedException {
- if (getMemoryLimitController().isPresent()) {
- MemoryLimitController mlc = getMemoryLimitController().get();
- mlc.reserveMemory(entrySize);
- LOG.debug("Acquire memory size {} for entry {}, current usage {} ", entrySize, entryId,
- mlc.currentUsage());
- WriteAndFlushCallbackImpl callback = new WriteAndFlushCallbackImpl();
- callback.setBookieClient(this);
- callback.setSize(entrySize);
- callback.setEntryId(entryId);
- return Optional.of(callback);
- }
- return Optional.empty();
+ client.obtain(ChannelReadyForAddEntryCallback.create(
+ this, toSend, ledgerId, entryId, addr,
+ ctx, cb, options, masterKey, allowFastFail, writeFlags),
+ ledgerId);
}
@Override
@@ -414,31 +378,6 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
}
}
- private static class WriteAndFlushCallbackImpl implements WriteAndFlushCallback {
-
- private BookieClientImpl bookieClient;
- private long size;
- private long entryId;
-
- public void setBookieClient(BookieClientImpl bookieClient) {
- this.bookieClient = bookieClient;
- }
-
- public void setSize(long size) {
- this.size = size;
- }
-
- public void setEntryId(long entryId) {
- this.entryId = entryId;
- }
-
- @Override
- public void complete() {
- bookieClient.getMemoryLimitController().get().releaseMemory(size);
- LOG.debug("Release memory size {} for entry {}", size, entryId);
- }
- }
-
private static class ChannelReadyForAddEntryCallback
implements GenericCallback<PerChannelBookieClient> {
private final Handle<ChannelReadyForAddEntryCallback> recyclerHandle;
@@ -454,13 +393,12 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
private byte[] masterKey;
private boolean allowFastFail;
private EnumSet<WriteFlag> writeFlags;
- private Optional<WriteAndFlushCallback> writeAndFlushCallback;
static ChannelReadyForAddEntryCallback create(
BookieClientImpl bookieClient, ByteBufList toSend, long ledgerId,
long entryId, BookieId addr, Object ctx,
WriteCallback cb, int options, byte[] masterKey, boolean allowFastFail,
- EnumSet<WriteFlag> writeFlags, Optional<WriteAndFlushCallback> writeAndFlushCallback) {
+ EnumSet<WriteFlag> writeFlags) {
ChannelReadyForAddEntryCallback callback = RECYCLER.get();
callback.bookieClient = bookieClient;
callback.toSend = toSend;
@@ -473,7 +411,6 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
callback.masterKey = masterKey;
callback.allowFastFail = allowFastFail;
callback.writeFlags = writeFlags;
- callback.writeAndFlushCallback = writeAndFlushCallback;
return callback;
}
@@ -484,7 +421,7 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx);
} else {
pcbc.addEntry(ledgerId, masterKey, entryId,
- toSend, cb, ctx, options, allowFastFail, writeFlags, writeAndFlushCallback);
+ toSend, cb, ctx, options, allowFastFail, writeFlags);
}
toSend.release();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index e36b579942..67f83e9ce5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -298,8 +298,6 @@ public class BookieRequestProcessor implements RequestProcessor {
}
}
- private static final String SLEEP_TIME = System.getenv("SLEEP_TIME_FOR_TESTING");
-
@Override
public void processRequest(Object msg, Channel c) {
// If we can decode this packet as a Request protobuf packet, process
@@ -311,14 +309,6 @@ public class BookieRequestProcessor implements RequestProcessor {
BookkeeperProtocol.BKPacketHeader header = r.getHeader();
switch (header.getOperation()) {
case ADD_ENTRY:
- LOG.info("Using v3 protocol to resolve the request, wait for {}", SLEEP_TIME);
- int seconds = Integer.parseInt(SLEEP_TIME);
- try {
- TimeUnit.MILLISECONDS.sleep(seconds);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- LOG.info("Continue to process the add entry request");
processAddRequestV3(r, c);
break;
case READ_ENTRY:
@@ -374,14 +364,6 @@ public class BookieRequestProcessor implements RequestProcessor {
// process packet
switch (r.getOpCode()) {
case BookieProtocol.ADDENTRY:
- LOG.info("Using v2 protocol to resolve the request, wait for {}", SLEEP_TIME);
- int seconds = Integer.parseInt(SLEEP_TIME);
- try {
- TimeUnit.MILLISECONDS.sleep(seconds);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- LOG.info("Continue to process the add entry request");
checkArgument(r instanceof BookieProtocol.ParsedAddRequest);
processAddRequest((BookieProtocol.ParsedAddRequest) r, c);
break;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
index f6333f6ac5..9904c90aef 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
@@ -80,10 +80,6 @@ public class BookkeeperInternalCallbacks {
void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx);
}
- public interface WriteAndFlushCallback {
- void complete();
- }
-
/**
* A last-add-confirmed (LAC) reader callback interface.
*/
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 56d93b77dd..12209a636d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -104,7 +104,6 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.StartTLSCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteAndFlushCallback;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
@@ -755,8 +754,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
* WriteFlags
*/
void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBufList toSend, WriteCallback cb,
- Object ctx, final int options, boolean allowFastFail, final EnumSet<WriteFlag> writeFlags,
- Optional<WriteAndFlushCallback> wfc) {
+ Object ctx, final int options, boolean allowFastFail, final EnumSet<WriteFlag> writeFlags) {
Object request = null;
CompletionKey completionKey = null;
if (useV2WireProtocol) {
@@ -765,7 +763,6 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
executor.executeOrdered(ledgerId, () -> {
cb.writeComplete(BKException.Code.IllegalOpException, ledgerId, entryId, bookieId, ctx);
});
- wfc.ifPresent(WriteAndFlushCallback::complete);
return;
}
completionKey = acquireV2Key(ledgerId, entryId, OperationType.ADD_ENTRY);
@@ -825,11 +822,10 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
// because we need to release toSend.
errorOut(completionKey);
toSend.release();
- wfc.ifPresent(WriteAndFlushCallback::complete);
return;
} else {
// addEntry times out on backpressure
- writeAndFlush(c, completionKey, request, allowFastFail, wfc);
+ writeAndFlush(c, completionKey, request, allowFastFail);
}
}
@@ -1111,18 +1107,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
}
private void writeAndFlush(final Channel channel,
- final CompletionKey key,
- final Object request,
- final boolean allowFastFail) {
- writeAndFlush(channel, key, request, allowFastFail, Optional.empty());
-
- }
-
- private void writeAndFlush(final Channel channel,
- final CompletionKey key,
- final Object request,
- final boolean allowFastFail,
- final Optional<WriteAndFlushCallback> wfc) {
+ final CompletionKey key,
+ final Object request,
+ final boolean allowFastFail) {
if (channel == null) {
LOG.warn("Operation {} failed: channel == null", StringUtils.requestToString(request));
errorOut(key);
@@ -1156,7 +1143,6 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
} else {
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
}
- wfc.ifPresent(WriteAndFlushCallback::complete);
});
channel.writeAndFlush(request, promise);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 9d3ed9cf0d..944203c3e5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -33,7 +33,6 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
@@ -344,19 +343,4 @@ public class BookieClientTest {
assertEquals("BookieInfoSuccessCount", expectedBookieInfoSuccessCount,
perChannelBookieClientScopeOfThisAddr.getSuccessCount());
}
-
- @Test
- public void testMemoryLimit() throws Exception {
- ResultStruct arc = new ResultStruct();
- BookieId addr = bs.getBookieId();
- byte[] pwd = "".getBytes(StandardCharsets.UTF_8);
-
- ClientConfiguration conf = new ClientConfiguration();
- conf.setClientMemoryLimitEnabled(true);
- conf.setClientMemoryLimitByBytes(10);
- BookieClient bc = new BookieClientImpl();
- ByteBufList bb = createByteBuffer(1, 1, 1);
- System.out.println(bb.readableBytes());
-// bc.addEntry(addr, 1, pwd, 1, );
- }
}