You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Prasad Dls <pr...@gmail.com> on 2016/12/14 05:24:15 UTC

Consumer/Publisher code is not throwing any exception if my Kafka broker is not running

Hi,

I am new to Kafka, could you please let me know why my application is not
getting any exception like *refuse to connect so and so host, *if the
broker is down/unable to connect

*Here is my producer code*

private static Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<String, Object>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "ProducerTest_client");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"org.apache.kafka.clients.producer.internals.DefaultPartitioner");
return props;
}

Producer<String, String> producer = new KafkaProducer<>(producerConfigs());
producer.send(new ProducerRecord<String, String>("TEST.TOPIC", "TESTKEY"
"TEST VALUE"))));

*Here is my consumer code*

private Properties configProperties() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, <>);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, <>);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, <>);
props.put(ConsumerConfig.GROUP_ID_CONFIG, <>);
props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, <>);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, <>);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,<>);
return props;
}


*KafkaConsumer<String, String> consumer = new KafkaConsumer<String,
String>(props);*

*consumer.subscribe(Arrays.asList(getTopics().split(",")));*

*while (true) {*

*ConsumerRecords<String, String> records = consumer.poll(100); *
*}*

I need to write some logic in exception block, please suggest with solution

Thanks in Advance
Prasad