You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by yo...@apache.org on 2022/06/13 07:37:51 UTC

[bookkeeper] 01/02: Revert "Testing the memory limit"

This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 4b446619872ead696f01ea092e590a8a0e582134
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Mon Jun 13 15:37:15 2022 +0800

    Revert "Testing the memory limit"
    
    This reverts commit dab1e981bc1497341f108e55de2dc3c5f1447615.
---
 .../bookkeeper/conf/ClientConfiguration.java       | 22 -------
 .../apache/bookkeeper/proto/BookieClientImpl.java  | 75 ++--------------------
 .../bookkeeper/proto/BookieRequestProcessor.java   | 18 ------
 .../proto/BookkeeperInternalCallbacks.java         |  4 --
 .../bookkeeper/proto/PerChannelBookieClient.java   | 24 ++-----
 .../apache/bookkeeper/test/BookieClientTest.java   | 16 -----
 6 files changed, 11 insertions(+), 148 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index e900388485..80c2ad4b9d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -198,10 +198,6 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
     protected static final String CLIENT_CONNECT_BOOKIE_UNAVAILABLE_LOG_THROTTLING =
             "clientConnectBookieUnavailableLogThrottling";
 
-    // client memory limit options
-    protected static final String CLIENT_MEMORY_LIMIT_ENABLED = "clientMemoryLimitEnabled";
-    protected static final String CLIENT_MEMORY_LIMIT_BY_BYTES = "clientMemoryLimitByBytes";
-
     /**
      * Construct a default client-side configuration.
      */
@@ -2012,24 +2008,6 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
         return getLong(CLIENT_CONNECT_BOOKIE_UNAVAILABLE_LOG_THROTTLING, 5_000L);
     }
 
-    public ClientConfiguration setClientMemoryLimitEnabled(boolean enabled) {
-        setProperty(CLIENT_MEMORY_LIMIT_ENABLED, enabled);
-        return this;
-    }
-
-    public boolean getClientMemoryLimitEnabled() {
-        return getBoolean(CLIENT_MEMORY_LIMIT_ENABLED, false);
-    }
-
-    public ClientConfiguration setClientMemoryLimitByBytes(int bytes) {
-        setProperty(CLIENT_MEMORY_LIMIT_BY_BYTES, bytes);
-        return this;
-    }
-
-    public int getClientMemoryLimitByBytes() {
-        return getInt(CLIENT_MEMORY_LIMIT_BY_BYTES, 64 * 1024 * 1024);
-    }
-
     @Override
     protected ClientConfiguration getThis() {
         return this;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
index b9493f43b2..03e3c068e4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
@@ -38,7 +38,6 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
@@ -53,7 +52,6 @@ import org.apache.bookkeeper.auth.ClientAuthProvider;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.client.api.WriteFlag;
-import org.apache.bookkeeper.common.util.MemoryLimitController;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.SafeRunnable;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -67,7 +65,6 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteAndFlushCallback;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.tls.SecurityException;
@@ -106,7 +103,6 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
     private final BookieAddressResolver bookieAddressResolver;
 
     private final long bookieErrorThresholdPerInterval;
-    private Optional<MemoryLimitController> memoryLimitController;
 
     public BookieClientImpl(ClientConfiguration conf, EventLoopGroup eventLoopGroup,
                             ByteBufAllocator allocator,
@@ -141,16 +137,6 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
         } else {
             this.timeoutFuture = null;
         }
-
-        if (conf.getClientMemoryLimitEnabled()) {
-            memoryLimitController = Optional.of(new MemoryLimitController(conf.getClientMemoryLimitByBytes()));
-        } else {
-            memoryLimitController = Optional.empty();
-        }
-    }
-
-    public Optional<MemoryLimitController> getMemoryLimitController() {
-        return memoryLimitController;
     }
 
     private int getRc(int rc) {
@@ -339,33 +325,11 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
         // Retain the buffer, since the connection could be obtained after
         // the PendingApp might have already failed
         toSend.retain();
-        Optional<WriteAndFlushCallback> callback = Optional.empty();
-        try {
-            callback = setMemoryLimit(entryId, toSend.readableBytes());
-        } catch (InterruptedException e) {
-            completeAdd(getRc(BKException.Code.IllegalOpException), ledgerId, entryId, addr, cb, ctx);
-            LOG.error("Failed to set memory limit when adding entry {}:{}", ledgerId, entryId, e);
-            return;
-        }
-        client.obtain(ChannelReadyForAddEntryCallback.create(
-                this, toSend, ledgerId, entryId, addr,
-                ctx, cb, options, masterKey, allowFastFail, writeFlags, callback),
-            ledgerId);
-    }
 
-    private Optional<WriteAndFlushCallback> setMemoryLimit(final long entryId, final long entrySize) throws InterruptedException {
-        if (getMemoryLimitController().isPresent()) {
-            MemoryLimitController mlc = getMemoryLimitController().get();
-            mlc.reserveMemory(entrySize);
-            LOG.debug("Acquire memory size {} for entry {}, current usage {} ", entrySize, entryId,
-                mlc.currentUsage());
-            WriteAndFlushCallbackImpl callback = new WriteAndFlushCallbackImpl();
-            callback.setBookieClient(this);
-            callback.setSize(entrySize);
-            callback.setEntryId(entryId);
-            return Optional.of(callback);
-        }
-        return Optional.empty();
+        client.obtain(ChannelReadyForAddEntryCallback.create(
+                              this, toSend, ledgerId, entryId, addr,
+                                  ctx, cb, options, masterKey, allowFastFail, writeFlags),
+                      ledgerId);
     }
 
     @Override
@@ -414,31 +378,6 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
         }
     }
 
-    private static class WriteAndFlushCallbackImpl implements WriteAndFlushCallback {
-
-        private BookieClientImpl bookieClient;
-        private long size;
-        private long entryId;
-
-        public void setBookieClient(BookieClientImpl bookieClient) {
-            this.bookieClient = bookieClient;
-        }
-
-        public void setSize(long size) {
-            this.size = size;
-        }
-
-        public void setEntryId(long entryId) {
-            this.entryId = entryId;
-        }
-
-        @Override
-        public void complete() {
-            bookieClient.getMemoryLimitController().get().releaseMemory(size);
-            LOG.debug("Release memory size {} for entry {}", size, entryId);
-        }
-    }
-
     private static class ChannelReadyForAddEntryCallback
         implements GenericCallback<PerChannelBookieClient> {
         private final Handle<ChannelReadyForAddEntryCallback> recyclerHandle;
@@ -454,13 +393,12 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
         private byte[] masterKey;
         private boolean allowFastFail;
         private EnumSet<WriteFlag> writeFlags;
-        private Optional<WriteAndFlushCallback> writeAndFlushCallback;
 
         static ChannelReadyForAddEntryCallback create(
                 BookieClientImpl bookieClient, ByteBufList toSend, long ledgerId,
                 long entryId, BookieId addr, Object ctx,
                 WriteCallback cb, int options, byte[] masterKey, boolean allowFastFail,
-                EnumSet<WriteFlag> writeFlags, Optional<WriteAndFlushCallback> writeAndFlushCallback) {
+                EnumSet<WriteFlag> writeFlags) {
             ChannelReadyForAddEntryCallback callback = RECYCLER.get();
             callback.bookieClient = bookieClient;
             callback.toSend = toSend;
@@ -473,7 +411,6 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
             callback.masterKey = masterKey;
             callback.allowFastFail = allowFastFail;
             callback.writeFlags = writeFlags;
-            callback.writeAndFlushCallback = writeAndFlushCallback;
             return callback;
         }
 
@@ -484,7 +421,7 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
                 bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx);
             } else {
                 pcbc.addEntry(ledgerId, masterKey, entryId,
-                    toSend, cb, ctx, options, allowFastFail, writeFlags, writeAndFlushCallback);
+                              toSend, cb, ctx, options, allowFastFail, writeFlags);
             }
 
             toSend.release();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index e36b579942..67f83e9ce5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -298,8 +298,6 @@ public class BookieRequestProcessor implements RequestProcessor {
         }
     }
 
-    private static final String SLEEP_TIME = System.getenv("SLEEP_TIME_FOR_TESTING");
-
     @Override
     public void processRequest(Object msg, Channel c) {
         // If we can decode this packet as a Request protobuf packet, process
@@ -311,14 +309,6 @@ public class BookieRequestProcessor implements RequestProcessor {
                 BookkeeperProtocol.BKPacketHeader header = r.getHeader();
                 switch (header.getOperation()) {
                     case ADD_ENTRY:
-                        LOG.info("Using v3 protocol to resolve the request, wait for {}", SLEEP_TIME);
-                        int seconds = Integer.parseInt(SLEEP_TIME);
-                        try {
-                            TimeUnit.MILLISECONDS.sleep(seconds);
-                        } catch (InterruptedException e) {
-                            e.printStackTrace();
-                        }
-                        LOG.info("Continue to process the add entry request");
                         processAddRequestV3(r, c);
                         break;
                     case READ_ENTRY:
@@ -374,14 +364,6 @@ public class BookieRequestProcessor implements RequestProcessor {
             // process packet
             switch (r.getOpCode()) {
                 case BookieProtocol.ADDENTRY:
-                    LOG.info("Using v2 protocol to resolve the request, wait for {}", SLEEP_TIME);
-                    int seconds = Integer.parseInt(SLEEP_TIME);
-                    try {
-                        TimeUnit.MILLISECONDS.sleep(seconds);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
-                    LOG.info("Continue to process the add entry request");
                     checkArgument(r instanceof BookieProtocol.ParsedAddRequest);
                     processAddRequest((BookieProtocol.ParsedAddRequest) r, c);
                     break;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
index f6333f6ac5..9904c90aef 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
@@ -80,10 +80,6 @@ public class BookkeeperInternalCallbacks {
         void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx);
     }
 
-    public interface WriteAndFlushCallback {
-        void complete();
-    }
-
     /**
      * A last-add-confirmed (LAC) reader callback interface.
      */
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 56d93b77dd..12209a636d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -104,7 +104,6 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.StartTLSCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteAndFlushCallback;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
@@ -755,8 +754,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
      *          WriteFlags
      */
     void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBufList toSend, WriteCallback cb,
-                  Object ctx, final int options, boolean allowFastFail, final EnumSet<WriteFlag> writeFlags,
-                  Optional<WriteAndFlushCallback> wfc) {
+                  Object ctx, final int options, boolean allowFastFail, final EnumSet<WriteFlag> writeFlags) {
         Object request = null;
         CompletionKey completionKey = null;
         if (useV2WireProtocol) {
@@ -765,7 +763,6 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                 executor.executeOrdered(ledgerId, () -> {
                     cb.writeComplete(BKException.Code.IllegalOpException, ledgerId, entryId, bookieId, ctx);
                 });
-                wfc.ifPresent(WriteAndFlushCallback::complete);
                 return;
             }
             completionKey = acquireV2Key(ledgerId, entryId, OperationType.ADD_ENTRY);
@@ -825,11 +822,10 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
             // because we need to release toSend.
             errorOut(completionKey);
             toSend.release();
-            wfc.ifPresent(WriteAndFlushCallback::complete);
             return;
         } else {
             // addEntry times out on backpressure
-            writeAndFlush(c, completionKey, request, allowFastFail, wfc);
+            writeAndFlush(c, completionKey, request, allowFastFail);
         }
     }
 
@@ -1111,18 +1107,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
     }
 
     private void writeAndFlush(final Channel channel,
-                               final CompletionKey key,
-                               final Object request,
-                               final boolean allowFastFail) {
-        writeAndFlush(channel, key, request, allowFastFail, Optional.empty());
-
-    }
-
-    private void writeAndFlush(final Channel channel,
-                               final CompletionKey key,
-                               final Object request,
-                               final boolean allowFastFail,
-                               final Optional<WriteAndFlushCallback> wfc) {
+                           final CompletionKey key,
+                           final Object request,
+                           final boolean allowFastFail) {
         if (channel == null) {
             LOG.warn("Operation {} failed: channel == null", StringUtils.requestToString(request));
             errorOut(key);
@@ -1156,7 +1143,6 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                 } else {
                     nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
                 }
-                wfc.ifPresent(WriteAndFlushCallback::complete);
             });
 
             channel.writeAndFlush(request, promise);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 9d3ed9cf0d..944203c3e5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -33,7 +33,6 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
@@ -344,19 +343,4 @@ public class BookieClientTest {
         assertEquals("BookieInfoSuccessCount", expectedBookieInfoSuccessCount,
                 perChannelBookieClientScopeOfThisAddr.getSuccessCount());
     }
-
-    @Test
-    public void testMemoryLimit() throws Exception {
-        ResultStruct arc = new ResultStruct();
-        BookieId addr = bs.getBookieId();
-        byte[] pwd = "".getBytes(StandardCharsets.UTF_8);
-
-        ClientConfiguration conf = new ClientConfiguration();
-        conf.setClientMemoryLimitEnabled(true);
-        conf.setClientMemoryLimitByBytes(10);
-        BookieClient bc = new BookieClientImpl();
-        ByteBufList bb = createByteBuffer(1, 1, 1);
-        System.out.println(bb.readableBytes());
-//        bc.addEntry(addr, 1, pwd, 1, );
-    }
 }