Posted to dev@kafka.apache.org by Shashank Sah <sh...@urbanclap.com.INVALID> on 2018/10/24 05:04:15 UTC

Kafka-node: Kafka client keeps sending request to broker which went down

When a broker host goes down or restarts, the Kafka client keeps sending
new requests to the same host's address. As a result, the requests fail
with this error: Request timed out after 30000ms.

Node version: v6.8.1
Kafka-node version: 3.0.1
Kafka version: 2.11-2.0.0
Number of Brokers: 3
Number partitions for topic: 10

Some code pointers:

    "clusterConfig" : {
      "kafkaHost": "localhost:9092,localhost:9093,localhost:9094",
      "autoConnect": true
    }
    ...
    let kafkaClient = new kafka.KafkaClient(clusterConfig);
    producer = new kafka.HighLevelProducer(kafkaClient, cluster.producerConfig);
    Promise.promisifyAll(producer);
    ...
    producer.sendAsync([eventPayload])
      .then(function (data) {
        let topicName = eventPayload.topic;
        let payLoadSize = (eventPayload || '').length;
        logger.eventInfo(topicName, payLoadSize, source);
      })
      .catch(function (e) {
        logger.produceFailedEvent(eventPayload, source);
        throw Error.getErrorObject(errorType, e, topic, source);
      });

I have kept the other configurations at their defaults.
It seems there is some issue with the kafka-node library. Given below are
the debug logs with the hosts the client connects to for each request. The
first call succeeded, the second failed, and the third succeeded. In the
case below, localhost:9092 was down.

kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +1s
kafka-node:KafkaClient compressing messages if needed +76ms
shashanksah actual broker!!!!![BrokerWrapper localhost:9094 (connected: true) (ready: true) (idle: false) (needAuthentication: false) (authenticated: false)]
kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +928ms
kafka-node:KafkaClient compressing messages if needed +422ms
shashanksah actual broker!!!!![BrokerWrapper localhost:9092 (connected: false) (ready: false) (idle: false) (needAuthentication: false) (authenticated: false)]
kafka-node:KafkaClient missing apiSupport waiting until broker is ready... +1ms
kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +583ms
kafka-node:KafkaClient compressing messages if needed +280ms
shashanksah actual broker!!!!![BrokerWrapper localhost:9093 (connected: true) (ready: true) (idle: false) (needAuthentication: false) (authenticated: false)]
kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +723ms

The issue is that the client resolves the leader to the broker whose host
is down, waits for that broker to become ready (for 30 seconds), and then
fails the request. If we change the function ensureBrokerReady in this file:
https://github.com/SOHU-Co/kafka-node/blob/master/lib/kafkaClient.js#L1016
to something like the code given below, the issue no longer occurs:

  const ensureBrokerReady = async.ensureAsync((leader, callback) => {
    let broker = this.brokerForLeader(leader, longpolling);
    console.log("shashanksah actual broker!!!!!" + broker);
    if (!broker.isReady()) {
      // Key change: refresh broker metadata and re-resolve the leader instead of
      // waiting on a broker that is not ready (e.g. its host is down).
      this.refreshBrokerMetadata();
      broker = this.brokerForLeader(leader, longpolling);
    }
    if (!broker.isReady()) {
      logger.debug('missing apiSupport waiting until broker is ready...');
      this.waitUntilReady(broker, callback);
    } else {
      callback(null);
    }
  });
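
As a caller-side stop-gap (independent of the library patch above), a bounded
retry around the promisified sendAsync lets a request that happened to be
routed to the dead broker succeed on a later attempt; each retry that still
hits the dead broker waits out the full 30s timeout. This is a hypothetical
helper, not part of kafka-node:

  // Hypothetical wrapper around the promisified producer from the snippet above.
  function sendWithRetry (payloads, retries = 2) {
    return producer.sendAsync(payloads)
      .catch(function (err) {
        if (retries <= 0) {
          throw err; // out of attempts, surface the original error
        }
        return sendWithRetry(payloads, retries - 1);
      });
  }

  // usage: sendWithRetry([eventPayload]).then(...).catch(...)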

Please tell me if I am missing anything, or whether the RCA is correct.

Thanks,

Shashank

Re: Kafka-node: Kafka client keeps sending request to broker which went down

Posted by Colin McCabe <cm...@apache.org>.
Hi Shashank,

kafka-node is not developed by Apache or the Apache Kafka project.  I don't think anyone here has looked at that code.

I have heard reports from the field that kafka-node has some very serious bugs, such as not retrying failed requests at all in some circumstances.  I do not recommend using this client.

https://github.com/Blizzard/node-rdkafka is a better choice.  Keep in mind that node-rdkafka is still not part of the Apache Kafka project, so you might want to discuss any issues with that particular project.
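
For reference, a minimal node-rdkafka producer might look like the sketch
below. The broker list, topic name, and settings are placeholders, and the
delivery-report handling is only one way to surface failed sends; check the
node-rdkafka documentation for the options that match your setup:

  const Kafka = require('node-rdkafka');

  // Placeholder broker list; use your own cluster addresses.
  const producer = new Kafka.Producer({
    'metadata.broker.list': 'localhost:9092,localhost:9093,localhost:9094',
    'dr_cb': true               // ask librdkafka for delivery reports
  });

  producer.connect();
  producer.setPollInterval(100); // poll regularly so delivery reports are emitted

  producer.on('ready', function () {
    try {
      // produce(topic, partition (null lets librdkafka pick), message, key, timestamp)
      producer.produce('my-topic', null, Buffer.from('payload'), null, Date.now());
    } catch (err) {
      console.error('produce failed', err);
    }
  });

  producer.on('delivery-report', function (err, report) {
    // err is set when the message could not be delivered after librdkafka's retries
    console.log('delivery report', err, report);
  });

  producer.on('event.error', function (err) {
    console.error('client error', err);
  });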

best,
Colin

