You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/03/12 04:56:54 UTC

git commit: kafka-1301; system testcase_0206 fails using the new producer; patched by Jun Rao; reviewed by Jay Kreps

Repository: kafka
Updated Branches:
  refs/heads/trunk c765d7bd4 -> c124bbbb6


kafka-1301; system testcase_0206 fails using the new producer; patched by Jun Rao; reviewed by Jay Kreps


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c124bbbb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c124bbbb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c124bbbb

Branch: refs/heads/trunk
Commit: c124bbbb6c3e2152915411e1c3b00fca67634eed
Parents: c765d7b
Author: Jun Rao <ju...@gmail.com>
Authored: Tue Mar 11 20:56:32 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Mar 11 20:56:32 2014 -0700

----------------------------------------------------------------------
 .../clients/producer/internals/RecordAccumulator.java | 14 ++++++++++++++
 .../kafka/clients/producer/internals/Sender.java      | 10 +++-------
 .../replication_testsuite/config/server.properties    |  2 +-
 3 files changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c124bbbb/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 616e100..7a03f38 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -205,6 +205,20 @@ public final class RecordAccumulator {
     }
 
     /**
+     * @return Whether there is any unsent record in the accumulator.
+     */
+    public boolean hasUnsent() {
+        for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
+            Deque<RecordBatch> deque = entry.getValue();
+            synchronized (deque) {
+                if (deque.size() > 0)
+                    return true;
+            }
+        }
+        return false;
+    }
+
+    /**
      * Drain all the data for the given topic-partitions that will fit within the specified size. This method attempts
      * to avoid choosing the same topic-partitions over and over.
      * 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c124bbbb/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 2acb96d..7b5d144 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -159,14 +159,13 @@ public class Sender implements Runnable {
         // okay we stopped accepting requests but there may still be
         // requests in the accumulator or waiting for acknowledgment,
         // wait until these are completed.
-        int unsent = 0;
         do {
             try {
-                unsent = run(time.milliseconds());
+                run(time.milliseconds());
             } catch (Exception e) {
                 log.error("Uncaught error in kafka producer I/O thread: ", e);
             }
-        } while (unsent > 0 || this.inFlightRequests.totalInFlightRequests() > 0);
+        } while (this.accumulator.hasUnsent() || this.inFlightRequests.totalInFlightRequests() > 0);
 
         // close all the connections
         this.selector.close();
@@ -178,9 +177,8 @@ public class Sender implements Runnable {
      * Run a single iteration of sending
      * 
      * @param now The current time
-     * @return The total number of topic/partitions that had data ready (regardless of what we actually sent)
      */
-    public int run(long now) {
+    public void run(long now) {
         Cluster cluster = metadata.fetch();
         // get the list of partitions with data ready to send
         List<TopicPartition> ready = this.accumulator.ready(now);
@@ -220,8 +218,6 @@ public class Sender implements Runnable {
         handleResponses(this.selector.completedReceives(), time.milliseconds());
         handleDisconnects(this.selector.disconnected(), time.milliseconds());
         handleConnects(this.selector.connected());
-
-        return ready.size();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/c124bbbb/system_test/replication_testsuite/config/server.properties
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/config/server.properties b/system_test/replication_testsuite/config/server.properties
index c628412..6becbab 100644
--- a/system_test/replication_testsuite/config/server.properties
+++ b/system_test/replication_testsuite/config/server.properties
@@ -136,5 +136,5 @@ replica.socket.timeout.ms=30000
 replica.socket.receive.buffer.bytes=65536
 replica.fetch.max.bytes=1048576
 replica.fetch.wait.max.ms=500
-replica.fetch.min.bytes=4096
+replica.fetch.min.bytes=1
 num.replica.fetchers=1