Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/08/12 05:33:28 UTC

[GitHub] [kafka] aloknnikhil opened a new pull request #11204: KAFKA-13188 - Release the memory back into MemoryPool

aloknnikhil opened a new pull request #11204:
URL: https://github.com/apache/kafka/pull/11204


   **Forward porting** - https://github.com/linkedin/kafka/pull/186
   
   **TICKET** = [KAFKA-13188](https://issues.apache.org/jira/browse/KAFKA-13188)
   **LI_DESCRIPTION** =
   The current KafkaConsumer code supports allocating buffers from a MemoryPool when allocating bytes for requests to brokers. However, the code never releases those buffers back to the pool, which renders pooling moot. It works today only because the consumer uses MemoryPool.NONE, which simply allocates a new buffer every time one is requested. It won't work if a different memory pool implementation is used.
   
   The consumer can't simply release the memory back into the pool, because the fetches keep holding references to the pooled memory in CompletedFetch objects, which live across multiple poll calls. The idea here is to use a ref-counting approach: the ClientResponse increments its ref count every time a CompletedFetch object is created and decrements it when the fetch is drained after a poll call returns records.
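
A rough sketch of the ref-counting idea described above. This is only an illustration and not the code in this PR: `TinyPool`, `RefCountedResponse`, `retain()` and `release()` are hypothetical names, the pool is a stand-in for a real MemoryPool implementation, and starting the count at 1 (a reference held by whoever created the response) is an assumption.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a pooling MemoryPool: a fixed set of equally sized buffers.
class TinyPool {
    private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();

    TinyPool(int buffers, int bufferSize) {
        for (int i = 0; i < buffers; i++) {
            free.push(ByteBuffer.allocate(bufferSize));
        }
    }

    // Returns null when every buffer is currently handed out.
    synchronized ByteBuffer tryAllocate() {
        return free.poll();
    }

    synchronized void release(ByteBuffer buffer) {
        buffer.clear();
        free.push(buffer);
    }

    synchronized int available() {
        return free.size();
    }
}

// Hypothetical response wrapper: the payload goes back to the pool only when
// the last holder has let go of it.
class RefCountedResponse {
    private final TinyPool pool;
    // Assumption: the count starts at 1 for the code that created the response.
    private final AtomicInteger refCount = new AtomicInteger(1);
    private ByteBuffer payload;

    RefCountedResponse(TinyPool pool, ByteBuffer payload) {
        this.pool = pool;
        this.payload = payload;
    }

    // Called once for each CompletedFetch-like object created from this response.
    void retain() {
        refCount.incrementAndGet();
    }

    // Called when a holder is done, e.g. after a fetch has been fully drained.
    void release() {
        if (refCount.decrementAndGet() == 0) {
            pool.release(payload);
            payload = null;
        }
    }
}

class RefCountDemo {
    public static void main(String[] args) {
        TinyPool pool = new TinyPool(1, 1024);
        RefCountedResponse response = new RefCountedResponse(pool, pool.tryAllocate());
        response.retain();   // first per-partition fetch created from the response
        response.retain();   // second per-partition fetch
        response.release();  // creator is done, but two fetches still hold the buffer
        System.out.println("available while fetches are pending: " + pool.available());
        response.release();  // first fetch drained
        response.release();  // second fetch drained, buffer returns to the pool
        System.out.println("available after draining: " + pool.available());
    }
}
```

The buffer stays out of the pool for as long as any fetch created from the response is still pending, which is the property the description relies on for CompletedFetch objects that outlive a single poll call.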
   
   For the remaining responses, such as metadata, offset commit, and list offsets, it is somewhat easier, since the client is done with the response bytes once the response future callback has completed.
   
   This PR also adds the ClientResponseWithFinalize class for debugging purposes, protected by the linkedin.enable.client.resonse.leakcheck flag. The class uses a finalizer to check whether some issue in the code left a buffer unreleased. However, finalizers are costly in an actual production setting, and hence the check is not enabled by default.
   
   **EXIT_CRITERIA** = If and when the upstream ticket is merged, and the changes are pulled in
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #11204: KAFKA-13188 - Release the memory back into MemoryPool

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11204:
URL: https://github.com/apache/kafka/pull/11204#discussion_r799998166



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClientResponseWithFinalize.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients;
+
+import java.nio.ByteBuffer;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+
+/**
+ * This is a decorator for ClientResponse used to verify (at finalization time) that any underlying memory for this
+ * response has been returned to the pool. To be used only as a debugging aide.
+ */
+public class ClientResponseWithFinalize extends ClientResponse {
+    private final LogContext logContext;
+
+    public ClientResponseWithFinalize(RequestHeader requestHeader, RequestCompletionHandler callback,
+        String destination, long createdTimeMs, long receivedTimeMs, boolean disconnected,
+        UnsupportedVersionException versionMismatch, AuthenticationException authenticationException,
+        AbstractResponse responseBody, MemoryPool memoryPool, ByteBuffer responsePayload, LogContext logContext) {
+        super(requestHeader, callback, destination, createdTimeMs, receivedTimeMs, disconnected, versionMismatch,
+            authenticationException, responseBody, memoryPool, responsePayload);
+        this.logContext = logContext;
+    }
+
+    private Logger getLogger() {
+        if (logContext != null) {
+            return logContext.logger(ClientResponseWithFinalize.class);
+        }
+        return log;
+    }
+
+    protected void checkAndForceBufferRelease() {
+        if (memoryPool != null && responsePayload != null) {
+            getLogger().error("ByteBuffer[{}] not released. Ref Count: {}. RequestType: {}", responsePayload.position(),
+                refCount.get(), this.requestHeader.apiKey());
+            memoryPool.release(responsePayload);
+            responsePayload = null;
+        }
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        super.finalize();
+        checkAndForceBufferRelease();
+    }

Review comment:
       Finalizers are deprecated, so it would be good to avoid them.
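
For context, `Object.finalize()` has been deprecated since Java 9, and `java.lang.ref.Cleaner` (also added in Java 9) is the usual replacement. The sketch below shows what a Cleaner-based leak check could look like. It is an illustration only, not a proposed patch: `LeakCheckedBuffer`, `State`, and `LEAKS` are hypothetical names, and unlike the class under review it only reports the leak rather than forcing a release.

```java
import java.lang.ref.Cleaner;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical buffer holder that reports when it is garbage collected
// without having been released first.
class LeakCheckedBuffer {
    private static final Cleaner CLEANER = Cleaner.create();
    // Number of leaks seen so far; kept only so the sketch is observable.
    static final AtomicInteger LEAKS = new AtomicInteger();

    // The cleaning action must not reference the LeakCheckedBuffer itself,
    // otherwise the owner could never become unreachable.
    static final class State implements Runnable {
        private final AtomicBoolean released = new AtomicBoolean(false);

        void markReleased() {
            released.set(true);
        }

        @Override
        public void run() {
            // The Cleaner runs this at most once: from clean() or after the owner is collected.
            if (!released.get()) {
                LEAKS.incrementAndGet();
                System.err.println("buffer was collected without being released");
            }
        }
    }

    private final ByteBuffer payload;
    private final State state = new State();
    private final Cleaner.Cleanable cleanable;

    LeakCheckedBuffer(int size) {
        this.payload = ByteBuffer.allocate(size);
        this.cleanable = CLEANER.register(this, state);
    }

    ByteBuffer payload() {
        return payload;
    }

    // Normal path: mark the buffer as released, then deregister the cleaning action.
    void release() {
        state.markReleased();
        cleanable.clean();
    }
}
```

As with finalizers, the timing of the leak report depends on the garbage collector, so it may arrive late or not at all before the JVM exits.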







[GitHub] [kafka] smccauliff commented on a change in pull request #11204: KAFKA-13188 - Release the memory back into MemoryPool

Posted by GitBox <gi...@apache.org>.
smccauliff commented on a change in pull request #11204:
URL: https://github.com/apache/kafka/pull/11204#discussion_r688600716



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClientResponseWithFinalize.java
##########

Review comment:
       I realize the super class does not define a finalize method, but it might be nice to have correct ordering of calls to super class finalize in the code base.  See https://stackoverflow.com/questions/11379115/do-you-call-super-finalize-within-a-subclass/11379181
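
The usual idiom here (often called finalizer chaining) is to run the subclass cleanup first and call `super.finalize()` last, inside a `finally` block, so the superclass finalizer still runs when the subclass cleanup throws. A minimal sketch with hypothetical class names (`BaseResource`, `DerivedResource`) that records the order of the calls:

```java
import java.util.List;

// Hypothetical base class with its own cleanup step.
class BaseResource {
    protected final List<String> events;

    BaseResource(List<String> events) {
        this.events = events;
    }

    @Override
    @SuppressWarnings({"deprecation", "removal"})
    protected void finalize() throws Throwable {
        try {
            events.add("base cleanup");
        } finally {
            super.finalize();
        }
    }
}

// Hypothetical subclass: its cleanup runs first, the superclass finalizer runs last.
class DerivedResource extends BaseResource {
    private final boolean failCleanup;

    DerivedResource(List<String> events, boolean failCleanup) {
        super(events);
        this.failCleanup = failCleanup;
    }

    @Override
    @SuppressWarnings({"deprecation", "removal"})
    protected void finalize() throws Throwable {
        try {
            events.add("derived cleanup");
            if (failCleanup) {
                throw new IllegalStateException("derived cleanup failed");
            }
        } finally {
            // Runs even when the subclass cleanup above throws.
            super.finalize();
        }
    }
}
```

Applied to the method under review, this would mean calling `checkAndForceBufferRelease()` inside the `try` and `super.finalize()` inside the `finally`.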
   
   







[GitHub] [kafka] tusharg1993 commented on pull request #11204: KAFKA-13188 - Release the memory back into MemoryPool

Posted by GitBox <gi...@apache.org>.
tusharg1993 commented on pull request #11204:
URL: https://github.com/apache/kafka/pull/11204#issuecomment-907540893


   @aloknnikhil Hey, Tushar here. I am the original author of the PR. Thanks for forward porting the change to apache/kafka trunk.
   
   Some background: this change gave us a very nice improvement, especially when a KafkaConsumer is lagging and tries to fetch as much data as possible from the brokers.
   
   This becomes a GC problem because these long-lived allocations can be promoted to the old generation when the consumer is lagging. The solution is to pool these buffers inside the Kafka library so that only the first few instances are allocated and the network code then reuses them. That relieves memory pressure, both in new allocations per second and in the amount of data that gets promoted to OldGen.
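
   To illustrate the reuse effect described above, here is a small sketch. It is not the pool used in this PR: `ReusingPool` and its methods are hypothetical names, and it assumes all buffers have the same size. It allocates a buffer only when none is idle and counts how many allocations actually happen.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Hypothetical pool: allocates lazily, then hands the same buffers out again.
class ReusingPool {
    private final ArrayDeque<ByteBuffer> idle = new ArrayDeque<>();
    private final int bufferSize;
    private int allocations;

    ReusingPool(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    synchronized ByteBuffer acquire() {
        ByteBuffer buffer = idle.poll();
        if (buffer == null) {
            // Only reached when no released buffer is available for reuse.
            allocations++;
            buffer = ByteBuffer.allocate(bufferSize);
        }
        return buffer;
    }

    synchronized void release(ByteBuffer buffer) {
        buffer.clear();
        idle.push(buffer);
    }

    synchronized int allocations() {
        return allocations;
    }
}

class ReusingPoolDemo {
    public static void main(String[] args) {
        ReusingPool pool = new ReusingPool(64 * 1024);
        for (int i = 0; i < 1000; i++) {
            ByteBuffer buffer = pool.acquire(); // stands in for reading one response
            pool.release(buffer);               // returned once the response is consumed
        }
        System.out.println("buffers allocated: " + pool.allocations());
    }
}
```

   With strictly sequential acquire and release calls, every iteration after the first reuses the same buffer, so the number of new allocations no longer grows with the number of responses.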





[GitHub] [kafka] tusharg1993 commented on a change in pull request #11204: KAFKA-13188 - Release the memory back into MemoryPool

Posted by GitBox <gi...@apache.org>.
tusharg1993 commented on a change in pull request #11204:
URL: https://github.com/apache/kafka/pull/11204#discussion_r802200036



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClientResponseWithFinalize.java
##########

Review comment:
       The finalizer here is not supposed to be used in production. Rather, it's a debugging tool for cases where issues arise.
   
   >This PR also adds the ClientResponseWithFinalize class for debugging purposes, protected by the linkedin.enable.client.resonse.leakcheck flag. The class uses a finalizer to check whether some issue in the code left a buffer unreleased. However, finalizers are costly in an actual production setting, and hence the check is not enabled by default.
   
   



