You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Igor Khomenko (JIRA)" <ji...@apache.org> on 2015/07/29 13:53:04 UTC
[jira] [Commented] (KAFKA-1913) App hungs when calls producer.send
to wrong IP of Kafka broker
[ https://issues.apache.org/jira/browse/KAFKA-1913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14645918#comment-14645918 ]
Igor Khomenko commented on KAFKA-1913:
--------------------------------------
For now I have to use the following utils to check the url
{code}
public static boolean isReachable(String hostname, int port, int timeout) {
if(hostname == null) {
return false;
} else {
if(log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "Checking host: " + hostname + ", port: " + port + ", timeout: " + timeout);
}
InetSocketAddress sockaddr = new InetSocketAddress(hostname, port);
Socket socket = new Socket();
boolean online = true;
try {
socket.connect(sockaddr, timeout);
if(log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "Host \'" + hostname + "\' is ON");
}
} catch (IOException var15) {
online = false;
if(log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "Host \'" + hostname + "\' is not reachable");
}
} finally {
try {
socket.close();
} catch (IOException var14) {
;
}
}
return online;
}
}
{code}
> App hungs when calls producer.send to wrong IP of Kafka broker
> --------------------------------------------------------------
>
> Key: KAFKA-1913
> URL: https://issues.apache.org/jira/browse/KAFKA-1913
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 0.8.1.1
> Environment: OS X 10.10.1, Java 7, AWS Linux
> Reporter: Igor Khomenko
> Assignee: Jun Rao
> Fix For: 0.8.3
>
>
> I have next test code to check the Kafka functionality:
> {code}
> package com.company;
> import kafka.common.FailedToSendMessageException;
> import kafka.javaapi.producer.Producer;
> import kafka.producer.KeyedMessage;
> import kafka.producer.ProducerConfig;
> import java.util.Date;
> import java.util.Properties;
> public class Main {
> public static void main(String[] args) {
> Properties props = new Properties();
> props.put("metadata.broker.list", "192.168.9.3:9092");
> props.put("serializer.class", "com.company.KafkaMessageSerializer");
> props.put("request.required.acks", "1");
> ProducerConfig config = new ProducerConfig(props);
> // The first is the type of the Partition key, the second the type of the message.
> Producer<String, String> messagesProducer = new Producer<String, String>(config);
> // Send
> String topicName = "my_messages";
> String message = "hello world";
> KeyedMessage<String, String> data = new KeyedMessage<String, String>(topicName, message);
> try {
> System.out.println(new Date() + ": sending...");
> messagesProducer.send(data);
> System.out.println(new Date() + ": sent");
> }catch (FailedToSendMessageException e){
> System.out.println("e: " + e);
> e.printStackTrace();
> }catch (Exception exc){
> System.out.println("e: " + exc);
> exc.printStackTrace();
> }
> }
> }
> {code}
> {code}
> package com.company;
> import kafka.serializer.Encoder;
> import kafka.utils.VerifiableProperties;
> /**
> * Created by igorkhomenko on 2/2/15.
> */
> public class KafkaMessageSerializer implements Encoder<String> {
> public KafkaMessageSerializer(VerifiableProperties verifiableProperties) {
> /* This constructor must be present for successful compile. */
> }
> @Override
> public byte[] toBytes(String entity) {
> byte [] serializedMessage = doCustomSerialization(entity);
> return serializedMessage;
> }
> private byte[] doCustomSerialization(String entity) {
> return entity.getBytes();
> }
> }
> {code}
> Here is also GitHub version https://github.com/soulfly/Kafka-java-producer
> So it just hungs on next line:
> {code}
> messagesProducer.send(data)
> {code}
> When I replaced the brokerlist to
> {code}
> props.put("metadata.broker.list", "localhost:9092");
> {code}
> then I got an exception:
> {code}
> kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
> {code}
> so it's okay
> Why it hungs with wrong brokerlist? Any ideas?
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)