You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2014/02/12 04:53:14 UTC
git commit: KAFKA-1259 Close blocked only until all messages had been
sent not until all acknowledgements had been received.
Updated Branches:
refs/heads/trunk e1845ba1d -> ef1e30bf5
KAFKA-1259 Close blocked only until all messages had been sent not until all acknowledgements had been received.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ef1e30bf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ef1e30bf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ef1e30bf
Branch: refs/heads/trunk
Commit: ef1e30bf5b0aba1758522f8cacd2804d3bfbd4fb
Parents: e1845ba
Author: Jay Kreps <ja...@gmail.com>
Authored: Tue Feb 11 16:24:08 2014 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Tue Feb 11 19:52:49 2014 -0800
----------------------------------------------------------------------
.../kafka/clients/producer/internals/Sender.java | 13 +++++++++++--
1 file changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ef1e30bf/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 87dd1a6..d93a455 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -101,7 +101,9 @@ public class Sender implements Runnable {
}
}
- // send anything left in the accumulator
+ // okay we stopped accepting requests but there may still be
+ // requests in the accumulator or waiting for acknowledgment,
+ // wait until these are completed.
int unsent = 0;
do {
try {
@@ -109,7 +111,7 @@ public class Sender implements Runnable {
} catch (Exception e) {
e.printStackTrace();
}
- } while (unsent > 0);
+ } while (unsent > 0 || this.inFlightRequests.totalInFlightRequests() > 0);
// close all the connections
this.selector.close();
@@ -534,6 +536,13 @@ public class Sender implements Runnable {
return requests.remove(node);
}
}
+
+ public int totalInFlightRequests() {
+ int total = 0;
+ for (Deque<InFlightRequest> deque : this.requests.values())
+ total += deque.size();
+ return total;
+ }
}
}