Posted to commits@kafka.apache.org by jj...@apache.org on 2016/03/08 23:07:12 UTC

kafka git commit: KAFKA-3197; Fix producer sending records out of order

Repository: kafka
Updated Branches:
  refs/heads/trunk f6e35dec9 -> 5afa16601


KAFKA-3197; Fix producer sending records out of order

This patch reuses max.in.flight.requests.per.connection: when it is set to one, we take it to mean the user wants order protection. The approach is to ensure there is at most one batch per partition in flight at any time.
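
For context, a minimal sketch of how a user opts into this guarantee. It is illustrative only: the class name, broker address, topic, and serializer choices are placeholders and are not part of the patch.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class OrderedProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.RETRIES_CONFIG, 3);
            // With this patch, a value of 1 also means at most one batch per partition is in
            // flight, so retries can no longer reorder records within a partition.
            props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);

            Producer<String, String> producer = new KafkaProducer<String, String>(props);
            producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"));
            producer.close();
        }
    }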

Author: Jiangjie Qin <be...@gmail.com>

Reviewers: Aditya Auradkar <aa...@linkedin.com>, Jason Gustafson <ja...@confluent.io>, Grant Henke <gr...@gmail.com>, Ismael Juma <is...@juma.me.uk>, Joel Koshy <jj...@gmail.com>, Jun Rao <ju...@gmail.com>

Closes #857 from becketqin/KAFKA-3197


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5afa1660
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5afa1660
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5afa1660

Branch: refs/heads/trunk
Commit: 5afa1660103df8aecbf7558e761418944fb5905a
Parents: f6e35de
Author: Jiangjie Qin <be...@gmail.com>
Authored: Tue Mar 8 14:06:53 2016 -0800
Committer: Joel Koshy <jj...@gmail.com>
Committed: Tue Mar 8 14:06:53 2016 -0800

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |  1 +
 .../producer/internals/RecordAccumulator.java   | 84 +++++++++++++-------
 .../clients/producer/internals/Sender.java      | 17 +++-
 .../internals/RecordAccumulatorTest.java        | 37 ++++++++-
 .../clients/producer/internals/SenderTest.java  | 51 ++++++++++++
 .../java/org/apache/kafka/test/TestUtils.java   |  2 +-
 6 files changed, 156 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5afa1660/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index a066512..85ba9ef 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -287,6 +287,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             this.sender = new Sender(client,
                     this.metadata,
                     this.accumulator,
+                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                     config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                     (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                     config.getInt(ProducerConfig.RETRIES_CONFIG),

http://git-wip-us.apache.org/repos/asf/kafka/blob/5afa1660/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index f1414f0..beaa832 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -59,7 +59,6 @@ public final class RecordAccumulator {
     private static final Logger log = LoggerFactory.getLogger(RecordAccumulator.class);
 
     private volatile boolean closed;
-    private int drainIndex;
     private final AtomicInteger flushesInProgress;
     private final AtomicInteger appendsInProgress;
     private final int batchSize;
@@ -70,6 +69,9 @@ public final class RecordAccumulator {
     private final Time time;
     private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
     private final IncompleteRecordBatches incomplete;
+    // The following variables are only accessed by the sender thread, so we don't need to protect them.
+    private final Set<TopicPartition> muted;
+    private int drainIndex;
 
 
     /**
@@ -105,6 +107,7 @@ public final class RecordAccumulator {
         String metricGrpName = "producer-metrics";
         this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
         this.incomplete = new IncompleteRecordBatches();
+        this.muted = new HashSet<TopicPartition>();
         this.time = time;
         registerMetrics(metrics, metricGrpName);
     }
@@ -213,7 +216,6 @@ public final class RecordAccumulator {
         List<RecordBatch> expiredBatches = new ArrayList<RecordBatch>();
         int count = 0;
         for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
-            TopicPartition topicAndPartition = entry.getKey();
             Deque<RecordBatch> dq = entry.getValue();
             synchronized (dq) {
                 // iterate over the batches and expire them if they have stayed in accumulator for more than requestTimeOut
@@ -259,14 +261,20 @@ public final class RecordAccumulator {
      * partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
      * partition batches.
      * <p>
-     * A destination node is ready to send data if ANY one of its partition is not backing off the send and ANY of the
-     * following are true :
+     * A destination node is ready to send data if:
      * <ol>
-     * <li>The record set is full
-     * <li>The record set has sat in the accumulator for at least lingerMs milliseconds
-     * <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions are
-     * immediately considered ready).
-     * <li>The accumulator has been closed
+     * <li>There is at least one partition that is not backing off its send
+     * <li><b>and</b> those partitions are not muted (to prevent reordering if
+     *   {@value org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION}
+     *   is set to one)</li>
+     * <li><b>and <i>any</i></b> of the following are true</li>
+     * <ul>
+     *     <li>The record set is full</li>
+     *     <li>The record set has sat in the accumulator for at least lingerMs milliseconds</li>
+     *     <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions
+     *     are immediately considered ready).</li>
+     *     <li>The accumulator has been closed</li>
+     * </ul>
      * </ol>
      */
     public ReadyCheckResult ready(Cluster cluster, long nowMs) {
@@ -282,7 +290,7 @@ public final class RecordAccumulator {
             Node leader = cluster.leaderFor(part);
             if (leader == null) {
                 unknownLeadersExist = true;
-            } else if (!readyNodes.contains(leader)) {
+            } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                 synchronized (deque) {
                     RecordBatch batch = deque.peekFirst();
                     if (batch != null) {
@@ -333,7 +341,10 @@ public final class RecordAccumulator {
      * @param now The current unix time in milliseconds
      * @return A list of {@link RecordBatch} for each node specified with total size less than the requested maxSize.
      */
-    public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
+    public Map<Integer, List<RecordBatch>> drain(Cluster cluster,
+                                                 Set<Node> nodes,
+                                                 int maxSize,
+                                                 long now) {
         if (nodes.isEmpty())
             return Collections.emptyMap();
 
@@ -346,25 +357,29 @@ public final class RecordAccumulator {
             int start = drainIndex = drainIndex % parts.size();
             do {
                 PartitionInfo part = parts.get(drainIndex);
-                Deque<RecordBatch> deque = dequeFor(new TopicPartition(part.topic(), part.partition()));
-                if (deque != null) {
-                    synchronized (deque) {
-                        RecordBatch first = deque.peekFirst();
-                        if (first != null) {
-                            boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
-                            // Only drain the batch if it is not during backoff period.
-                            if (!backoff) {
-                                if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
-                                    // there is a rare case that a single batch size is larger than the request size due
-                                    // to compression; in this case we will still eventually send this batch in a single
-                                    // request
-                                    break;
-                                } else {
-                                    RecordBatch batch = deque.pollFirst();
-                                    batch.records.close();
-                                    size += batch.records.sizeInBytes();
-                                    ready.add(batch);
-                                    batch.drainedMs = now;
+                TopicPartition tp = new TopicPartition(part.topic(), part.partition());
+                // Only proceed if the partition has no in-flight batches.
+                if (!muted.contains(tp)) {
+                    Deque<RecordBatch> deque = dequeFor(new TopicPartition(part.topic(), part.partition()));
+                    if (deque != null) {
+                        synchronized (deque) {
+                            RecordBatch first = deque.peekFirst();
+                            if (first != null) {
+                                boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
+                                // Only drain the batch if it is not during backoff period.
+                                if (!backoff) {
+                                    if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
+                                        // there is a rare case that a single batch size is larger than the request size due
+                                        // to compression; in this case we will still eventually send this batch in a single
+                                        // request
+                                        break;
+                                    } else {
+                                        RecordBatch batch = deque.pollFirst();
+                                        batch.records.close();
+                                        size += batch.records.sizeInBytes();
+                                        ready.add(batch);
+                                        batch.drainedMs = now;
+                                    }
                                 }
                             }
                         }
@@ -465,6 +480,14 @@ public final class RecordAccumulator {
         }
     }
 
+    public void mutePartition(TopicPartition tp) {
+        muted.add(tp);
+    }
+
+    public void unmutePartition(TopicPartition tp) {
+        muted.remove(tp);
+    }
+
     /**
      * Close this accumulator and force all the record buffers to be drained
      */
@@ -532,4 +555,5 @@ public final class RecordAccumulator {
             }
         }
     }
+
 }
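
As a reading aid, the readiness rule spelled out in the RecordAccumulator javadoc above can be condensed into a single expression. The helper class and its parameters below are hypothetical and exist only for illustration; they are not part of Kafka.

    import java.util.Set;

    import org.apache.kafka.common.TopicPartition;

    final class ReadinessSketch {
        // A partition can contribute to a node's readiness only if it is not muted
        // (i.e. it has no batch in flight when max.in.flight.requests.per.connection == 1)
        // and is not in its retry backoff period.
        static boolean partitionReady(Set<TopicPartition> muted,
                                      TopicPartition part,
                                      boolean backingOff,
                                      boolean batchFull,
                                      boolean lingerElapsed,
                                      boolean memoryExhausted,
                                      boolean accumulatorClosed) {
            boolean sendable = batchFull || lingerElapsed || memoryExhausted || accumulatorClosed;
            return !muted.contains(part) && !backingOff && sendable;
        }
    }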

http://git-wip-us.apache.org/repos/asf/kafka/blob/5afa1660/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 9d24d07..db8918c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -67,6 +67,9 @@ public class Sender implements Runnable {
     /* the metadata for the client */
     private final Metadata metadata;
 
+    /* the flag indicating whether the producer should guarantee the message order on the broker or not. */
+    private final boolean guaranteeMessageOrder;
+
     /* the maximum request size to attempt to send to the server */
     private final int maxRequestSize;
 
@@ -97,6 +100,7 @@ public class Sender implements Runnable {
     public Sender(KafkaClient client,
                   Metadata metadata,
                   RecordAccumulator accumulator,
+                  boolean guaranteeMessageOrder,
                   int maxRequestSize,
                   short acks,
                   int retries,
@@ -107,6 +111,7 @@ public class Sender implements Runnable {
         this.client = client;
         this.accumulator = accumulator;
         this.metadata = metadata;
+        this.guaranteeMessageOrder = guaranteeMessageOrder;
         this.maxRequestSize = maxRequestSize;
         this.running = true;
         this.acks = acks;
@@ -164,7 +169,7 @@ public class Sender implements Runnable {
      * @param now
      *            The current POSIX time in milliseconds
      */
-    public void run(long now) {
+    void run(long now) {
         Cluster cluster = metadata.fetch();
         // get the list of partitions with data ready to send
         RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
@@ -189,6 +194,13 @@ public class Sender implements Runnable {
                                                                          result.readyNodes,
                                                                          this.maxRequestSize,
                                                                          now);
+        if (guaranteeMessageOrder) {
+            // Mute all the partitions drained
+            for (List<RecordBatch> batchList : batches.values()) {
+                for (RecordBatch batch : batchList)
+                    this.accumulator.mutePartition(batch.topicPartition);
+            }
+        }
 
         List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, cluster, now);
         // update sensors
@@ -304,6 +316,9 @@ public class Sender implements Runnable {
         }
         if (error.exception() instanceof InvalidMetadataException)
             metadata.requestUpdate();
+        // Unmute the completed partition.
+        if (guaranteeMessageOrder)
+            this.accumulator.unmutePartition(batch.topicPartition);
     }
 
     /**
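
Finally, a small stand-alone demo of the mute/unmute lifecycle the Sender drives when guaranteeMessageOrder is true. It is a hypothetical simulation, not Sender code: the plain set below stands in for RecordAccumulator's muted set.

    import java.util.HashSet;
    import java.util.Set;

    import org.apache.kafka.common.TopicPartition;

    public class MuteLifecycleDemo {
        public static void main(String[] args) {
            Set<TopicPartition> muted = new HashSet<TopicPartition>();
            TopicPartition tp = new TopicPartition("test", 0);

            // drain(): a batch for tp is handed to the network client, so the Sender mutes tp.
            muted.add(tp);
            System.out.println("Drainable while a batch is in flight? " + !muted.contains(tp)); // false

            // completeBatch(): the broker has responded, so the Sender unmutes tp and the next
            // batch for the partition (including a retried one) can be drained in order.
            muted.remove(tp);
            System.out.println("Drainable after the response arrives? " + !muted.contains(tp)); // true
        }
    }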

http://git-wip-us.apache.org/repos/asf/kafka/blob/5afa1660/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 0f95ee5..3660272 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -14,8 +14,8 @@ package org.apache.kafka.clients.producer.internals;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -39,8 +39,6 @@ import org.apache.kafka.common.record.LogEntry;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
 import org.junit.After;
 import org.junit.Test;
 
@@ -299,7 +297,6 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testExpiredBatches() throws InterruptedException {
-        Time time = new SystemTime();
         long now = time.milliseconds();
         RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10, 100L, metrics, time);
         int appends = 1024 / msgSize;
@@ -317,4 +314,36 @@ public class RecordAccumulatorTest {
         List<RecordBatch> expiredBatches = accum.abortExpiredBatches(60, cluster, now);
         assertEquals(1, expiredBatches.size());
     }
+
+    @Test
+    public void testMutedPartitions() throws InterruptedException {
+        long now = time.milliseconds();
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10, 100L, metrics, time);
+        int appends = 1024 / msgSize;
+        for (int i = 0; i < appends; i++) {
+            accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
+            assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
+        }
+        time.sleep(2000);
+
+        // Test ready with muted partition
+        accum.mutePartition(tp1);
+        RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
+        assertEquals("No node should be ready", 0, result.readyNodes.size());
+
+        // Test ready without muted partition
+        accum.unmutePartition(tp1);
+        result = accum.ready(cluster, time.milliseconds());
+        assertTrue("The batch should be ready", result.readyNodes.size() > 0);
+
+        // Test drain with muted partition
+        accum.mutePartition(tp1);
+        Map<Integer, List<RecordBatch>> drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+        assertEquals("No batch should have been drained", 0, drained.get(node1.id()).size());
+
+        // Test drain without muted partition.
+        accum.unmutePartition(tp1);
+        drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+        assertTrue("The batch should have been drained.", drained.get(node1.id()).size() > 0);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5afa1660/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index b983de5..fb67747 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.CompressionType;
@@ -75,6 +76,7 @@ public class SenderTest {
         sender = new Sender(client,
                             metadata,
                             this.accumulator,
+                            true,
                             MAX_REQUEST_SIZE,
                             ACKS_ALL,
                             MAX_RETRIES,
@@ -134,6 +136,7 @@ public class SenderTest {
             Sender sender = new Sender(client,
                                        metadata,
                                        this.accumulator,
+                                       false,
                                        MAX_REQUEST_SIZE,
                                        ACKS_ALL,
                                        maxRetries,
@@ -178,6 +181,54 @@ public class SenderTest {
         }
     }
 
+    @Test
+    public void testSendInOrder() throws Exception {
+        int maxRetries = 1;
+        Metrics m = new Metrics();
+        try {
+            Sender sender = new Sender(client,
+                metadata,
+                this.accumulator,
+                true,
+                MAX_REQUEST_SIZE,
+                ACKS_ALL,
+                maxRetries,
+                m,
+                time,
+                "clientId",
+                REQUEST_TIMEOUT);
+
+            // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
+            Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
+            metadata.update(cluster1, time.milliseconds());
+
+            // Send the first message.
+            TopicPartition tp2 = new TopicPartition("test", 1);
+            accumulator.append(tp2, 0L, "key1".getBytes(), "value1".getBytes(), null, MAX_BLOCK_TIMEOUT);
+            sender.run(time.milliseconds()); // connect
+            sender.run(time.milliseconds()); // send produce request
+            String id = client.requests().peek().request().destination();
+            assertEquals(ApiKeys.PRODUCE.id, client.requests().peek().request().header().apiKey());
+            Node node = new Node(Integer.valueOf(id), "localhost", 0);
+            assertEquals(1, client.inFlightRequestCount());
+            assertTrue("Client ready status should be true", client.isReady(node, 0L));
+
+            time.sleep(900);
+            // Now send another message to tp2
+            accumulator.append(tp2, 0L, "key2".getBytes(), "value2".getBytes(), null, MAX_BLOCK_TIMEOUT);
+
+            // Update metadata before sender receives response from broker 0. Now partition 2 moves to broker 0
+            Cluster cluster2 = TestUtils.singletonCluster("test", 2);
+            metadata.update(cluster2, time.milliseconds());
+            // Sender should not send the second message to node 0.
+            sender.run(time.milliseconds());
+            assertEquals(1, client.inFlightRequestCount());
+        } finally {
+            m.close();
+        }
+
+    }
+
     private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {
         assertTrue("Request should be completed", future.isDone());
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5afa1660/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 7b88f5e..7ffc54a 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -52,7 +52,7 @@ public class TestUtils {
     public static Cluster clusterWith(int nodes, String topic, int partitions) {
         Node[] ns = new Node[nodes];
         for (int i = 0; i < nodes; i++)
-            ns[i] = new Node(0, "localhost", 1969);
+            ns[i] = new Node(i, "localhost", 1969);
         List<PartitionInfo> parts = new ArrayList<PartitionInfo>();
         for (int i = 0; i < partitions; i++)
             parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));