You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2014/02/12 04:53:14 UTC

git commit: KAFKA-1259 Close blocked only until all messages had been sent, not until all acknowledgements had been received.

Updated Branches:
  refs/heads/trunk e1845ba1d -> ef1e30bf5


KAFKA-1259 Close blocked only until all messages had been sent, not until all acknowledgements had been received.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ef1e30bf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ef1e30bf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ef1e30bf

Branch: refs/heads/trunk
Commit: ef1e30bf5b0aba1758522f8cacd2804d3bfbd4fb
Parents: e1845ba
Author: Jay Kreps <ja...@gmail.com>
Authored: Tue Feb 11 16:24:08 2014 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Tue Feb 11 19:52:49 2014 -0800

----------------------------------------------------------------------
 .../kafka/clients/producer/internals/Sender.java       | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ef1e30bf/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 87dd1a6..d93a455 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -101,7 +101,9 @@ public class Sender implements Runnable {
             }
         }
 
-        // send anything left in the accumulator
+        // okay we stopped accepting requests but there may still be
+        // requests in the accumulator or waiting for acknowledgment,
+        // wait until these are completed.
         int unsent = 0;
         do {
             try {
@@ -109,7 +111,7 @@ public class Sender implements Runnable {
             } catch (Exception e) {
                 e.printStackTrace();
             }
-        } while (unsent > 0);
+        } while (unsent > 0 || this.inFlightRequests.totalInFlightRequests() > 0);
 
         // close all the connections
         this.selector.close();
@@ -534,6 +536,13 @@ public class Sender implements Runnable {
                 return requests.remove(node);
             }
         }
+
+        public int totalInFlightRequests() {
+            int total = 0;
+            for (Deque<InFlightRequest> deque : this.requests.values())
+                total += deque.size();
+            return total;
+        }
     }
 
 }