You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/06 01:00:06 UTC
[09/37] git commit: KAFKA-1515 Fix a bug that could result in blocking for a long period of time in the producer. Patch from Guozhang Wang.
KAFKA-1515 Fix a bug that could result in blocking for a long period of time in the producer. Patch from Guozhang Wang.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cd3ce27d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cd3ce27d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cd3ce27d
Branch: refs/heads/transactional_messaging
Commit: cd3ce27d4baf5434676ec040d64663ad3ce09817
Parents: 6de56b3
Author: Jay Kreps <ja...@gmail.com>
Authored: Tue Jul 8 13:16:56 2014 -0700
Committer: Jay Kreps <ja...@gmail.com>
Committed: Tue Jul 8 13:16:56 2014 -0700
----------------------------------------------------------------------
.../org/apache/kafka/clients/NetworkClient.java | 4 +-
.../kafka/clients/producer/KafkaProducer.java | 40 ++++++++++--
.../clients/producer/internals/Metadata.java | 68 +++++++++-----------
.../clients/producer/internals/Sender.java | 4 +-
.../kafka/clients/producer/MetadataTest.java | 9 ++-
5 files changed, 77 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd3ce27d/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index d21f922..f739279 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -309,7 +309,7 @@ public class NetworkClient implements KafkaClient {
}
// we got a disconnect so we should probably refresh our metadata and see if that broker is dead
if (this.selector.disconnected().size() > 0)
- this.metadata.forceUpdate();
+ this.metadata.requestUpdate();
}
/**
@@ -375,7 +375,7 @@ public class NetworkClient implements KafkaClient {
/* attempt failed, we'll try again after the backoff */
connectionStates.disconnected(node.id());
/* maybe the problem is our metadata, update it */
- metadata.forceUpdate();
+ metadata.requestUpdate();
log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd3ce27d/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index d85ca30..f58b850 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -73,6 +74,7 @@ public class KafkaProducer implements Producer {
private final Thread ioThread;
private final CompressionType compressionType;
private final Sensor errors;
+ private final Time time;
/**
* A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -94,7 +96,7 @@ public class KafkaProducer implements Producer {
private KafkaProducer(ProducerConfig config) {
log.trace("Starting the Kafka producer");
- Time time = new SystemTime();
+ this.time = new SystemTime();
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
TimeUnit.MILLISECONDS);
@@ -119,7 +121,7 @@ public class KafkaProducer implements Producer {
metrics,
time);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
- this.metadata.update(Cluster.bootstrap(addresses), 0);
+ this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
NetworkClient client = new NetworkClient(new Selector(this.metrics, time),
this.metadata,
@@ -225,8 +227,9 @@ public class KafkaProducer implements Producer {
@Override
public Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
try {
- Cluster cluster = metadata.fetch(record.topic(), this.metadataFetchTimeoutMs);
- int partition = partitioner.partition(record, cluster);
+ // first make sure the metadata for the topic is available
+ waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs);
+ int partition = partitioner.partition(record, metadata.fetch());
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(record.key(), record.value());
ensureValidRecordSize(serializedSize);
TopicPartition tp = new TopicPartition(record.topic(), partition);
@@ -256,6 +259,31 @@ public class KafkaProducer implements Producer {
}
/**
+     * Wait for cluster metadata including partitions for the given topic to be available.
+     * <p>
+     * While the topic's partitions are unknown, the topic is added to the maintained topic set, a metadata update is
+     * requested and the sender is woken up; this repeats after every update until the partitions show up. A
+     * non-positive {@code maxWaitMs} fails fast with a timeout instead of blocking.
+     *
+     * @param topic The topic we want metadata for
+     * @param maxWaitMs The maximum time in ms for waiting on the metadata
+     * @throws TimeoutException if partitions for the topic are still unavailable after {@code maxWaitMs}
+     */
+    private void waitOnMetadata(String topic, long maxWaitMs) {
+        long begin = time.milliseconds();
+        long remainingWaitMs = maxWaitMs;
+        // the loop condition decides success, so metadata that arrives right at the deadline is not reported as a timeout
+        while (metadata.fetch().partitionsForTopic(topic) == null) {
+            log.trace("Requesting metadata update for topic {}.", topic);
+            // add the topic before requesting the update so it is already in the maintained set when the refresh
+            // happens (presumably the refresh is built from that set -- see Metadata/Sender)
+            metadata.add(topic);
+            int version = metadata.requestUpdate();
+            sender.wakeup();
+            // never hand a non-positive timeout to awaitUpdate: Object.wait(0) blocks indefinitely
+            if (remainingWaitMs <= 0)
+                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
+            metadata.awaitUpdate(version, remainingWaitMs);
+            remainingWaitMs = maxWaitMs - (time.milliseconds() - begin);
+        }
+    }
+
+ /**
* Validate that the record size isn't too large
*/
private void ensureValidRecordSize(int size) {
@@ -271,8 +299,10 @@ public class KafkaProducer implements Producer {
" configuration.");
}
+ @Override
public List<PartitionInfo> partitionsFor(String topic) {
- return this.metadata.fetch(topic, this.metadataFetchTimeoutMs).partitionsForTopic(topic);
+ waitOnMetadata(topic, this.metadataFetchTimeoutMs);
+ return this.metadata.fetch().partitionsForTopic(topic);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd3ce27d/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index 8890aa2..140237f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -13,11 +13,9 @@
package org.apache.kafka.clients.producer.internals;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,9 +34,10 @@ public final class Metadata {
private final long refreshBackoffMs;
private final long metadataExpireMs;
+ private int version;
private long lastRefreshMs;
private Cluster cluster;
- private boolean forceUpdate;
+ private boolean needUpdate;
private final Set<String> topics;
/**
@@ -58,8 +57,9 @@ public final class Metadata {
this.refreshBackoffMs = refreshBackoffMs;
this.metadataExpireMs = metadataExpireMs;
this.lastRefreshMs = 0L;
+ this.version = 0;
this.cluster = Cluster.empty();
- this.forceUpdate = false;
+ this.needUpdate = false;
this.topics = new HashSet<String>();
}
@@ -71,33 +71,10 @@ public final class Metadata {
}
/**
- * Fetch cluster metadata including partitions for the given topic. If there is no metadata for the given topic,
- * block waiting for an update.
- * @param topic The topic we want metadata for
- * @param maxWaitMs The maximum amount of time to block waiting for metadata
+ * Add the topic to maintain in the metadata
*/
- public synchronized Cluster fetch(String topic, long maxWaitMs) {
- List<PartitionInfo> partitions = null;
- long begin = System.currentTimeMillis();
- long remainingWaitMs = maxWaitMs;
- do {
- partitions = cluster.partitionsForTopic(topic);
- if (partitions == null) {
- topics.add(topic);
- forceUpdate = true;
- try {
- log.trace("Requesting metadata update for topic {}.", topic);
- wait(remainingWaitMs);
- } catch (InterruptedException e) { /* this is fine, just try again */
- }
- long elapsed = System.currentTimeMillis() - begin;
- if (elapsed >= maxWaitMs)
- throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
- remainingWaitMs = maxWaitMs - elapsed;
- } else {
- return cluster;
- }
- } while (true);
+ public synchronized void add(String topic) {
+ topics.add(topic);
}
/**
@@ -106,16 +83,35 @@ public final class Metadata {
* been request then the expiry time is now
*/
public synchronized long timeToNextUpdate(long nowMs) {
- long timeToExpire = forceUpdate ? 0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0);
+ long timeToExpire = needUpdate ? 0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0);
long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
return Math.max(timeToExpire, timeToAllowUpdate);
}
/**
- * Force an update of the current cluster info
+ * Request an update of the current cluster metadata info, return the current version before the update
*/
- public synchronized void forceUpdate() {
- this.forceUpdate = true;
+ public synchronized int requestUpdate() {
+ this.needUpdate = true;
+ return this.version;
+ }
+
+    /**
+     * Wait for a metadata update, i.e. until the current version is larger than the last version we know of.
+     *
+     * @param lastVersion The version observed before the update was requested, e.g. the value returned by
+     *        {@link #requestUpdate()}
+     * @param maxWaitMs The maximum time in ms to wait; 0 means do not block at all
+     * @throws IllegalArgumentException if {@code maxWaitMs} is negative
+     * @throws TimeoutException if the version has not advanced within {@code maxWaitMs}
+     */
+    public synchronized void awaitUpdate(int lastVersion, long maxWaitMs) {
+        if (maxWaitMs < 0)
+            throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 ms");
+        long begin = System.currentTimeMillis();
+        long remainingWaitMs = maxWaitMs;
+        boolean interrupted = false;
+        try {
+            // check the version before the deadline so an update that lands exactly at the deadline still succeeds
+            while (this.version <= lastVersion) {
+                // Object.wait(0) would block forever, so an exhausted budget must fail here instead
+                if (remainingWaitMs <= 0)
+                    throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
+                try {
+                    wait(remainingWaitMs);
+                } catch (InterruptedException e) {
+                    // keep waiting until the deadline, but remember the interrupt so it is not lost for the caller;
+                    // re-interrupting here would make every subsequent wait() throw immediately and spin
+                    interrupted = true;
+                }
+                remainingWaitMs = maxWaitMs - (System.currentTimeMillis() - begin);
+            }
+        } finally {
+            if (interrupted)
+                Thread.currentThread().interrupt();
+        }
}
/**
@@ -129,8 +125,9 @@ public final class Metadata {
* Update the cluster metadata
*/
public synchronized void update(Cluster cluster, long now) {
- this.forceUpdate = false;
+ this.needUpdate = false;
this.lastRefreshMs = now;
+ this.version += 1;
this.cluster = cluster;
notifyAll();
log.debug("Updated cluster metadata to {}", cluster);
@@ -142,5 +139,4 @@ public final class Metadata {
public synchronized long lastUpdate() {
return this.lastRefreshMs;
}
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd3ce27d/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 37b9d1a..a016269 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -147,7 +147,7 @@ public class Sender implements Runnable {
// if there are any partitions whose leaders are not known yet, force metadata update
if (result.unknownLeadersExist)
- this.metadata.forceUpdate();
+ this.metadata.requestUpdate();
// remove any nodes we aren't ready to send to
Iterator<Node> iter = result.readyNodes.iterator();
@@ -252,7 +252,7 @@ public class Sender implements Runnable {
this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
}
if (error.exception() instanceof InvalidMetadataException)
- metadata.forceUpdate();
+ metadata.requestUpdate();
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd3ce27d/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
index 0d7d04c..543304c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
@@ -31,7 +31,7 @@ public class MetadataTest {
long time = 0;
metadata.update(Cluster.empty(), time);
assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
- metadata.forceUpdate();
+ metadata.requestUpdate();
assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time) == 0);
time += refreshBackoffMs;
assertTrue("Update needed now that backoff time expired", metadata.timeToNextUpdate(time) == 0);
@@ -40,7 +40,9 @@ public class MetadataTest {
Thread t2 = asyncFetch(topic);
assertTrue("Awaiting update", t1.isAlive());
assertTrue("Awaiting update", t2.isAlive());
- metadata.update(TestUtils.singletonCluster(topic, 1), time);
+ // keep updating the metadata until no need to
+ while (metadata.timeToNextUpdate(time) == 0)
+ metadata.update(TestUtils.singletonCluster(topic, 1), time);
t1.join();
t2.join();
assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
@@ -51,7 +53,8 @@ public class MetadataTest {
private Thread asyncFetch(final String topic) {
Thread thread = new Thread() {
public void run() {
- metadata.fetch(topic, Integer.MAX_VALUE);
+ while (metadata.fetch().partitionsForTopic(topic) == null)
+ metadata.awaitUpdate(metadata.requestUpdate(), Long.MAX_VALUE);
}
};
thread.start();