You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Ismael Juma (JIRA)" <ji...@apache.org> on 2017/10/28 15:41:00 UTC
[jira] [Resolved] (KAFKA-4832) kafka producer send Async message to
the wrong IP cannot to stop producer.close()
[ https://issues.apache.org/jira/browse/KAFKA-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismael Juma resolved KAFKA-4832.
--------------------------------
Resolution: Auto Closed
The Scala producers have been deprecated for a while and no further work is planned. Please upgrade to the Java producer whenever possible.
> kafka producer send Async message to the wrong IP cannot to stop producer.close()
> ---------------------------------------------------------------------------------
>
> Key: KAFKA-4832
> URL: https://issues.apache.org/jira/browse/KAFKA-4832
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 0.8.2.2
> Environment: JDK8 Eclipse Mars Win7
> Reporter: Wang Hong
>
> 1. I tried to send messages asynchronously to a wrong IP, in a loop of 1400 messages per batch, for 10 batches.
> 2. I use the javaapi Kafka producer, wrapped in a factory class.
> 3. In each of the 10 batches I call Producer.Connected() and Producer.Closed() once.
> 4. I know I was ultimately sending the messages to a wrong IP, but I noticed that the terminal was blocked. It cannot close normally.
> The function looks like this:
> public static void go(int s) throws Exception {
> KafkaService kf = new KafkaServiceImpl();//init properties
> for (int i = 0; i < 1400; i++) {
> String msg = "a b 0a b c d e 0a b c d e 0" + s + "--" + i;
> System.out.println(msg);
> kf.push(msg); //producer.send()
> }
> kf.closeProducerFactory();//producer.closed()
> System.out.println(s);
> Thread.sleep(1000);
> }
> kf.closeProducerFactory() calls producer.close(),
> but the async send keeps waiting for the Kafka server, because I gave it a wrong IP.
> I think waiting for such a long time will cause problems for the whole system, since it occupies resources.
> Another problem: when I send Kafka messages to the correct IP using Runnable tasks and thread pools, with everything set up correctly, and also use the example above (↑) in a loop,
> I get an error that says it waited for 3 tries.
> I also configured
> advertised.host.name=xxx.xxx.xxx.xxx
> advertised.port=9092
> Now I think it may not be able to handle that much concurrent volume at one time.
> Our system runs at over 1000 TPS.
> Thank you.
> Source code:
> package kafka.baseconfig;
> import java.util.Properties;
> import com.travelsky.util.ConFile;
> import kafka.javaapi.producer.Producer;
> import kafka.producer.KeyedMessage;
> import kafka.producer.ProducerConfig;
> /**
> * kafka工厂模式
> *
> * 1.替代Producer方法.//多线程效率不适合.
> * 2.使用三部:
> * ProducerFactory fac = new ProducerFactory();
> * fac.openProducer(); ->初始化对象
> * fac.push(msg); ->发消息主体
> * fac.closeProducer(); ->关闭对象
> * @author 王宏
> *
> */
> public class ProducerFactory {
> protected Producer<String, String> producer = null;
> protected ConFile conf = null;
> private Properties props = new Properties();
> private String topic = null;
> {
> try {
> conf = new ConFile("KafkaProperties.conf");
> topic = conf.getString("topic");
> if (conf == null) {
> throw new Exception("kafka配置文件有问题");
> }
> } catch (Exception e) {
> e.printStackTrace();
> }
> }
>
> /**
> * 发送消息方法
> * @param msg
> */
> public void push(String msg) {
> if (producer == null) {
> throw new RuntimeException("producer实例为空");
> }
> KeyedMessage<String, String> messageForSend = new KeyedMessage<String, String>(topic, msg);
> producer.send(messageForSend);
> }
>
> /**
> * 打开生产者
> */
> public void openProducer() {
> props.put("serializer.class", "kafka.serializer.StringEncoder");
> props.put("metadata.broker.list", conf.getString("kafkaserverurl"));
> // 异步发送
> props.put("producer.type", conf.getString("synctype"));
> // 每次发送多少条
> props.put("batch.num.messages", conf.getString("batchmsgnums"));
>
> //
> props.put("request.required.acks", "1");
> //
> props.put("queue.enqueue.timeout.ms", "1");
> //
> props.put("request.timeout.ms", "1");
> //
> props.put("timeout.ms", "1");
> //
> props.put("reconnect.backoff.ms", "1");
> //
> props.put("retry.backoff.ms", "1");
> //
> props.put("message.send.max.retries", "1");
> //
> props.put("retry.backoff.ms", "1");
> //
> props.put("linger.ms", "1");
> //
> props.put("max.block.ms", "1");
> //
> props.put("metadata.fetch.timeout.ms", "1");
> //
> props.put("metadata.max.age.ms", "1");
> //
> props.put("metrics.sample.window.ms ", "1");
> producer = new Producer<String, String>(new ProducerConfig(props));
> if (producer == null) {
> throw new RuntimeException("kafka producer 打开失败");
> }
> }
> /**
> * 关闭生产对象
> */
> public void closeProducer() {
> if (producer != null) {
> producer.close();
> }
> }
> /**
> * 判断producer是否开启
> * @return
> */
> public boolean isOpenProduer() {
> return producer != null;
> }
> }
> package kafka.service.impl;
> import kafka.baseconfig.ProducerFactory;
> import kafka.service.KafkaService;
> /**
>  * KafkaService implementation that delegates to a ProducerFactory.
>  * The constructor creates the factory and opens its producer.
>  */
> public class KafkaServiceImpl implements KafkaService {
>     private ProducerFactory factory;
>
>     public KafkaServiceImpl() {
>         factory = new ProducerFactory();
>         factory.openProducer();
>     }
>
>     /**
>      * Pushes a message into Kafka through the factory.
>      *
>      * @param msg message body
>      * @throws RuntimeException if the factory reports that no producer is open
>      * @deprecated this method is outdated; its use is not recommended.
>      */
>     @Override
>     @Deprecated
>     public void push(String msg) throws Exception {
>         sendThroughFactory(msg);
>     }
>
>     /**
>      * Pushes a message into Kafka through the factory.
>      * The topic argument is not forwarded: only msg is passed to factory.push.
>      *
>      * @param msg message body
>      * @param topic requested topic (currently unused by this implementation)
>      * @throws Exception declared by the interface; a RuntimeException is thrown
>      *         if the factory reports that no producer is open
>      */
>     @Override
>     public void push(String msg, String topic) throws Exception {
>         sendThroughFactory(msg);
>     }
>
>     /**
>      * Releases resources by closing the factory's producer, if it is open.
>      */
>     @Override
>     public void closeProducerFactory() throws Exception {
>         if (!factory.isOpenProduer()) {
>             return;
>         }
>         factory.closeProducer();
>     }
>
>     // Shared body of both push variants: refuse when no producer is open, otherwise send.
>     private void sendThroughFactory(String msg) {
>         if (!factory.isOpenProduer()) {
>             throw new RuntimeException("factory沒有初始化");
>         }
>         factory.push(msg);
>     }
> }
> /**
>  * Runs ten batches (go(0) .. go(9)) and prints the total elapsed time in milliseconds.
>  */
> public static void main(String[] args) throws Exception {
>     final long startedAt = System.currentTimeMillis();
>     int batch = 0;
>     while (batch < 10) {
>         go(batch);
>         batch++;
>     }
>     long elapsedMillis = System.currentTimeMillis() - startedAt;
>     System.out.println(elapsedMillis);
> }
> public static void go(int s) throws Exception {
> for (int i = 0; i < 1400; i++) {
> KafkaService kf = new KafkaServiceImpl();
> String msg = "a b 0a b c d e 0a b c d e 0" + s + "--" + i;
> System.out.println(msg);
> kf.push(msg);
> kf.closeProducerFactory();
> }
> System.out.println(s);
> Thread.sleep(1000);
> }
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)