You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2014/07/11 22:09:52 UTC
git commit: KAFKA-1515 Producer can hang during metadata updates.
Patch by Guozhang.
Repository: kafka
Updated Branches:
refs/heads/trunk 8034390ef -> 83a9aa55d
KAFKA-1515 Producer can hang during metadata updates. Patch by Guozhang.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/83a9aa55
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/83a9aa55
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/83a9aa55
Branch: refs/heads/trunk
Commit: 83a9aa55d340f6b2720394a49f01a88509e17e52
Parents: 8034390
Author: Jay Kreps <ja...@gmail.com>
Authored: Fri Jul 11 13:08:24 2014 -0700
Committer: Jay Kreps <ja...@gmail.com>
Committed: Fri Jul 11 13:08:24 2014 -0700
----------------------------------------------------------------------
.../org/apache/kafka/clients/NetworkClient.java | 33 +++++++++++++-------
.../clients/producer/internals/Metadata.java | 2 +-
.../main/scala/kafka/tools/MirrorMaker.scala | 31 +++++++++---------
3 files changed, 37 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/83a9aa55/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index f739279..d8f9ce6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -122,16 +122,21 @@ public class NetworkClient implements KafkaClient {
*/
@Override
public boolean isReady(Node node, long now) {
- return isReady(node.id(), now);
- }
-
- private boolean isReady(int node, long now) {
+ int nodeId = node.id();
if (!this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0)
// if we need to update our metadata now declare all requests unready to make metadata requests first priority
return false;
else
// otherwise we are ready if we are connected and can send more requests
- return connectionStates.isConnected(node) && inFlightRequests.canSendMore(node);
+ return isSendable(nodeId);
+ }
+
+ /**
+ * Are we connected and ready and able to send more requests to the given node?
+ * @param node The node
+ */
+ private boolean isSendable(int node) {
+ return connectionStates.isConnected(node) && inFlightRequests.canSendMore(node);
}
/**
@@ -146,21 +151,21 @@ public class NetworkClient implements KafkaClient {
public List<ClientResponse> poll(List<ClientRequest> requests, long timeout, long now) {
List<NetworkSend> sends = new ArrayList<NetworkSend>();
- // should we update our metadata?
- long metadataTimeout = metadata.timeToNextUpdate(now);
- if (!this.metadataFetchInProgress && metadataTimeout == 0)
- maybeUpdateMetadata(sends, now);
-
for (int i = 0; i < requests.size(); i++) {
ClientRequest request = requests.get(i);
int nodeId = request.request().destination();
- if (!isReady(nodeId, now))
+ if (!isSendable(nodeId))
throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
this.inFlightRequests.add(request);
sends.add(request.request());
}
+ // should we update our metadata?
+ long metadataTimeout = metadata.timeToNextUpdate(now);
+ if (!this.metadataFetchInProgress && metadataTimeout == 0)
+ maybeUpdateMetadata(sends, now);
+
// do the I/O
try {
this.selector.poll(Math.min(timeout, metadataTimeout), sends);
@@ -347,9 +352,12 @@ public class NetworkClient implements KafkaClient {
*/
private void maybeUpdateMetadata(List<NetworkSend> sends, long now) {
Node node = this.leastLoadedNode(now);
- if (node == null)
+ if (node == null) {
+ log.debug("Give up sending metadata request since no node is available");
return;
+ }
+ log.debug("Trying to send metadata request to node {}", node.id());
if (connectionStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) {
Set<String> topics = metadata.topics();
this.metadataFetchInProgress = true;
@@ -359,6 +367,7 @@ public class NetworkClient implements KafkaClient {
this.inFlightRequests.add(metadataRequest);
} else if (connectionStates.canConnect(node.id(), now)) {
// we don't have a connection to this node right now, make one
+ log.debug("Give up sending metadata request to node {} since it is either not connected or cannot have more in flight requests", node.id());
initiateConnect(node, now);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/83a9aa55/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index 140237f..4aa5b01 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -130,7 +130,7 @@ public final class Metadata {
this.version += 1;
this.cluster = cluster;
notifyAll();
- log.debug("Updated cluster metadata to {}", cluster);
+ log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/83a9aa55/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 4f06e34..555d751 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -116,19 +116,6 @@ object MirrorMaker extends Logging {
val numStreams = options.valueOf(numStreamsOpt).intValue()
val bufferSize = options.valueOf(bufferSizeOpt).intValue()
- val useNewProducer = options.has(useNewProducerOpt)
- val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
-
- // create producer threads
- val clientId = producerProps.getProperty("client.id", "")
- val producers = (1 to numProducers).map(i => {
- producerProps.setProperty("client.id", clientId + "-" + i)
- if (useNewProducer)
- new NewShinyProducer(producerProps)
- else
- new OldProducer(producerProps)
- })
-
// create consumer streams
connectors = options.valuesOf(consumerConfigOpt).toList
.map(cfg => new ConsumerConfig(Utils.loadProps(cfg)))
@@ -138,8 +125,21 @@ object MirrorMaker extends Logging {
// create a data channel btw the consumers and the producers
val mirrorDataChannel = new DataChannel(bufferSize, numConsumers, numProducers)
- producerThreads = producers.zipWithIndex.map(producerAndIndex => new ProducerThread(mirrorDataChannel, producerAndIndex._1, producerAndIndex._2))
+ // create producer threads
+ val useNewProducer = options.has(useNewProducerOpt)
+ val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
+ val clientId = producerProps.getProperty("client.id", "")
+ producerThreads = (0 until numProducers).map(i => {
+ producerProps.setProperty("client.id", clientId + "-" + i)
+ val producer =
+ if (useNewProducer)
+ new NewShinyProducer(producerProps)
+ else
+ new OldProducer(producerProps)
+ new ProducerThread(mirrorDataChannel, producer, i)
+ })
+ // create consumer threads
val filterSpec = if (options.has(whitelistOpt))
new Whitelist(options.valueOf(whitelistOpt))
else
@@ -153,7 +153,7 @@ object MirrorMaker extends Logging {
fatal("Unable to create stream - shutting down mirror maker.")
connectors.foreach(_.shutdown)
}
- consumerThreads = streams.zipWithIndex.map(streamAndIndex => new ConsumerThread(streamAndIndex._1, mirrorDataChannel, producers, streamAndIndex._2))
+ consumerThreads = streams.zipWithIndex.map(streamAndIndex => new ConsumerThread(streamAndIndex._1, mirrorDataChannel, streamAndIndex._2))
assert(consumerThreads.size == numConsumers)
Runtime.getRuntime.addShutdownHook(new Thread() {
@@ -233,7 +233,6 @@ object MirrorMaker extends Logging {
class ConsumerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
mirrorDataChannel: DataChannel,
- producers: Seq[BaseProducer],
threadId: Int)
extends Thread with Logging with KafkaMetricsGroup {