You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Ismael Juma (JIRA)" <ji...@apache.org> on 2017/10/28 15:41:00 UTC

[jira] [Resolved] (KAFKA-4832) kafka producer send Async message to the wrong IP cannot to stop producer.close()

     [ https://issues.apache.org/jira/browse/KAFKA-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma resolved KAFKA-4832.
--------------------------------
    Resolution: Auto Closed

The Scala producers have been deprecated for a while and no further work is planned. Please upgrade to the Java producer whenever possible.

> kafka producer send Async message to the wrong IP cannot to stop producer.close()
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-4832
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4832
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.8.2.2
>         Environment: JDK8 Eclipse Mars Win7
>            Reporter: Wang Hong
>
> 1. I tried to send messages asynchronously to a wrong IP address, in a loop of 1400 messages per batch, for 10 batches.
> 2. I use the javaapi Kafka producer, wrapped in a factory class.
> 3. For each of the 10 batches I open the producer (Producer.Connected()) and close it (Producer.Closed()).
> 4. I know the messages end up being sent to a wrong IP, but I noticed that the terminal was blocking: the producer cannot close normally.
> The function looks like this:
> 	public static void go(int s) throws Exception { // sends one batch of 1400 messages; s is only used in the message text and the final print
>                 KafkaService kf = new KafkaServiceImpl();// one service per batch; the KafkaServiceImpl constructor opens the producer
> 		for (int i = 0; i < 1400; i++) {
> 			String msg = "a b 0a b c d e 0a b c d e 0" + s + "--" + i;
> 			System.out.println(msg);
> 			kf.push(msg); // forwards to producer.send()
> 		}
>                 kf.closeProducerFactory();// forwards to producer.close(); the report says the program blocks here when the broker IP is wrong
> 		System.out.println(s);
> 		Thread.sleep(1000);
> 	}
> kf.closeProducerFactory() calls producer.close(),
> but the async send kept waiting for the Kafka server, because I gave it a wrong IP.
> I think that waiting this long will cause problems for the whole system, because it occupies resources.
> Another problem: when I send Kafka messages to the correct IP using Runnable tasks and thread pools, with everything else set up correctly and using the same loop as in the example above,
> I get an error that mentions waiting for 3 tries.
> I also configured
> advertised.host.name=xxx.xxx.xxx.xxx
> advertised.port=9092
> Now I think it may not be able to handle that much concurrent volume at one time.
> Our system runs at over 1000 TPS.
> Thank you.
> Source code excerpt:
> package kafka.baseconfig;
> import java.util.Properties;
> import com.travelsky.util.ConFile;
> import kafka.javaapi.producer.Producer;
> import kafka.producer.KeyedMessage;
> import kafka.producer.ProducerConfig;
> /**
>  * Kafka producer factory.
>  * 
>  * 1. Replaces direct use of Producer. // Not suited to multi-threaded efficiency.
>  * 2. Usage in three steps:
>  * ProducerFactory fac = new ProducerFactory();
>  * fac.openProducer(); -> initialize the object
>  * fac.push(msg); -> send the message body
>  * fac.closeProducer(); -> close the object
>  * @author Wang Hong
>  *
>  */
> public class ProducerFactory {
> 	protected Producer<String, String> producer = null;
> 	protected ConFile conf = null;
> 	private Properties props = new Properties();
> 	private String topic = null;
> 	{ // instance initializer: loads the config file and the topic name for every new factory
> 		try {
> 			conf = new ConFile("KafkaProperties.conf");
> 			topic = conf.getString("topic");
> 			if (conf == null) { // NOTE(review): unreachable - conf was just assigned from `new` and already dereferenced on the line above
> 				throw new Exception("kafka配置文件有问题");
> 			}
> 		} catch (Exception e) {
> 			e.printStackTrace(); // the failure is only printed; conf and/or topic may remain null afterwards
> 		}
> 	}
> 	
> 	/**
> 	 * Sends a message to the topic loaded from the config file.
> 	 * @param msg the message payload
> 	 * @throws RuntimeException if the producer field is still null
> 	 */
> 	public void push(String msg) {
> 		if (producer == null) {
> 			throw new RuntimeException("producer实例为空");
> 		}
> 		KeyedMessage<String, String> messageForSend = new KeyedMessage<String, String>(topic, msg);
> 		producer.send(messageForSend);
> 	}
> 	
> 	/**
> 	 * Opens the producer: fills the properties and creates the Producer instance.
> 	 */
> 	public void openProducer() {
> 		props.put("serializer.class", "kafka.serializer.StringEncoder");
> 		props.put("metadata.broker.list", conf.getString("kafkaserverurl"));
> 		// Asynchronous sending (the actual producer type is read from config key "synctype")
> 		props.put("producer.type", conf.getString("synctype"));
> 		// Number of messages sent per batch
> 		props.put("batch.num.messages", conf.getString("batchmsgnums"));
> 		
> 		//
> 		props.put("request.required.acks", "1");
> 		//
> 		props.put("queue.enqueue.timeout.ms", "1");
> 		//
> 		props.put("request.timeout.ms", "1");
> 		//
> 		props.put("timeout.ms", "1");
> 		//
> 		props.put("reconnect.backoff.ms", "1");
> 		//
> 		props.put("retry.backoff.ms", "1");
> 		//
> 		props.put("message.send.max.retries", "1");
> 		// NOTE(review): duplicate of the retry.backoff.ms put above (same key, same value)
> 		props.put("retry.backoff.ms", "1");
> 		// NOTE(review): linger.ms and the keys below look like settings of the newer Java producer - confirm this ProducerConfig uses them
> 		props.put("linger.ms", "1");
> 		//
> 		props.put("max.block.ms", "1");
> 		//
> 		props.put("metadata.fetch.timeout.ms", "1");
> 		//
> 		props.put("metadata.max.age.ms", "1");
> 		// NOTE(review): the key below ends with a trailing space inside the string literal
> 		props.put("metrics.sample.window.ms ", "1");
> 		producer = new Producer<String, String>(new ProducerConfig(props));
> 		if (producer == null) { // NOTE(review): unreachable - `new` never yields null
> 			throw new RuntimeException("kafka producer 打开失败");
> 		}
> 	}
> 	/**
> 	 * Closes the producer if one has been created.
> 	 */
> 	public void closeProducer() {
> 		if (producer != null) {
> 			producer.close();
> 		}
> 	}
> 	/**
> 	 * Checks whether the producer has been opened.
> 	 * @return true if the producer field is non-null
> 	 */
> 	public boolean isOpenProduer() {
> 		return producer != null;
> 	}
> }
> package kafka.service.impl;
> import kafka.baseconfig.ProducerFactory;
> import kafka.service.KafkaService;
> public class KafkaServiceImpl implements KafkaService { // thin wrapper that delegates every call to a ProducerFactory
> 	private ProducerFactory factory = null;
> 	
> 	public KafkaServiceImpl() {
> 		factory = new ProducerFactory();
> 		factory.openProducer(); // the producer is opened as soon as the service is constructed
> 	}
> 	
> 	/**
> 	 * Pushes data into Kafka, with the option of changing the topic.
> 	 * NOTE(review): this overload has no topic parameter; the text above and the
> 	 * topic tag below do not match the signature.
> 	 * @param msg the data
> 	 * @param topic the topic to send to
> 	 * 
> 	 * @deprecated This method is obsolete; its use is discouraged.
> 	 */
> 	@Override
> 	@Deprecated
> 	public void push(String msg) throws Exception {
> 		//new Producer(msg).start();
> 		if (factory.isOpenProduer()) {
> 			factory.push(msg);
> 		}else {
> 			throw new RuntimeException("factory沒有初始化");
> 		}
> 	}
> 	
> 	/**
> 	 * Obsolete method.
> 	 * NOTE(review): the topic argument is never used; the message is passed to
> 	 * factory.push(msg) exactly as in the one-argument overload.
> 	 * 
> 	 * @param msg
> 	 * @param topic
> 	 * @throws Exception
> 	 */
> 	@Override
> 	public void push(String msg, String topic) throws Exception {
> 		//new Producer(msg, topic).start();
> 		if (factory.isOpenProduer()) {
> 			factory.push(msg);
> 		}else {
> 			throw new RuntimeException("factory沒有初始化");
> 		}
> 	}
> 	
> 	/**
> 	 * Releases resources by closing the factory's producer if it is open.
> 	 */
> 	@Override
> 	public void closeProducerFactory()throws Exception{
> 		if (factory.isOpenProduer()) {
> 			factory.closeProducer();
> 		}
> 	}
> }
> public static void main(String[] args) throws Exception { // driver: runs 10 batches and prints the total elapsed milliseconds
> 		long l = System.currentTimeMillis(); // start timestamp
> 		for (int i = 0; i < 10; i++) {
> 			go(i); // i is the batch index embedded in each message
> 		}
> 		System.out.println(System.currentTimeMillis() - l);
> }
> 	public static void go(int s) throws Exception { // variant of the batch routine: s is only used in the message text and the final print
> 		for (int i = 0; i < 1400; i++) {
> 			KafkaService kf = new KafkaServiceImpl(); // here a new service (and producer) is created for every single message
> 			String msg = "a b 0a b c d e 0a b c d e 0" + s + "--" + i;
> 			System.out.println(msg);
> 			kf.push(msg);
> 			kf.closeProducerFactory(); // and closed again right after the one send
> 		}
> 		System.out.println(s);
> 		Thread.sleep(1000);
> 	}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)