You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by yo...@apache.org on 2022/06/13 07:37:50 UTC

[bookkeeper] branch branch-4.14 updated (dab1e981bc -> 5370d63391)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a change to branch branch-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


    from dab1e981bc Testing the memory limit
     new 4b44661987 Revert "Testing the memory limit"
     new 5370d63391 Fix the V2 AddRequest object leak issue (#3323)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/conf/ClientConfiguration.java       | 22 -------
 .../apache/bookkeeper/proto/BookieClientImpl.java  | 75 ++--------------------
 .../bookkeeper/proto/BookieRequestProcessor.java   | 18 ------
 .../proto/BookkeeperInternalCallbacks.java         |  4 --
 .../bookkeeper/proto/PerChannelBookieClient.java   | 32 ++++-----
 .../apache/bookkeeper/test/BookieClientTest.java   | 16 -----
 6 files changed, 19 insertions(+), 148 deletions(-)


[bookkeeper] 02/02: Fix the V2 AddRequest object leak issue (#3323)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 5370d63391e5683d27ac9c4f4166281eaf026a10
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Mon Jun 13 15:27:51 2022 +0800

    Fix the V2 AddRequest object leak issue (#3323)
    
    ---
    
    **Motivation**
    
    If the request is a V2 add request, we retain a reference to
    the data when creating the AddRequest object. To avoid leaking
    that object, we need to release the reference if we hit
    any error before sending the request.
    
    (cherry picked from commit f887f8d7a507800b71b4143a40b0e45902f5f170)
---
 .../java/org/apache/bookkeeper/proto/PerChannelBookieClient.java  | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 12209a636d..aeef8f8e19 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -1127,6 +1127,14 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                     StringUtils.requestToString(request));
 
             errorOut(key, BKException.Code.TooManyRequestsException);
+
+            // If the request is a V2 add request, we retained the data's reference when creating the AddRequest
+            // object. To avoid the object leak, we need to release the reference if we met any errors
+            // before sending it.
+            if (request instanceof BookieProtocol.AddRequest) {
+                BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest) request;
+                ar.recycle();
+            }
             return;
         }
 


[bookkeeper] 01/02: Revert "Testing the memory limit"

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 4b446619872ead696f01ea092e590a8a0e582134
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Mon Jun 13 15:37:15 2022 +0800

    Revert "Testing the memory limit"
    
    This reverts commit dab1e981bc1497341f108e55de2dc3c5f1447615.
---
 .../bookkeeper/conf/ClientConfiguration.java       | 22 -------
 .../apache/bookkeeper/proto/BookieClientImpl.java  | 75 ++--------------------
 .../bookkeeper/proto/BookieRequestProcessor.java   | 18 ------
 .../proto/BookkeeperInternalCallbacks.java         |  4 --
 .../bookkeeper/proto/PerChannelBookieClient.java   | 24 ++-----
 .../apache/bookkeeper/test/BookieClientTest.java   | 16 -----
 6 files changed, 11 insertions(+), 148 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index e900388485..80c2ad4b9d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -198,10 +198,6 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
     protected static final String CLIENT_CONNECT_BOOKIE_UNAVAILABLE_LOG_THROTTLING =
             "clientConnectBookieUnavailableLogThrottling";
 
-    // client memory limit options
-    protected static final String CLIENT_MEMORY_LIMIT_ENABLED = "clientMemoryLimitEnabled";
-    protected static final String CLIENT_MEMORY_LIMIT_BY_BYTES = "clientMemoryLimitByBytes";
-
     /**
      * Construct a default client-side configuration.
      */
@@ -2012,24 +2008,6 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
         return getLong(CLIENT_CONNECT_BOOKIE_UNAVAILABLE_LOG_THROTTLING, 5_000L);
     }
 
-    public ClientConfiguration setClientMemoryLimitEnabled(boolean enabled) {
-        setProperty(CLIENT_MEMORY_LIMIT_ENABLED, enabled);
-        return this;
-    }
-
-    public boolean getClientMemoryLimitEnabled() {
-        return getBoolean(CLIENT_MEMORY_LIMIT_ENABLED, false);
-    }
-
-    public ClientConfiguration setClientMemoryLimitByBytes(int bytes) {
-        setProperty(CLIENT_MEMORY_LIMIT_BY_BYTES, bytes);
-        return this;
-    }
-
-    public int getClientMemoryLimitByBytes() {
-        return getInt(CLIENT_MEMORY_LIMIT_BY_BYTES, 64 * 1024 * 1024);
-    }
-
     @Override
     protected ClientConfiguration getThis() {
         return this;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
index b9493f43b2..03e3c068e4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
@@ -38,7 +38,6 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
@@ -53,7 +52,6 @@ import org.apache.bookkeeper.auth.ClientAuthProvider;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.client.api.WriteFlag;
-import org.apache.bookkeeper.common.util.MemoryLimitController;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.SafeRunnable;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -67,7 +65,6 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteAndFlushCallback;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.tls.SecurityException;
@@ -106,7 +103,6 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
     private final BookieAddressResolver bookieAddressResolver;
 
     private final long bookieErrorThresholdPerInterval;
-    private Optional<MemoryLimitController> memoryLimitController;
 
     public BookieClientImpl(ClientConfiguration conf, EventLoopGroup eventLoopGroup,
                             ByteBufAllocator allocator,
@@ -141,16 +137,6 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
         } else {
             this.timeoutFuture = null;
         }
-
-        if (conf.getClientMemoryLimitEnabled()) {
-            memoryLimitController = Optional.of(new MemoryLimitController(conf.getClientMemoryLimitByBytes()));
-        } else {
-            memoryLimitController = Optional.empty();
-        }
-    }
-
-    public Optional<MemoryLimitController> getMemoryLimitController() {
-        return memoryLimitController;
     }
 
     private int getRc(int rc) {
@@ -339,33 +325,11 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
         // Retain the buffer, since the connection could be obtained after
         // the PendingApp might have already failed
         toSend.retain();
-        Optional<WriteAndFlushCallback> callback = Optional.empty();
-        try {
-            callback = setMemoryLimit(entryId, toSend.readableBytes());
-        } catch (InterruptedException e) {
-            completeAdd(getRc(BKException.Code.IllegalOpException), ledgerId, entryId, addr, cb, ctx);
-            LOG.error("Failed to set memory limit when adding entry {}:{}", ledgerId, entryId, e);
-            return;
-        }
-        client.obtain(ChannelReadyForAddEntryCallback.create(
-                this, toSend, ledgerId, entryId, addr,
-                ctx, cb, options, masterKey, allowFastFail, writeFlags, callback),
-            ledgerId);
-    }
 
-    private Optional<WriteAndFlushCallback> setMemoryLimit(final long entryId, final long entrySize) throws InterruptedException {
-        if (getMemoryLimitController().isPresent()) {
-            MemoryLimitController mlc = getMemoryLimitController().get();
-            mlc.reserveMemory(entrySize);
-            LOG.debug("Acquire memory size {} for entry {}, current usage {} ", entrySize, entryId,
-                mlc.currentUsage());
-            WriteAndFlushCallbackImpl callback = new WriteAndFlushCallbackImpl();
-            callback.setBookieClient(this);
-            callback.setSize(entrySize);
-            callback.setEntryId(entryId);
-            return Optional.of(callback);
-        }
-        return Optional.empty();
+        client.obtain(ChannelReadyForAddEntryCallback.create(
+                              this, toSend, ledgerId, entryId, addr,
+                                  ctx, cb, options, masterKey, allowFastFail, writeFlags),
+                      ledgerId);
     }
 
     @Override
@@ -414,31 +378,6 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
         }
     }
 
-    private static class WriteAndFlushCallbackImpl implements WriteAndFlushCallback {
-
-        private BookieClientImpl bookieClient;
-        private long size;
-        private long entryId;
-
-        public void setBookieClient(BookieClientImpl bookieClient) {
-            this.bookieClient = bookieClient;
-        }
-
-        public void setSize(long size) {
-            this.size = size;
-        }
-
-        public void setEntryId(long entryId) {
-            this.entryId = entryId;
-        }
-
-        @Override
-        public void complete() {
-            bookieClient.getMemoryLimitController().get().releaseMemory(size);
-            LOG.debug("Release memory size {} for entry {}", size, entryId);
-        }
-    }
-
     private static class ChannelReadyForAddEntryCallback
         implements GenericCallback<PerChannelBookieClient> {
         private final Handle<ChannelReadyForAddEntryCallback> recyclerHandle;
@@ -454,13 +393,12 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
         private byte[] masterKey;
         private boolean allowFastFail;
         private EnumSet<WriteFlag> writeFlags;
-        private Optional<WriteAndFlushCallback> writeAndFlushCallback;
 
         static ChannelReadyForAddEntryCallback create(
                 BookieClientImpl bookieClient, ByteBufList toSend, long ledgerId,
                 long entryId, BookieId addr, Object ctx,
                 WriteCallback cb, int options, byte[] masterKey, boolean allowFastFail,
-                EnumSet<WriteFlag> writeFlags, Optional<WriteAndFlushCallback> writeAndFlushCallback) {
+                EnumSet<WriteFlag> writeFlags) {
             ChannelReadyForAddEntryCallback callback = RECYCLER.get();
             callback.bookieClient = bookieClient;
             callback.toSend = toSend;
@@ -473,7 +411,6 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
             callback.masterKey = masterKey;
             callback.allowFastFail = allowFastFail;
             callback.writeFlags = writeFlags;
-            callback.writeAndFlushCallback = writeAndFlushCallback;
             return callback;
         }
 
@@ -484,7 +421,7 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
                 bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx);
             } else {
                 pcbc.addEntry(ledgerId, masterKey, entryId,
-                    toSend, cb, ctx, options, allowFastFail, writeFlags, writeAndFlushCallback);
+                              toSend, cb, ctx, options, allowFastFail, writeFlags);
             }
 
             toSend.release();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index e36b579942..67f83e9ce5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -298,8 +298,6 @@ public class BookieRequestProcessor implements RequestProcessor {
         }
     }
 
-    private static final String SLEEP_TIME = System.getenv("SLEEP_TIME_FOR_TESTING");
-
     @Override
     public void processRequest(Object msg, Channel c) {
         // If we can decode this packet as a Request protobuf packet, process
@@ -311,14 +309,6 @@ public class BookieRequestProcessor implements RequestProcessor {
                 BookkeeperProtocol.BKPacketHeader header = r.getHeader();
                 switch (header.getOperation()) {
                     case ADD_ENTRY:
-                        LOG.info("Using v3 protocol to resolve the request, wait for {}", SLEEP_TIME);
-                        int seconds = Integer.parseInt(SLEEP_TIME);
-                        try {
-                            TimeUnit.MILLISECONDS.sleep(seconds);
-                        } catch (InterruptedException e) {
-                            e.printStackTrace();
-                        }
-                        LOG.info("Continue to process the add entry request");
                         processAddRequestV3(r, c);
                         break;
                     case READ_ENTRY:
@@ -374,14 +364,6 @@ public class BookieRequestProcessor implements RequestProcessor {
             // process packet
             switch (r.getOpCode()) {
                 case BookieProtocol.ADDENTRY:
-                    LOG.info("Using v2 protocol to resolve the request, wait for {}", SLEEP_TIME);
-                    int seconds = Integer.parseInt(SLEEP_TIME);
-                    try {
-                        TimeUnit.MILLISECONDS.sleep(seconds);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
-                    LOG.info("Continue to process the add entry request");
                     checkArgument(r instanceof BookieProtocol.ParsedAddRequest);
                     processAddRequest((BookieProtocol.ParsedAddRequest) r, c);
                     break;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
index f6333f6ac5..9904c90aef 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
@@ -80,10 +80,6 @@ public class BookkeeperInternalCallbacks {
         void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx);
     }
 
-    public interface WriteAndFlushCallback {
-        void complete();
-    }
-
     /**
      * A last-add-confirmed (LAC) reader callback interface.
      */
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 56d93b77dd..12209a636d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -104,7 +104,6 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.StartTLSCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteAndFlushCallback;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
@@ -755,8 +754,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
      *          WriteFlags
      */
     void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBufList toSend, WriteCallback cb,
-                  Object ctx, final int options, boolean allowFastFail, final EnumSet<WriteFlag> writeFlags,
-                  Optional<WriteAndFlushCallback> wfc) {
+                  Object ctx, final int options, boolean allowFastFail, final EnumSet<WriteFlag> writeFlags) {
         Object request = null;
         CompletionKey completionKey = null;
         if (useV2WireProtocol) {
@@ -765,7 +763,6 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                 executor.executeOrdered(ledgerId, () -> {
                     cb.writeComplete(BKException.Code.IllegalOpException, ledgerId, entryId, bookieId, ctx);
                 });
-                wfc.ifPresent(WriteAndFlushCallback::complete);
                 return;
             }
             completionKey = acquireV2Key(ledgerId, entryId, OperationType.ADD_ENTRY);
@@ -825,11 +822,10 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
             // because we need to release toSend.
             errorOut(completionKey);
             toSend.release();
-            wfc.ifPresent(WriteAndFlushCallback::complete);
             return;
         } else {
             // addEntry times out on backpressure
-            writeAndFlush(c, completionKey, request, allowFastFail, wfc);
+            writeAndFlush(c, completionKey, request, allowFastFail);
         }
     }
 
@@ -1111,18 +1107,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
     }
 
     private void writeAndFlush(final Channel channel,
-                               final CompletionKey key,
-                               final Object request,
-                               final boolean allowFastFail) {
-        writeAndFlush(channel, key, request, allowFastFail, Optional.empty());
-
-    }
-
-    private void writeAndFlush(final Channel channel,
-                               final CompletionKey key,
-                               final Object request,
-                               final boolean allowFastFail,
-                               final Optional<WriteAndFlushCallback> wfc) {
+                           final CompletionKey key,
+                           final Object request,
+                           final boolean allowFastFail) {
         if (channel == null) {
             LOG.warn("Operation {} failed: channel == null", StringUtils.requestToString(request));
             errorOut(key);
@@ -1156,7 +1143,6 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                 } else {
                     nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
                 }
-                wfc.ifPresent(WriteAndFlushCallback::complete);
             });
 
             channel.writeAndFlush(request, promise);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 9d3ed9cf0d..944203c3e5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -33,7 +33,6 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
@@ -344,19 +343,4 @@ public class BookieClientTest {
         assertEquals("BookieInfoSuccessCount", expectedBookieInfoSuccessCount,
                 perChannelBookieClientScopeOfThisAddr.getSuccessCount());
     }
-
-    @Test
-    public void testMemoryLimit() throws Exception {
-        ResultStruct arc = new ResultStruct();
-        BookieId addr = bs.getBookieId();
-        byte[] pwd = "".getBytes(StandardCharsets.UTF_8);
-
-        ClientConfiguration conf = new ClientConfiguration();
-        conf.setClientMemoryLimitEnabled(true);
-        conf.setClientMemoryLimitByBytes(10);
-        BookieClient bc = new BookieClientImpl();
-        ByteBufList bb = createByteBuffer(1, 1, 1);
-        System.out.println(bb.readableBytes());
-//        bc.addEntry(addr, 1, pwd, 1, );
-    }
 }