You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Igor Khomenko (JIRA)" <ji...@apache.org> on 2015/02/04 09:51:34 UTC
[jira] [Updated] (KAFKA-1913) App hungs when calls producer.send to
wrong IP of Kafka broker
[ https://issues.apache.org/jira/browse/KAFKA-1913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Igor Khomenko updated KAFKA-1913:
---------------------------------
Environment: OS X 10.10.1, Java 7, AWS Linux (was: OS X 10.10.1, Java 7)
> App hungs when calls producer.send to wrong IP of Kafka broker
> --------------------------------------------------------------
>
> Key: KAFKA-1913
> URL: https://issues.apache.org/jira/browse/KAFKA-1913
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 0.8.1.1
> Environment: OS X 10.10.1, Java 7, AWS Linux
> Reporter: Igor Khomenko
> Assignee: Jun Rao
> Fix For: 0.8.1.2
>
>
> I have next test code to check the Kafka functionality:
> {code}
> package com.company;
> import kafka.common.FailedToSendMessageException;
> import kafka.javaapi.producer.Producer;
> import kafka.producer.KeyedMessage;
> import kafka.producer.ProducerConfig;
> import java.util.Date;
> import java.util.Properties;
> public class Main {
> public static void main(String[] args) {
> Properties props = new Properties();
> props.put("metadata.broker.list", "192.168.9.3:9092");
> props.put("serializer.class", "com.company.KafkaMessageSerializer");
> props.put("request.required.acks", "1");
> ProducerConfig config = new ProducerConfig(props);
> // The first is the type of the Partition key, the second the type of the message.
> Producer<String, String> messagesProducer = new Producer<String, String>(config);
> // Send
> String topicName = "my_messages";
> String message = "hello world";
> KeyedMessage<String, String> data = new KeyedMessage<String, String>(topicName, message);
> try {
> System.out.println(new Date() + ": sending...");
> messagesProducer.send(data);
> System.out.println(new Date() + ": sent");
> }catch (FailedToSendMessageException e){
> System.out.println("e: " + e);
> e.printStackTrace();
> }catch (Exception exc){
> System.out.println("e: " + exc);
> exc.printStackTrace();
> }
> }
> }
> {code}
> {code}
> package com.company;
> import kafka.serializer.Encoder;
> import kafka.utils.VerifiableProperties;
> /**
> * Created by igorkhomenko on 2/2/15.
> */
> public class KafkaMessageSerializer implements Encoder<String> {
> public KafkaMessageSerializer(VerifiableProperties verifiableProperties) {
> /* This constructor must be present for successful compile. */
> }
> @Override
> public byte[] toBytes(String entity) {
> byte [] serializedMessage = doCustomSerialization(entity);
> return serializedMessage;
> }
> private byte[] doCustomSerialization(String entity) {
> return entity.getBytes();
> }
> }
> {code}
> Here is also GitHub version https://github.com/soulfly/Kafka-java-producer
> So it just hungs on next line:
> {code}
> messagesProducer.send(data)
> {code}
> When I replaced the brokerlist to
> {code}
> props.put("metadata.broker.list", "localhost:9092");
> {code}
> then I got an exception:
> {code}
> kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
> {code}
> so it's okay
> Why it hungs with wrong brokerlist? Any ideas?
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)