Posted to commits@kafka.apache.org by ju...@apache.org on 2014/11/14 23:30:10 UTC
kafka git commit: kafka-1642; [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost; patched by Ewen Cheslack-Postava; reviewed by Guozhang Wang and Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk ca2cf97a6 -> 7d89867c0
kafka-1642; [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost; patched by Ewen Cheslack-Postava; reviewed by Guozhang Wang and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7d89867c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7d89867c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7d89867c
Branch: refs/heads/trunk
Commit: 7d89867c05a7cca6e76cc48c47f8e703d68a9a43
Parents: ca2cf97
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Fri Nov 14 14:30:04 2014 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Nov 14 14:30:04 2014 -0800
----------------------------------------------------------------------
.../kafka/clients/ClusterConnectionStates.java | 21 +++++++
.../org/apache/kafka/clients/KafkaClient.java | 10 +++
.../org/apache/kafka/clients/NetworkClient.java | 13 ++++
.../producer/internals/RecordAccumulator.java | 18 ++++--
.../clients/producer/internals/Sender.java | 13 +++-
.../org/apache/kafka/clients/MockClient.java | 5 ++
.../clients/producer/RecordAccumulatorTest.java | 64 ++++++++++++++++----
7 files changed, 124 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d89867c/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index d304660..8aece7e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -57,6 +57,27 @@ final class ClusterConnectionStates {
}
/**
+ * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When
+ * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled
+ * connections.
+ * @param node The node to check
+ * @param now The current time in ms
+ */
+ public long connectionDelay(int node, long now) {
+ NodeConnectionState state = nodeState.get(node);
+ if (state == null) return 0;
+ long timeWaited = now - state.lastConnectAttemptMs;
+ if (state.state == ConnectionState.DISCONNECTED) {
+ return Math.max(this.reconnectBackoffMs - timeWaited, 0);
+ }
+ else {
+ // When connecting or connected, we should be able to delay indefinitely since other events (connection or
+ // data acked) will cause a wakeup once data can be sent.
+ return Long.MAX_VALUE;
+ }
+ }
+
+ /**
* Enter the connecting state for the given node.
* @param node The id of the node we are connecting to
* @param now The current time.
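As a quick worked example of the new method (the numbers below are illustrative only, not taken from the patch or from any producer defaults): with a 10 ms reconnect backoff, a node that disconnected 3 ms after its last connect attempt still has 7 ms of backoff left, while a connecting or connected node reports Long.MAX_VALUE and relies on a selector wakeup instead.

// Illustrative arithmetic for connectionDelay(), using made-up values.
long reconnectBackoffMs = 10L;       // this.reconnectBackoffMs
long lastConnectAttemptMs = 1000L;   // state.lastConnectAttemptMs
long now = 1003L;
long timeWaited = now - lastConnectAttemptMs;                                // 3 ms since the last attempt
long delayWhenDisconnected = Math.max(reconnectBackoffMs - timeWaited, 0);   // 7 ms of backoff left
long delayWhenConnectingOrConnected = Long.MAX_VALUE;                        // wait for a connect/ack wakeup instead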
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d89867c/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 29658d4..3976955 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -41,6 +41,16 @@ public interface KafkaClient {
public boolean ready(Node node, long now);
/**
+ * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When
+ * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled
+ * connections.
+ * @param node The node to check
+ * @param now The current timestamp
+ * @return The number of milliseconds to wait.
+ */
+ public long connectionDelay(Node node, long now);
+
+ /**
* Initiate the sending of the given requests and return any completed responses. Requests can only be sent on ready
* connections.
* @param requests The requests to send
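To show how a caller of this interface is meant to combine ready() and connectionDelay(), here is a minimal sketch (not part of the patch; the helper class, the candidate-node list, and the poll() call shape are assumptions modeled on how the Sender below uses the client):

import java.util.List;

import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.common.Node;

// Hypothetical helper, for illustration only: pick a poll timeout so that nodes holding
// sendable data but not yet ready (e.g. waiting out a reconnect backoff) block the I/O
// thread for the remaining backoff instead of forcing a zero-timeout busy loop.
public final class PollTimeoutSketch {
    public static List<ClientResponse> sendOnce(KafkaClient client, List<Node> candidates,
                                                List<ClientRequest> requests, long now) {
        long pollTimeout = Long.MAX_VALUE;
        for (Node node : candidates) {
            if (client.ready(node, now))
                pollTimeout = 0;                 // something can go out right away, return quickly
            else
                // disconnected -> remaining reconnect backoff; connecting/connected -> Long.MAX_VALUE
                pollTimeout = Math.min(pollTimeout, client.connectionDelay(node, now));
        }
        return client.poll(requests, pollTimeout, now);
    }
}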
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d89867c/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index eea270a..525b95e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -119,6 +119,19 @@ public class NetworkClient implements KafkaClient {
}
/**
+ * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When
+ * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled
+ * connections.
+ * @param node The node to check
+ * @param now The current timestamp
+ * @return The number of milliseconds to wait.
+ */
+ @Override
+ public long connectionDelay(Node node, long now) {
+ return connectionStates.connectionDelay(node.id(), now);
+ }
+
+ /**
* Check if the node with the given id is ready to send more requests.
* @param node The given node id
* @param now The current time in ms
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d89867c/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index c5d4700..c15485d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -183,9 +183,9 @@ public final class RecordAccumulator {
}
/**
- * Get a list of nodes whose partitions are ready to be sent, and the time to when any partition will be ready if no
- * partitions are ready yet; If the ready nodes list is non-empty, the timeout value will be 0. Also return the flag
- * for whether there are any unknown leaders for the accumulated partition batches.
+ * Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable
+ * partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
+ * partition batches.
* <p>
* A destination node is ready to send data if ANY one of its partition is not backing off the send and ANY of the
* following are true :
@@ -219,11 +219,17 @@ public final class RecordAccumulator {
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
boolean full = deque.size() > 1 || batch.records.isFull();
- boolean expired = waitedTimeMs >= lingerMs;
+ boolean expired = waitedTimeMs >= timeToWaitMs;
boolean sendable = full || expired || exhausted || closed;
- if (sendable && !backingOff)
+ if (sendable && !backingOff) {
readyNodes.add(leader);
- nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
+ }
+ else {
+ // Note that this results in a conservative estimate since an un-sendable partition may have
+ // a leader that will later be found to have sendable data. However, this is good enough
+ // since we'll just wake up and then sleep again for the remaining time.
+ nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
+ }
}
}
}
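To make the changed check concrete, here is a worked example with made-up values (the lingerMs, retryBackoffMs, and wait time below are illustrative, not producer defaults):

// Worked example: lingerMs = 10, retryBackoffMs = 100, and a batch whose last (failed) attempt was 40 ms ago.
long lingerMs = 10L;
long retryBackoffMs = 100L;
long waitedTimeMs = 40L;
boolean backingOff = true;                                   // retry backoff has not yet elapsed
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;  // 100
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);  // 60
boolean expired = waitedTimeMs >= timeToWaitMs;              // false; the old check (waitedTimeMs >= lingerMs) said true
// The fixed check measures expiry against the same bound that gates the send (retryBackoffMs while
// backing off), and because this batch is not sendable its timeLeftMs (60 ms) is what feeds
// nextReadyCheckDelayMs. Nodes that do have sendable batches are simply added to readyNodes and
// no longer pull nextReadyCheckDelayMs down to zero.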
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d89867c/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 8ebe7ed..84a7a07 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -152,10 +152,13 @@ public class Sender implements Runnable {
// remove any nodes we aren't ready to send to
Iterator<Node> iter = result.readyNodes.iterator();
+ long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
- if (!this.client.ready(node, now))
+ if (!this.client.ready(node, now)) {
iter.remove();
+ notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
+ }
}
// create produce requests
@@ -163,16 +166,22 @@ public class Sender implements Runnable {
List<ClientRequest> requests = createProduceRequests(batches, now);
sensors.updateProduceRequestMetrics(requests);
+ // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
+ // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
+ // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
+ // with sendable data that aren't ready to send since they would cause busy looping.
+ long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (result.readyNodes.size() > 0) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
log.trace("Created {} produce requests: {}", requests.size(), requests);
+ pollTimeout = 0;
}
// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
- List<ClientResponse> responses = this.client.poll(requests, result.nextReadyCheckDelayMs, now);
+ List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now);
for (ClientResponse response : responses) {
if (response.wasDisconnected())
handleDisconnect(response, now);
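Putting the pieces together, this is roughly the scenario behind KAFKA-1642, with illustrative numbers (the 10 ms backoff and the assumption that nextReadyCheckDelayMs starts at its maximal value are mine, not taken from the patch):

// One node, connection lost, one sendable (full or linger-expired) batch queued, 10 ms reconnect backoff.
// ready(): the batch is sendable, so its node goes into readyNodes and no longer drags
//          nextReadyCheckDelayMs down; with no other partitions it keeps its (assumed) initial maximum.
long nextReadyCheckDelayMs = Long.MAX_VALUE;
// Sender: client.ready(node, now) is false while the backoff lasts, so the node is removed from
//         readyNodes and its connectionDelay() becomes the not-ready timeout.
long notReadyTimeout = 10L;                                            // remaining reconnect backoff
long pollTimeout = Math.min(nextReadyCheckDelayMs, notReadyTimeout);   // 10; readyNodes is now empty
// poll() blocks for up to 10 ms. Before this patch the poll timeout in this scenario was the
// accumulator's nextReadyCheckDelayMs, which a linger-expired batch drove to 0 on every
// iteration -- the 100% CPU spin this commit fixes.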
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d89867c/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index aae8d4a..47b5d4a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -41,6 +41,11 @@ public class MockClient implements KafkaClient {
return found;
}
+ @Override
+ public long connectionDelay(Node node, long now) {
+ return 0;
+ }
+
public void disconnect(Integer node) {
Iterator<ClientRequest> iter = requests.iterator();
while (iter.hasNext()) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d89867c/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
index 0762b35..2c99324 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
@@ -14,6 +14,7 @@ package org.apache.kafka.clients.producer;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import java.nio.ByteBuffer;
@@ -43,16 +44,20 @@ public class RecordAccumulatorTest {
private String topic = "test";
private int partition1 = 0;
private int partition2 = 1;
- private Node node = new Node(0, "localhost", 1111);
+ private int partition3 = 2;
+ private Node node1 = new Node(0, "localhost", 1111);
+ private Node node2 = new Node(1, "localhost", 1112);
private TopicPartition tp1 = new TopicPartition(topic, partition1);
private TopicPartition tp2 = new TopicPartition(topic, partition2);
- private PartitionInfo part1 = new PartitionInfo(topic, partition1, node, null, null);
- private PartitionInfo part2 = new PartitionInfo(topic, partition2, node, null, null);
+ private TopicPartition tp3 = new TopicPartition(topic, partition3);
+ private PartitionInfo part1 = new PartitionInfo(topic, partition1, node1, null, null);
+ private PartitionInfo part2 = new PartitionInfo(topic, partition2, node1, null, null);
+ private PartitionInfo part3 = new PartitionInfo(topic, partition3, node2, null, null);
private MockTime time = new MockTime();
private byte[] key = "key".getBytes();
private byte[] value = "value".getBytes();
private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
- private Cluster cluster = new Cluster(Collections.singleton(node), Arrays.asList(part1, part2));
+ private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3));
private Metrics metrics = new Metrics(time);
@Test
@@ -65,8 +70,8 @@ public class RecordAccumulatorTest {
assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
}
accum.append(tp1, key, value, CompressionType.NONE, null);
- assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes);
- List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), Integer.MAX_VALUE, 0).get(node.id());
+ assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
+ List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
assertEquals(1, batches.size());
RecordBatch batch = batches.get(0);
Iterator<LogEntry> iter = batch.records.iterator();
@@ -83,7 +88,7 @@ public class RecordAccumulatorTest {
int batchSize = 512;
RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time);
accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null);
- assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes);
+ assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
}
@Test
@@ -93,8 +98,8 @@ public class RecordAccumulatorTest {
accum.append(tp1, key, value, CompressionType.NONE, null);
assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
time.sleep(10);
- assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes);
- List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), Integer.MAX_VALUE, 0).get(node.id());
+ assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
+ List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
assertEquals(1, batches.size());
RecordBatch batch = batches.get(0);
Iterator<LogEntry> iter = batch.records.iterator();
@@ -113,9 +118,9 @@ public class RecordAccumulatorTest {
for (int i = 0; i < appends; i++)
accum.append(tp, key, value, CompressionType.NONE, null);
}
- assertEquals("Partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes);
+ assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
- List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), 1024, 0).get(node.id());
+ List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), 1024, 0).get(node1.id());
assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size());
}
@@ -145,7 +150,7 @@ public class RecordAccumulatorTest {
long now = time.milliseconds();
while (read < numThreads * msgs) {
Set<Node> nodes = accum.ready(cluster, now).readyNodes;
- List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node.id());
+ List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id());
if (batches != null) {
for (RecordBatch batch : batches) {
for (LogEntry entry : batch.records)
@@ -159,4 +164,39 @@ public class RecordAccumulatorTest {
t.join();
}
+
+ @Test
+ public void testNextReadyCheckDelay() throws Exception {
+ // Next check time will use lingerMs since this test won't trigger any retries/backoff
+ long lingerMs = 10L;
+ RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time);
+ // Just short of going over the limit so we trigger linger time
+ int appends = 1024 / msgSize;
+
+ // Partition on node1 only
+ for (int i = 0; i < appends; i++)
+ accum.append(tp1, key, value, CompressionType.NONE, null);
+ RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
+ assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
+ assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs);
+
+ time.sleep(lingerMs / 2);
+
+ // Add partition on node2 only
+ for (int i = 0; i < appends; i++)
+ accum.append(tp3, key, value, CompressionType.NONE, null);
+ result = accum.ready(cluster, time.milliseconds());
+ assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
+ assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs);
+
+ // Add data for another partition on node1, enough to make data sendable immediately
+ for (int i = 0; i < appends+1; i++)
+ accum.append(tp2, key, value, CompressionType.NONE, null);
+ result = accum.ready(cluster, time.milliseconds());
+ assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
+ // Note this can actually be < linger time because it may use delays from partitions that aren't sendable
+ // but have leaders with other sendable data.
+ assertTrue("Next check time should be defined by node2, at most linger time", result.nextReadyCheckDelayMs <= lingerMs);
+ }
+
}