Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/08/13 07:58:48 UTC

[GitHub] shenxinquan opened a new issue #407: Bug when calling shutdown() on TransactionMQProducer

shenxinquan opened a new issue #407: Bug when calling shutdown() on TransactionMQProducer
URL: https://github.com/apache/rocketmq/issues/407
 
 
   Code snippet:
    ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
    producer.setExecutorService(executorService);
   
   Description: when a custom thread pool is set on the producer, calling the producer's shutdown() method throws the following error:
   Exception in thread "main" java.lang.NullPointerException
   	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.destroyTransactionEnv(DefaultMQProducerImpl.java:135)
   	at org.apache.rocketmq.client.producer.TransactionMQProducer.shutdown(TransactionMQProducer.java:50)
   	at com.mmall.concurrency.rocketmq.TransactionProducer.main(TransactionProducer.java:89)
   
   The failure is in this method of the DefaultMQProducerImpl class:
       public void destroyTransactionEnv() {
           this.checkExecutor.shutdown();
           this.checkRequestQueue.clear();
       }
    The call this.checkRequestQueue.clear(); inside it throws the exception.
   
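   Analysis: the root cause is in the initTransactionEnv() method of DefaultMQProducerImpl. When a custom ExecutorService thread pool has been set, this.checkRequestQueue is not initialized in that branch, so it is still null when destroyTransactionEnv() calls clear() on it.
   The code is as follows: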
     public void initTransactionEnv() {
           TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
           if (producer.getExecutorService() != null) {
               this.checkExecutor = producer.getExecutorService();
           } else {
               this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(2000);
               this.checkExecutor = new ThreadPoolExecutor(
                   1,
                   1,
                   1000 * 60,
                   TimeUnit.MILLISECONDS,
                   this.checkRequestQueue);
           }
       }
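
   The behavior described above can be reproduced without RocketMQ using a small stand-alone model. The sketch below is only an illustration: TransactionEnvSketch, destroyUnguarded(), destroyGuarded() and executorIsShutdown() are hypothetical names, not part of the RocketMQ API, and the null check is just one possible way to avoid the exception, not necessarily the fix the project would adopt.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for DefaultMQProducerImpl; not the real RocketMQ class.
class TransactionEnvSketch {
    private BlockingQueue<Runnable> checkRequestQueue;
    private ExecutorService checkExecutor;

    // Mirrors the quoted initTransactionEnv(): the queue is created
    // only when no custom executor has been supplied.
    void initTransactionEnv(ExecutorService customExecutor) {
        if (customExecutor != null) {
            this.checkExecutor = customExecutor;
        } else {
            this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(2000);
            this.checkExecutor = new ThreadPoolExecutor(
                1, 1, 1000 * 60, TimeUnit.MILLISECONDS, this.checkRequestQueue);
        }
    }

    // Same body as the quoted destroyTransactionEnv().
    void destroyUnguarded() {
        this.checkExecutor.shutdown();
        this.checkRequestQueue.clear(); // null if a custom executor was used
    }

    // One possible fix (an assumption): skip whatever was never initialized.
    void destroyGuarded() {
        if (this.checkExecutor != null) {
            this.checkExecutor.shutdown();
        }
        if (this.checkRequestQueue != null) {
            this.checkRequestQueue.clear();
        }
    }

    // Helper for the demo only.
    boolean executorIsShutdown() {
        return this.checkExecutor != null && this.checkExecutor.isShutdown();
    }

    public static void main(String[] args) {
        TransactionEnvSketch broken = new TransactionEnvSketch();
        broken.initTransactionEnv(Executors.newSingleThreadExecutor());
        try {
            broken.destroyUnguarded();
        } catch (NullPointerException e) {
            System.out.println("unguarded destroy threw NullPointerException");
        }

        TransactionEnvSketch guarded = new TransactionEnvSketch();
        guarded.initTransactionEnv(Executors.newSingleThreadExecutor());
        guarded.destroyGuarded();
        System.out.println("guarded destroy completed, executor shut down: "
            + guarded.executorIsShutdown());
    }
}
```

   An alternative with the same effect would be to create checkRequestQueue in both branches of initTransactionEnv(), so that destroyTransactionEnv() can stay unchanged.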
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services