Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/30 14:23:45 UTC

[GitHub] [kafka] lbradstreet opened a new pull request #10620: KAFKA-12736: KafkaProducer.flush holds onto completed ProducerBatch(s) until flush completes

lbradstreet opened a new pull request #10620:
URL: https://github.com/apache/kafka/pull/10620


   When flush is called, a copy of the incomplete batches is made. This
   means that the full ProducerBatch(s) are held in memory until the flush
   has completed. For batches that use the existing memory pool this is
   less wasteful, since the memory is returned to the pool, but batches
   allocated outside the pool can only be GC'd after the flush has
   completed. Rather than using copyAll, we can make a new array containing
   only the produceFuture(s) and await on those.
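   A minimal, self-contained sketch of the approach described above. Batch, Result, IncompleteSet and Accumulator are hypothetical, simplified stand-ins for ProducerBatch, ProduceRequestResult, IncompleteBatches and RecordAccumulator, and a CountDownLatch plays the role of the request result. It is not the Kafka implementation; it only illustrates snapshotting the results instead of the batches.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for ProduceRequestResult: completes once and can be awaited.
final class Result {
    private final CountDownLatch latch = new CountDownLatch(1);

    void done() { latch.countDown(); }

    void await() throws InterruptedException { latch.await(); }
}

// Hypothetical stand-in for ProducerBatch: a payload plus the result it completes.
// The result holds no reference back to the batch.
final class Batch {
    final byte[] payload;
    final Result produceFuture = new Result();

    Batch(int size) { this.payload = new byte[size]; }
}

// Hypothetical stand-in for IncompleteBatches.
final class IncompleteSet {
    private final Set<Batch> incomplete = new HashSet<>();

    void add(Batch batch) {
        synchronized (incomplete) { incomplete.add(batch); }
    }

    // The sender calls this when a batch is done; after that, only other
    // references (such as a flush-time copy of the batches) keep it reachable.
    void remove(Batch batch) {
        synchronized (incomplete) { incomplete.remove(batch); }
    }

    // Snapshot only the results, never the batches themselves.
    List<Result> requestResults() {
        synchronized (incomplete) {
            List<Result> results = new ArrayList<>(incomplete.size());
            for (Batch batch : incomplete)
                results.add(batch.produceFuture);
            return results;
        }
    }
}

// Hypothetical stand-in for the flush path of RecordAccumulator.
final class Accumulator {
    final IncompleteSet incomplete = new IncompleteSet();

    // Blocks until every result captured at snapshot time has completed.
    void awaitFlushCompletion() throws InterruptedException {
        for (Result result : incomplete.requestResults())
            result.await();
    }
}
```

   In this sketch a sender thread only needs to call done() and remove() on a batch while awaitFlushCompletion() is blocked. Because the flush loop holds Result objects rather than Batch objects, a removed batch's payload is not kept reachable by the flush.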


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] lbradstreet commented on a change in pull request #10620: KAFKA-12736: KafkaProducer.flush holds onto completed ProducerBatch(s) until flush completes

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #10620:
URL: https://github.com/apache/kafka/pull/10620#discussion_r629543903



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##########
@@ -710,8 +710,11 @@ private boolean appendsInProgress() {
      */
     public void awaitFlushCompletion() throws InterruptedException {
         try {
-            for (ProducerBatch batch : this.incomplete.copyAll())
-                batch.produceFuture.await();
+            // Make a copy of the request results at the time the flush is called.
+            // We avoid making a copy of the full incomplete batch collections to allow
+            // garbage collection.

Review comment:
       I've tried to improve the error message. LMK what you think.







[GitHub] [kafka] lbradstreet commented on a change in pull request #10620: KAFKA-12736: KafkaProducer.flush holds onto completed ProducerBatch(s) until flush completes

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #10620:
URL: https://github.com/apache/kafka/pull/10620#discussion_r632614924



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##########
@@ -710,8 +710,11 @@ private boolean appendsInProgress() {
      */
     public void awaitFlushCompletion() throws InterruptedException {
         try {
-            for (ProducerBatch batch : this.incomplete.copyAll())
-                batch.produceFuture.await();
+            // Obtain a copy of all of the incomplete ProduceRequestResult(s) at the time of the flush.
+            // We must be careful not to hold a reference to the ProducerBatch(s) so that garbage
+            // collection can occur on the contents.

Review comment:
       Done.







[GitHub] [kafka] lbradstreet commented on a change in pull request #10620: KAFKA-12736: KafkaProducer.flush holds onto completed ProducerBatch(s) until flush completes

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #10620:
URL: https://github.com/apache/kafka/pull/10620#discussion_r624525594



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##########
@@ -710,8 +710,11 @@ private boolean appendsInProgress() {
      */
     public void awaitFlushCompletion() throws InterruptedException {
         try {
-            for (ProducerBatch batch : this.incomplete.copyAll())
-                batch.produceFuture.await();
+            // Make a copy of the request results at the time the flush is called.
+            // We avoid making a copy of the full incomplete batch collections to allow
+            // garbage collection.

Review comment:
       We do have a reference to incomplete; however, the sender will remove the producer batches from the original incomplete collection. They are not currently being freed because we're making a full copy of incomplete's contents.







[GitHub] [kafka] ijuma commented on a change in pull request #10620: KAFKA-12736: KafkaProducer.flush holds onto completed ProducerBatch(s) until flush completes

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10620:
URL: https://github.com/apache/kafka/pull/10620#discussion_r629703346



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##########
@@ -710,8 +710,11 @@ private boolean appendsInProgress() {
      */
     public void awaitFlushCompletion() throws InterruptedException {
         try {
-            for (ProducerBatch batch : this.incomplete.copyAll())
-                batch.produceFuture.await();
+            // Obtain a copy of all of the incomplete ProduceRequestResult(s) at the time of the flush.
+            // We must be careful not to hold a reference to the ProducerBatch(s) so that garbage
+            // collection can occur on the contents.

Review comment:
       I think I'd mention this bit from your message: `the sender will remove the producer batches from the original incomplete collection`. This explains why we should not hold on to any batches.







[GitHub] [kafka] lbradstreet commented on a change in pull request #10620: KAFKA-12736: KafkaProducer.flush holds onto completed ProducerBatch(s) until flush completes

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #10620:
URL: https://github.com/apache/kafka/pull/10620#discussion_r624525594



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##########
@@ -710,8 +710,11 @@ private boolean appendsInProgress() {
      */
     public void awaitFlushCompletion() throws InterruptedException {
         try {
-            for (ProducerBatch batch : this.incomplete.copyAll())
-                batch.produceFuture.await();
+            // Make a copy of the request results at the time the flush is called.
+            // We avoid making a copy of the full incomplete batch collections to allow
+            // garbage collection.

Review comment:
       We do have a reference to incomplete; however, the sender will remove the producer batches from the original incomplete collection. The individual ProducerBatch(s) are not currently being freed because we're making a full copy of incomplete's contents.
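       The point about the copy pinning batches can be illustrated with a small, hypothetical sketch. It only counts which objects remain reachable from a flush-time snapshot after the shared set has been emptied (actual garbage collection is nondeterministic and is not exercised here), and it assumes, as the PR does, that a request result does not point back to its batch. SnapshotPinning, Batch and batchesPinnedBySnapshot are illustrative names, not Kafka code.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class SnapshotPinning {
    // Hypothetical stand-in for ProducerBatch; the result object holds no
    // reference back to the batch.
    static final class Batch {
        final byte[] payload = new byte[1024];
        final Object produceFuture = new Object();
    }

    // Builds a flush-time snapshot (either the batches themselves, like copyAll(),
    // or only their results), lets the "sender" empty the shared set, and returns
    // how many batches are still reachable through the snapshot.
    static int batchesPinnedBySnapshot(boolean copyBatches, int count) {
        Set<Batch> incomplete = new HashSet<>();
        for (int i = 0; i < count; i++)
            incomplete.add(new Batch());

        List<Object> snapshot = new ArrayList<>(incomplete.size());
        for (Batch batch : incomplete)
            snapshot.add(copyBatches ? batch : batch.produceFuture);

        incomplete.clear(); // the sender removes every completed batch

        int pinned = 0;
        for (Object o : snapshot)
            if (o instanceof Batch)
                pinned++;
        return pinned;
    }
}
```

       With copyBatches set, every batch (and its payload) stays reachable for as long as the snapshot is alive, which in the flush path means until the flush completes; with only the results copied, none do.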







[GitHub] [kafka] lbradstreet commented on a change in pull request #10620: KAFKA-12736: KafkaProducer.flush holds onto completed ProducerBatch(s) until flush completes

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #10620:
URL: https://github.com/apache/kafka/pull/10620#discussion_r633028169



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/IncompleteBatches.java
##########
@@ -51,6 +51,14 @@ public void remove(ProducerBatch batch) {
         }
     }
 
+    public Iterable<ProduceRequestResult> requestResults() {
+        synchronized (incomplete) {
+            ArrayList<ProduceRequestResult> results = new ArrayList<>(this.incomplete.size());
+            this.incomplete.forEach(incomplete -> results.add(incomplete.produceFuture));
+            return results;

Review comment:
       I think I had a bit of a blind spot because I wanted to preallocate the ArrayList to be the correct size. I think streams is able to do that though. I'm happy to convert it back if it's nicer.

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/IncompleteBatches.java
##########
@@ -51,6 +51,14 @@ public void remove(ProducerBatch batch) {
         }
     }
 
+    public Iterable<ProduceRequestResult> requestResults() {
+        synchronized (incomplete) {
+            ArrayList<ProduceRequestResult> results = new ArrayList<>(this.incomplete.size());
+            this.incomplete.forEach(incomplete -> results.add(incomplete.produceFuture));
+            return results;

Review comment:
       Done.







[GitHub] [kafka] ijuma commented on a change in pull request #10620: KAFKA-12736: KafkaProducer.flush holds onto completed ProducerBatch(s) until flush completes

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10620:
URL: https://github.com/apache/kafka/pull/10620#discussion_r624529198



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##########
@@ -710,8 +710,11 @@ private boolean appendsInProgress() {
      */
     public void awaitFlushCompletion() throws InterruptedException {
         try {
-            for (ProducerBatch batch : this.incomplete.copyAll())
-                batch.produceFuture.await();
+            // Make a copy of the request results at the time the flush is called.
+            // We avoid making a copy of the full incomplete batch collections to allow
+            // garbage collection.

Review comment:
       That makes sense. It's worth clarifying that here.







[GitHub] [kafka] lbradstreet commented on a change in pull request #10620: KAFKA-12736: KafkaProducer.flush holds onto completed ProducerBatch(s) until flush completes

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #10620:
URL: https://github.com/apache/kafka/pull/10620#discussion_r624525594



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##########
@@ -710,8 +710,11 @@ private boolean appendsInProgress() {
      */
     public void awaitFlushCompletion() throws InterruptedException {
         try {
-            for (ProducerBatch batch : this.incomplete.copyAll())
-                batch.produceFuture.await();
+            // Make a copy of the request results at the time the flush is called.
+            // We avoid making a copy of the full incomplete batch collections to allow
+            // garbage collection.

Review comment:
       We do have a reference to incomplete; however, the sender will remove the producer batches from the original incomplete collection. They are not currently being freed because we're making a full copy of incomplete.







[GitHub] [kafka] lbradstreet commented on a change in pull request #10620: KAFKA-12736: KafkaProducer.flush holds onto completed ProducerBatch(s) until flush completes

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #10620:
URL: https://github.com/apache/kafka/pull/10620#discussion_r633135023



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/IncompleteBatches.java
##########
@@ -51,6 +51,14 @@ public void remove(ProducerBatch batch) {
         }
     }
 
+    public Iterable<ProduceRequestResult> requestResults() {
+        synchronized (incomplete) {
+            ArrayList<ProduceRequestResult> results = new ArrayList<>(this.incomplete.size());
+            this.incomplete.forEach(incomplete -> results.add(incomplete.produceFuture));
+            return results;

Review comment:
       Done.







[GitHub] [kafka] ijuma commented on a change in pull request #10620: KAFKA-12736: KafkaProducer.flush holds onto completed ProducerBatch(s) until flush completes

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10620:
URL: https://github.com/apache/kafka/pull/10620#discussion_r633025923



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/IncompleteBatches.java
##########
@@ -51,6 +51,14 @@ public void remove(ProducerBatch batch) {
         }
     }
 
+    public Iterable<ProduceRequestResult> requestResults() {
+        synchronized (incomplete) {
+            ArrayList<ProduceRequestResult> results = new ArrayList<>(this.incomplete.size());
+            this.incomplete.forEach(incomplete -> results.add(incomplete.produceFuture));
+            return results;

Review comment:
       Out of curiosity, why did you not use `map` here? Is it to avoid the allocation of the `stream`? It doesn't seem that this is a hotspot though.
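       For comparison, a hypothetical sketch of the stream-based alternative being asked about. Batch and its String produceFuture field are simplified stand-ins, not Kafka types. In the OpenJDK implementation, Collectors.toList() starts from an ArrayList created without a size hint, so if preallocation matters, Collectors.toCollection with a presized ArrayList keeps that property while still using map.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class StreamSnapshot {
    // Hypothetical stand-in; in the PR the element type is ProducerBatch and
    // the mapped field is produceFuture.
    static final class Batch {
        final String produceFuture;

        Batch(String id) { this.produceFuture = "result-" + id; }
    }

    // map + toList: concise, no explicit size hint.
    static List<String> viaToList(Set<Batch> incomplete) {
        synchronized (incomplete) {
            return incomplete.stream()
                    .map(batch -> batch.produceFuture)
                    .collect(Collectors.toList());
        }
    }

    // Same mapping, but collected into an ArrayList presized to the set,
    // matching the allocation of the original forEach loop.
    static List<String> viaPresized(Set<Batch> incomplete) {
        synchronized (incomplete) {
            return incomplete.stream()
                    .map(batch -> batch.produceFuture)
                    .collect(Collectors.toCollection(() -> new ArrayList<String>(incomplete.size())));
        }
    }
}
```

       Either version holds the lock on the set only while the list is built, as the original loop does.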







[GitHub] [kafka] ijuma merged pull request #10620: KAFKA-12736: KafkaProducer.flush holds onto completed ProducerBatch(s) until flush completes

Posted by GitBox <gi...@apache.org>.
ijuma merged pull request #10620:
URL: https://github.com/apache/kafka/pull/10620


   







[GitHub] [kafka] ijuma commented on a change in pull request #10620: KAFKA-12736: KafkaProducer.flush holds onto completed ProducerBatch(s) until flush completes

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10620:
URL: https://github.com/apache/kafka/pull/10620#discussion_r624518310



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##########
@@ -710,8 +710,11 @@ private boolean appendsInProgress() {
      */
     public void awaitFlushCompletion() throws InterruptedException {
         try {
-            for (ProducerBatch batch : this.incomplete.copyAll())
-                batch.produceFuture.await();
+            // Make a copy of the request results at the time the flush is called.
+            // We avoid making a copy of the full incomplete batch collections to allow
+            // garbage collection.

Review comment:
       Hmm, we still have a reference to `incomplete` in the class, right? Are we expecting the GC to free that while blocked in this method?

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##########
@@ -710,8 +710,11 @@ private boolean appendsInProgress() {
      */
     public void awaitFlushCompletion() throws InterruptedException {
         try {
-            for (ProducerBatch batch : this.incomplete.copyAll())
-                batch.produceFuture.await();
+            // Make a copy of the request results at the time the flush is called.
+            // We avoid making a copy of the full incomplete batch collections to allow
+            // garbage collection.

Review comment:
       Hmm, we still have a reference to `incomplete` in this instance, right? Are we expecting the GC to free that while blocked in this method?










[GitHub] [kafka] ijuma commented on a change in pull request #10620: KAFKA-12736: KafkaProducer.flush holds onto completed ProducerBatch(s) until flush completes

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10620:
URL: https://github.com/apache/kafka/pull/10620#discussion_r633029592



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/IncompleteBatches.java
##########
@@ -51,6 +51,14 @@ public void remove(ProducerBatch batch) {
         }
     }
 
+    public Iterable<ProduceRequestResult> requestResults() {
+        synchronized (incomplete) {
+            ArrayList<ProduceRequestResult> results = new ArrayList<>(this.incomplete.size());
+            this.incomplete.forEach(incomplete -> results.add(incomplete.produceFuture));
+            return results;

Review comment:
       Let's do it. :)






