You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/03/12 04:56:54 UTC
git commit: kafka-1301; system testcase_0206 fails using the new producer; patched by Jun Rao; reviewed by Jay Kreps
Repository: kafka
Updated Branches:
refs/heads/trunk c765d7bd4 -> c124bbbb6
kafka-1301; system testcase_0206 fails using the new producer; patched by Jun Rao; reviewed by Jay Kreps
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c124bbbb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c124bbbb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c124bbbb
Branch: refs/heads/trunk
Commit: c124bbbb6c3e2152915411e1c3b00fca67634eed
Parents: c765d7b
Author: Jun Rao <ju...@gmail.com>
Authored: Tue Mar 11 20:56:32 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Mar 11 20:56:32 2014 -0700
----------------------------------------------------------------------
.../clients/producer/internals/RecordAccumulator.java | 14 ++++++++++++++
.../kafka/clients/producer/internals/Sender.java | 10 +++-------
.../replication_testsuite/config/server.properties | 2 +-
3 files changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c124bbbb/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 616e100..7a03f38 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -205,6 +205,20 @@ public final class RecordAccumulator {
}
/**
+ * @return Whether there is any unsent record in the accumulator.
+ */
+ public boolean hasUnsent() {
+ for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
+ Deque<RecordBatch> deque = entry.getValue();
+ synchronized (deque) {
+ if (deque.size() > 0)
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
* Drain all the data for the given topic-partitions that will fit within the specified size. This method attempts
* to avoid choosing the same topic-partitions over and over.
*
http://git-wip-us.apache.org/repos/asf/kafka/blob/c124bbbb/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 2acb96d..7b5d144 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -159,14 +159,13 @@ public class Sender implements Runnable {
// okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment,
// wait until these are completed.
- int unsent = 0;
do {
try {
- unsent = run(time.milliseconds());
+ run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
- } while (unsent > 0 || this.inFlightRequests.totalInFlightRequests() > 0);
+ } while (this.accumulator.hasUnsent() || this.inFlightRequests.totalInFlightRequests() > 0);
// close all the connections
this.selector.close();
@@ -178,9 +177,8 @@ public class Sender implements Runnable {
* Run a single iteration of sending
*
* @param now The current time
- * @return The total number of topic/partitions that had data ready (regardless of what we actually sent)
*/
- public int run(long now) {
+ public void run(long now) {
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
List<TopicPartition> ready = this.accumulator.ready(now);
@@ -220,8 +218,6 @@ public class Sender implements Runnable {
handleResponses(this.selector.completedReceives(), time.milliseconds());
handleDisconnects(this.selector.disconnected(), time.milliseconds());
handleConnects(this.selector.connected());
-
- return ready.size();
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/c124bbbb/system_test/replication_testsuite/config/server.properties
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/config/server.properties b/system_test/replication_testsuite/config/server.properties
index c628412..6becbab 100644
--- a/system_test/replication_testsuite/config/server.properties
+++ b/system_test/replication_testsuite/config/server.properties
@@ -136,5 +136,5 @@ replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
-replica.fetch.min.bytes=4096
+replica.fetch.min.bytes=1
num.replica.fetchers=1