You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/11/14 23:30:10 UTC

kafka git commit: kafka-1642; [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost; patched by Ewen Cheslack-Postava; reviewed by Guozhang Wang and Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk ca2cf97a6 -> 7d89867c0


kafka-1642; [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost; patched by Ewen Cheslack-Postava; reviewed by Guozhang Wang and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7d89867c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7d89867c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7d89867c

Branch: refs/heads/trunk
Commit: 7d89867c05a7cca6e76cc48c47f8e703d68a9a43
Parents: ca2cf97
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Fri Nov 14 14:30:04 2014 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Nov 14 14:30:04 2014 -0800

----------------------------------------------------------------------
 .../kafka/clients/ClusterConnectionStates.java  | 21 +++++++
 .../org/apache/kafka/clients/KafkaClient.java   | 10 +++
 .../org/apache/kafka/clients/NetworkClient.java | 13 ++++
 .../producer/internals/RecordAccumulator.java   | 18 ++++--
 .../clients/producer/internals/Sender.java      | 13 +++-
 .../org/apache/kafka/clients/MockClient.java    |  5 ++
 .../clients/producer/RecordAccumulatorTest.java | 64 ++++++++++++++++----
 7 files changed, 124 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7d89867c/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index d304660..8aece7e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -57,6 +57,27 @@ final class ClusterConnectionStates {
     }
 
     /**
+     * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When
+     * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled
+     * connections.
+     * @param node The node to check
+     * @param now The current time in ms
+     */
+    public long connectionDelay(int node, long now) {
+        NodeConnectionState state = nodeState.get(node);
+        if (state == null) return 0;
+        long timeWaited = now - state.lastConnectAttemptMs;
+        if (state.state == ConnectionState.DISCONNECTED) {
+            return Math.max(this.reconnectBackoffMs - timeWaited, 0);
+        }
+        else {
+            // When connecting or connected, we should be able to delay indefinitely since other events (connection or
+            // data acked) will cause a wakeup once data can be sent.
+            return Long.MAX_VALUE;
+        }
+    }
+
+    /**
      * Enter the connecting state for the given node.
      * @param node The id of the node we are connecting to
      * @param now The current time.

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d89867c/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 29658d4..3976955 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -41,6 +41,16 @@ public interface KafkaClient {
     public boolean ready(Node node, long now);
 
     /**
+     * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When
+     * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled
+     * connections.
+     * @param node The node to check
+     * @param now The current timestamp
+     * @return The number of milliseconds to wait.
+     */
+    public long connectionDelay(Node node, long now);
+
+    /**
      * Initiate the sending of the given requests and return any completed responses. Requests can only be sent on ready
      * connections.
      * @param requests The requests to send

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d89867c/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index eea270a..525b95e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -119,6 +119,19 @@ public class NetworkClient implements KafkaClient {
     }
 
     /**
+     * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When
+     * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled
+     * connections.
+     * @param node The node to check
+     * @param now The current timestamp
+     * @return The number of milliseconds to wait.
+     */
+    @Override
+    public long connectionDelay(Node node, long now) {
+        return connectionStates.connectionDelay(node.id(), now);
+    }
+
+    /**
      * Check if the node with the given id is ready to send more requests.
      * @param node The given node id
      * @param now The current time in ms

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d89867c/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index c5d4700..c15485d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -183,9 +183,9 @@ public final class RecordAccumulator {
     }
 
     /**
-     * Get a list of nodes whose partitions are ready to be sent, and the time to when any partition will be ready if no
-     * partitions are ready yet; If the ready nodes list is non-empty, the timeout value will be 0. Also return the flag
-     * for whether there are any unknown leaders for the accumulated partition batches.
+     * Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable
+     * partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
+     * partition batches.
      * <p>
      * A destination node is ready to send data if ANY one of its partition is not backing off the send and ANY of the
      * following are true :
@@ -219,11 +219,17 @@ public final class RecordAccumulator {
                         long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                         long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                         boolean full = deque.size() > 1 || batch.records.isFull();
-                        boolean expired = waitedTimeMs >= lingerMs;
+                        boolean expired = waitedTimeMs >= timeToWaitMs;
                         boolean sendable = full || expired || exhausted || closed;
-                        if (sendable && !backingOff)
+                        if (sendable && !backingOff) {
                             readyNodes.add(leader);
-                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
+                        }
+                        else {
+                            // Note that this results in a conservative estimate since an un-sendable partition may have
+                            // a leader that will later be found to have sendable data. However, this is good enough
+                            // since we'll just wake up and then sleep again for the remaining time.
+                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
+                        }
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d89867c/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 8ebe7ed..84a7a07 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -152,10 +152,13 @@ public class Sender implements Runnable {
 
         // remove any nodes we aren't ready to send to
         Iterator<Node> iter = result.readyNodes.iterator();
+        long notReadyTimeout = Long.MAX_VALUE;
         while (iter.hasNext()) {
             Node node = iter.next();
-            if (!this.client.ready(node, now))
+            if (!this.client.ready(node, now)) {
                 iter.remove();
+                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
+            }
         }
 
         // create produce requests
@@ -163,16 +166,22 @@ public class Sender implements Runnable {
         List<ClientRequest> requests = createProduceRequests(batches, now);
         sensors.updateProduceRequestMetrics(requests);
 
+        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
+        // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
+        // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
+        // with sendable data that aren't ready to send since they would cause busy looping.
+        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
         if (result.readyNodes.size() > 0) {
             log.trace("Nodes with data ready to send: {}", result.readyNodes);
             log.trace("Created {} produce requests: {}", requests.size(), requests);
+            pollTimeout = 0;
         }
 
         // if some partitions are already ready to be sent, the select time would be 0;
         // otherwise if some partition already has some data accumulated but not ready yet,
         // the select time will be the time difference between now and its linger expiry time;
         // otherwise the select time will be the time difference between now and the metadata expiry time;
-        List<ClientResponse> responses = this.client.poll(requests, result.nextReadyCheckDelayMs, now);
+        List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now);
         for (ClientResponse response : responses) {
             if (response.wasDisconnected())
                 handleDisconnect(response, now);

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d89867c/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index aae8d4a..47b5d4a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -41,6 +41,11 @@ public class MockClient implements KafkaClient {
         return found;
     }
 
+    @Override
+    public long connectionDelay(Node node, long now) {
+        return 0;
+    }
+
     public void disconnect(Integer node) {
         Iterator<ClientRequest> iter = requests.iterator();
         while (iter.hasNext()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d89867c/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
index 0762b35..2c99324 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
@@ -14,6 +14,7 @@ package org.apache.kafka.clients.producer;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
 
 import java.nio.ByteBuffer;
@@ -43,16 +44,20 @@ public class RecordAccumulatorTest {
     private String topic = "test";
     private int partition1 = 0;
     private int partition2 = 1;
-    private Node node = new Node(0, "localhost", 1111);
+    private int partition3 = 2;
+    private Node node1 = new Node(0, "localhost", 1111);
+    private Node node2 = new Node(1, "localhost", 1112);
     private TopicPartition tp1 = new TopicPartition(topic, partition1);
     private TopicPartition tp2 = new TopicPartition(topic, partition2);
-    private PartitionInfo part1 = new PartitionInfo(topic, partition1, node, null, null);
-    private PartitionInfo part2 = new PartitionInfo(topic, partition2, node, null, null);
+    private TopicPartition tp3 = new TopicPartition(topic, partition3);
+    private PartitionInfo part1 = new PartitionInfo(topic, partition1, node1, null, null);
+    private PartitionInfo part2 = new PartitionInfo(topic, partition2, node1, null, null);
+    private PartitionInfo part3 = new PartitionInfo(topic, partition3, node2, null, null);
     private MockTime time = new MockTime();
     private byte[] key = "key".getBytes();
     private byte[] value = "value".getBytes();
     private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
-    private Cluster cluster = new Cluster(Collections.singleton(node), Arrays.asList(part1, part2));
+    private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3));
     private Metrics metrics = new Metrics(time);
 
     @Test
@@ -65,8 +70,8 @@ public class RecordAccumulatorTest {
             assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
         }
         accum.append(tp1, key, value, CompressionType.NONE, null);
-        assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes);
-        List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), Integer.MAX_VALUE, 0).get(node.id());
+        assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
+        List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);
         Iterator<LogEntry> iter = batch.records.iterator();
@@ -83,7 +88,7 @@ public class RecordAccumulatorTest {
         int batchSize = 512;
         RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time);
         accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null);
-        assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes);
+        assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
     }
 
     @Test
@@ -93,8 +98,8 @@ public class RecordAccumulatorTest {
         accum.append(tp1, key, value, CompressionType.NONE, null);
         assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         time.sleep(10);
-        assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes);
-        List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), Integer.MAX_VALUE, 0).get(node.id());
+        assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
+        List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);
         Iterator<LogEntry> iter = batch.records.iterator();
@@ -113,9 +118,9 @@ public class RecordAccumulatorTest {
             for (int i = 0; i < appends; i++)
                 accum.append(tp, key, value, CompressionType.NONE, null);
         }
-        assertEquals("Partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes);
+        assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
 
-        List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), 1024, 0).get(node.id());
+        List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), 1024, 0).get(node1.id());
         assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size());
     }
 
@@ -145,7 +150,7 @@ public class RecordAccumulatorTest {
         long now = time.milliseconds();
         while (read < numThreads * msgs) {
             Set<Node> nodes = accum.ready(cluster, now).readyNodes;
-            List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node.id());
+            List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id());
             if (batches != null) {
                 for (RecordBatch batch : batches) {
                     for (LogEntry entry : batch.records)
@@ -159,4 +164,39 @@ public class RecordAccumulatorTest {
             t.join();
     }
 
+
+    @Test
+    public void testNextReadyCheckDelay() throws Exception {
+        // Next check time will use lingerMs since this test won't trigger any retries/backoff
+        long lingerMs = 10L;
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time);
+        // Just short of going over the limit so we trigger linger time
+        int appends = 1024 / msgSize;
+
+        // Partition on node1 only
+        for (int i = 0; i < appends; i++)
+            accum.append(tp1, key, value, CompressionType.NONE, null);
+        RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
+        assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
+        assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs);
+
+        time.sleep(lingerMs / 2);
+
+        // Add partition on node2 only
+        for (int i = 0; i < appends; i++)
+            accum.append(tp3, key, value, CompressionType.NONE, null);
+        result = accum.ready(cluster, time.milliseconds());
+        assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
+        assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs);
+
+        // Add data for another partition on node1, enough to make data sendable immediately
+        for (int i = 0; i < appends+1; i++)
+            accum.append(tp2, key, value, CompressionType.NONE, null);
+        result = accum.ready(cluster, time.milliseconds());
+        assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
+        // Note this can actually be < linger time because it may use delays from partitions that aren't sendable
+        // but have leaders with other sendable data.
+        assertTrue("Next check time should be defined by node2, at most linger time", result.nextReadyCheckDelayMs <= lingerMs);
+    }
+
 }