Posted to dev@kafka.apache.org by "Igor Khomenko (JIRA)" <ji...@apache.org> on 2015/07/29 13:53:04 UTC

[jira] [Commented] (KAFKA-1913) App hungs when calls producer.send to wrong IP of Kafka broker

    [ https://issues.apache.org/jira/browse/KAFKA-1913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14645918#comment-14645918 ] 

Igor Khomenko commented on KAFKA-1913:
--------------------------------------

For now, I have to use the following utility method to check that the broker address is reachable:

{code}
    // Assumes the enclosing class has a java.util.logging.Logger field named "log" and imports
    // java.io.IOException, java.net.InetSocketAddress, java.net.Socket and java.util.logging.Level.
    public static boolean isReachable(String hostname, int port, int timeout) {
        if (hostname == null) {
            return false;
        }
        if (log.isLoggable(Level.FINE)) {
            log.log(Level.FINE, "Checking host: " + hostname + ", port: " + port + ", timeout: " + timeout);
        }

        InetSocketAddress sockaddr = new InetSocketAddress(hostname, port);
        Socket socket = new Socket();
        boolean online = true;

        try {
            // Throws an IOException if no TCP connection is established within "timeout" milliseconds.
            socket.connect(sockaddr, timeout);
            if (log.isLoggable(Level.FINE)) {
                log.log(Level.FINE, "Host '" + hostname + "' is ON");
            }
        } catch (IOException e) {
            online = false;
            if (log.isLoggable(Level.FINE)) {
                log.log(Level.FINE, "Host '" + hostname + "' is not reachable");
            }
        } finally {
            try {
                socket.close();
            } catch (IOException ignored) {
                // Nothing useful to do if closing the probe socket fails.
            }
        }

        return online;
    }
{code} 
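
A minimal, self-contained sketch of how such a check might be applied to a whole broker list before calling producer.send. The class name BrokerPreflight and the method firstReachableBroker are hypothetical names used only for illustration; they are not part of Kafka. The sketch assumes the broker list is a comma-separated string of host:port entries, as in metadata.broker.list, and it only verifies that a TCP connection can be opened, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Kafka: probes brokers over plain TCP.
final class BrokerPreflight {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean isReachable(String host, int port, int timeoutMs) {
        if (host == null) {
            return false;
        }
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, unresolved hosts and connect timeouts.
            return false;
        }
    }

    /**
     * Returns the first "host:port" entry of a comma-separated broker list
     * that accepts a TCP connection, or null if none does.
     * Malformed entries are skipped (an assumption made for this sketch).
     */
    public static String firstReachableBroker(String brokerList, int timeoutMs) {
        for (String entry : brokerList.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0) {
                continue; // no host or no port separator
            }
            int port;
            try {
                port = Integer.parseInt(trimmed.substring(colon + 1));
            } catch (NumberFormatException e) {
                continue; // port is not a number
            }
            if (port < 1 || port > 65535) {
                continue; // port out of range
            }
            if (isReachable(trimmed.substring(0, colon), port, timeoutMs)) {
                return trimmed;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String brokers = args.length > 0 ? args[0] : "192.168.9.3:9092";
        String reachable = firstReachableBroker(brokers, 2000);
        if (reachable == null) {
            System.out.println("No broker reachable in '" + brokers + "', skipping send");
        } else {
            System.out.println("Broker reachable: " + reachable);
        }
    }
}
```

With a check like this, the application can fail fast with its own error instead of blocking inside send when the configured address is wrong.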

> App hungs when calls producer.send to wrong IP of Kafka broker
> --------------------------------------------------------------
>
>                 Key: KAFKA-1913
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1913
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.8.1.1
>         Environment: OS X 10.10.1, Java 7, AWS Linux
>            Reporter: Igor Khomenko
>            Assignee: Jun Rao
>             Fix For: 0.8.3
>
>
> I have the following test code to check the Kafka functionality:
> {code}
> package com.company;
> import kafka.common.FailedToSendMessageException;
> import kafka.javaapi.producer.Producer;
> import kafka.producer.KeyedMessage;
> import kafka.producer.ProducerConfig;
> import java.util.Date;
> import java.util.Properties;
> public class Main {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("metadata.broker.list", "192.168.9.3:9092");
>         props.put("serializer.class", "com.company.KafkaMessageSerializer");
>         props.put("request.required.acks", "1");
>         ProducerConfig config = new ProducerConfig(props);
>         // The first is the type of the Partition key, the second the type of the message.
>         Producer<String, String> messagesProducer = new Producer<String, String>(config);
>         // Send
>         String topicName = "my_messages";
>         String message = "hello world";
>         KeyedMessage<String, String> data = new KeyedMessage<String, String>(topicName, message);
>         try {
>             System.out.println(new Date() + ": sending...");
>             messagesProducer.send(data);
>             System.out.println(new Date() +  ": sent");
>         }catch (FailedToSendMessageException e){
>             System.out.println("e: " + e);
>             e.printStackTrace();
>         }catch (Exception exc){
>             System.out.println("e: " + exc);
>             exc.printStackTrace();
>         }
>     }
> }
> {code}
> {code}
> package com.company;
> import kafka.serializer.Encoder;
> import kafka.utils.VerifiableProperties;
> /**
>  * Created by igorkhomenko on 2/2/15.
>  */
> public class KafkaMessageSerializer implements Encoder<String> {
>     public KafkaMessageSerializer(VerifiableProperties verifiableProperties) {
>         /* This constructor must be present for successful compile. */
>     }
>     @Override
>     public byte[] toBytes(String entity) {
>         byte [] serializedMessage = doCustomSerialization(entity);
>         return serializedMessage;
>     }
>     private byte[] doCustomSerialization(String entity) {
>         return entity.getBytes();
>     }
> }
> {code}
> A GitHub version is also available: https://github.com/soulfly/Kafka-java-producer
> It just hangs on the following line:
> {code}
> messagesProducer.send(data)
> {code}
> When I changed the broker list to
> {code}
> props.put("metadata.broker.list", "localhost:9092");
> {code}
> then I got an exception:
> {code}
> kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
> {code}
> so that case behaves as expected.
> Why does it hang with the wrong broker list? Any ideas?


