Posted to users@kafka.apache.org by jingguo yao <ya...@gmail.com> on 2018/07/18 14:45:07 UTC

The asynchronous sending of a message returns no error if the Kafka server is not started

The asynchronous sending of a message returns no error even if the
Kafka server is not started.

For all the following tests, the local Kafka server is stopped. First,
consider this piece of code:

public static void main(String[] args) throws Exception {
  Properties config = new Properties();
  config.put("client.id", InetAddress.getLocalHost().getHostName());
  config.put("bootstrap.servers", "localhost:9092");
  config.put("acks", "all");
  config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

  try (Producer<String, String> producer = new KafkaProducer<>(config)) {
    ProducerRecord<String, String> record =
        new ProducerRecord<>("test-topic", null, "a-little-message");
    // The callback receives the failure, if any, once the send completes.
    producer.send(record, new Callback() {
      @Override
      public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
          System.out.println("Exception occurred!");
          exception.printStackTrace(System.out);
        }
      }
    });
  }
}

Running it will produce the following error:

Exception occurred!
org.apache.kafka.common.errors.TimeoutException: Failed to update
metadata after 60000 ms.

Second, consider this piece of code:

public static void main(String[] args) throws Exception {
  Properties config = new Properties();
  config.put("client.id", InetAddress.getLocalHost().getHostName());
  config.put("bootstrap.servers", "localhost:9092");
  config.put("acks", "all");
  config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

  try (Producer<String, String> producer = new KafkaProducer<>(config)) {
    ProducerRecord<String, String> record =
        new ProducerRecord<>("test-topic", null, "a-little-message");
    System.out.println("Sending a message...");
    // get() blocks until the send completes and rethrows any failure.
    producer.send(record).get();
    System.out.println("Message sent");
  }
}

Running it will produce the following error:

Sending a message...
Exception in thread "main" java.util.concurrent.ExecutionException:
org.apache.kafka.common.errors.TimeoutException: Failed to update
metadata after 60000 ms.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1168)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:859)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:797)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:684)
at com.xdf.foreign.KafkaTest.main(KafkaTest.java:46)
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to
update metadata after 60000 ms.

Third, consider this piece of code:

public static void main(String[] args) throws Exception {
  Properties config = new Properties();
  config.put("client.id", InetAddress.getLocalHost().getHostName());
  config.put("bootstrap.servers", "localhost:9092");
  config.put("acks", "all");
  config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

  try (Producer<String, String> producer = new KafkaProducer<>(config)) {
    ProducerRecord<String, String> record =
        new ProducerRecord<>("test-topic", null, "a-little-message");
    System.out.println("Sending a message...");
    // The returned future is discarded, so no failure is ever observed.
    producer.send(record);
    System.out.println("Message sent");
  }
}

Running it will produce no error. The following output will be
produced:

Sending a message...
Message sent

I know that, because sending is asynchronous, the send method itself
cannot report a connection error to the Kafka server. But I think it
would be better to document this kind of behaviour somewhere.
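
The difference between the second and third tests can be reproduced
without a broker. The sketch below is only an illustration under stated
assumptions: fakeSend is a hypothetical stand-in for producer.send(),
not Kafka code, and it returns an already-failed future instead of
throwing. That matches the stack trace above, where the
TimeoutException only surfaces as an ExecutionException once get() is
called.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;

public class IgnoredFutureDemo {
  // Hypothetical stand-in for producer.send(): it never throws, it only
  // returns a future that has already failed.
  public static Future<String> fakeSend(String value) {
    CompletableFuture<String> future = new CompletableFuture<>();
    future.completeExceptionally(new TimeoutException("Failed to update metadata"));
    return future;
  }

  // Like the third test: the future is discarded, so the failure is
  // never observed and the caller reports success.
  public static String fireAndForget(String value) {
    fakeSend(value);
    return "Message sent";
  }

  // Like the second test: get() rethrows the failure wrapped in an
  // ExecutionException, which is unwrapped here with getCause().
  public static String sendAndWait(String value) {
    try {
      fakeSend(value).get();
      return "Message sent";
    } catch (ExecutionException e) {
      return "Failed: " + e.getCause().getMessage();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "Interrupted";
    }
  }

  public static void main(String[] args) {
    System.out.println(fireAndForget("a-little-message"));
    System.out.println(sendAndWait("a-little-message"));
  }
}
```

Only the caller that inspects the future learns about the failure; the
fire-and-forget caller has no way to tell the two outcomes apart.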

-- 
Jingguo

Re: The asynchronous sending of a message returns no error if the Kafka server is not started

Posted by Hans Jespersen <ha...@confluent.io>.
That is expected behavior. Typically there are multiple Kafka brokers, so if one is down the client retries the send against the newly elected leader.

A send should not be considered successful until the client receives an ACK from the Kafka cluster.

By default the ACK is asynchronous for performance, but send() returns a future, so you can easily make it behave like a synchronous publish. Examples are in the Javadoc.
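
One way to apply the "not successful until ACKed" rule is to wait on the returned future with a bound. The helper below is a minimal sketch, not part of the Kafka API: acknowledged and the AckWaiter class are hypothetical names, and the futures in main are plain CompletableFuture stand-ins rather than real producer results. Since it accepts any java.util.concurrent.Future, the same idea should carry over to the future returned by send().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AckWaiter {
  // Returns true only if the future completed normally within the timeout.
  public static boolean acknowledged(Future<?> pending, long timeoutMs) {
    try {
      pending.get(timeoutMs, TimeUnit.MILLISECONDS);
      return true;
    } catch (ExecutionException | TimeoutException e) {
      // The send failed, or no acknowledgement arrived in time.
      return false;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    // Stand-ins for the three possible states of a pending send.
    CompletableFuture<String> acked = CompletableFuture.completedFuture("ack");
    CompletableFuture<String> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IllegalStateException("broker unreachable"));
    CompletableFuture<String> neverCompletes = new CompletableFuture<>();

    System.out.println(acknowledged(acked, 100));
    System.out.println(acknowledged(failed, 100));
    System.out.println(acknowledged(neverCompletes, 100));
  }
}
```

Blocking on every send trades throughput for certainty; registering a callback, as in the first test of the original message, keeps the send asynchronous while still observing failures.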

-hans 
