You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2020/11/03 14:19:31 UTC

[GitHub] [rocketmq] wangchongyang007 opened a new issue #2397: When DefaultMQProducer is initialized, there is a risk of thread exhaustion in high concurrency situations

wangchongyang007 opened a new issue #2397:
URL: https://github.com/apache/rocketmq/issues/2397


   
   When the same groupName of  DefaultMQProducer are initialized multiple times, Even if it fails, and three threads are still generated.
   
   Tread1: ClientHouseKeepingService (It is  a daemon)
   Tread2: AsyncAppender-Dispatcher-Thread
   Tread3: RequestHouseKeepingService
   
   And they will never be destroyed.
   
   Developers should be reminded that thread-safe and single-instance use of it. Or fix it.
   
   Scene recurrence:
   
       public static void main(String[] args) {
   
           ThreadPoolExecutor asyncSenderExecutor = new ThreadPoolExecutor(
                   10, 10, 1000 * 60, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(500000),
                   new ThreadFactory() {
                       private AtomicInteger threadIndex = new AtomicInteger(0);
   
                       @Override
                       public Thread newThread(Runnable r) {
                           return new Thread(r, "test111" + this.threadIndex.incrementAndGet());
                       }
                   });
   
           for (int i = 0; i < 50; i++) {
               asyncSenderExecutor.submit(new Runnable() {
                   @Override
                   public void run() {
                       createMqProducer();
                   }
               });
           }
           try {
               Thread.sleep(20000000000L);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
   
   
       public static DefaultMQProducer createMqProducer() {
           DefaultMQProducer producer = new DefaultMQProducer("w222");
           producer.setNamesrvAddr("192.168.0.22:9876");
           producer.setRetryTimesWhenSendAsyncFailed(2);
           producer.setSendMsgTimeout(50000);
   
           ThreadPoolExecutor asyncSenderExecutor = new ThreadPoolExecutor(
                   8, 8, 1000 * 60, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(500000),
                   new ThreadFactory() {
                       private AtomicInteger threadIndex = new AtomicInteger(0);
   
                       @Override
                       public Thread newThread(Runnable r) {
                           return new Thread(r, "aaaAsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
                       }
                   });
           producer.setAsyncSenderExecutor(asyncSenderExecutor);
           producer.setRetryAnotherBrokerWhenNotStoreOK(true);
           try {
               producer.start();
               return producer;
           } catch (Exception e) {
               e.printStackTrace();
           } finally {
               try {
                   producer.shutdown();
               } finally {
                   producer = null;
               }
           }
           return null;
       }
   
   ![image](https://user-images.githubusercontent.com/16188008/97858751-93ce1f80-1d3a-11eb-9288-2dbffe211eec.png)
   ![image](https://user-images.githubusercontent.com/16188008/97858825-b06a5780-1d3a-11eb-9e0f-3a505ee7f965.png)
   ![image](https://user-images.githubusercontent.com/16188008/97858862-be1fdd00-1d3a-11eb-93e4-0dd6d182574c.png)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] maixiaohai commented on issue #2397: When DefaultMQProducer is initialized, there is a risk of thread exhaustion in high concurrency situations

Posted by GitBox <gi...@apache.org>.
maixiaohai commented on issue #2397:
URL: https://github.com/apache/rocketmq/issues/2397#issuecomment-785055222


   issue #1957 fixed this problem?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] lizhiboo commented on issue #2397: When DefaultMQProducer is initialized, there is a risk of thread exhaustion in high concurrency situations

Posted by GitBox <gi...@apache.org>.
lizhiboo commented on issue #2397:
URL: https://github.com/apache/rocketmq/issues/2397#issuecomment-785047717


   > When the same groupName of DefaultMQProducer are initialized multiple times, Even if it fails, and three threads are still generated.
   > 
   > Tread1: ClientHouseKeepingService (It is a daemon)
   > Tread2: AsyncAppender-Dispatcher-Thread
   > Tread3: RequestHouseKeepingService
   > 
   > And they will never be destroyed.
   > 
   > Developers should be reminded that thread-safe and single-instance use of it. Or fix it.
   > 
   > Scene recurrence:
   > 
   > ```
   > public static void main(String[] args) {
   > 
   >     ThreadPoolExecutor asyncSenderExecutor = new ThreadPoolExecutor(
   >             10, 10, 1000 * 60, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(500000),
   >             new ThreadFactory() {
   >                 private AtomicInteger threadIndex = new AtomicInteger(0);
   > 
   >                 @Override
   >                 public Thread newThread(Runnable r) {
   >                     return new Thread(r, "test111" + this.threadIndex.incrementAndGet());
   >                 }
   >             });
   > 
   >     for (int i = 0; i < 50; i++) {
   >         asyncSenderExecutor.submit(new Runnable() {
   >             @Override
   >             public void run() {
   >                 createMqProducer();
   >             }
   >         });
   >     }
   >     try {
   >         Thread.sleep(20000000000L);
   >     } catch (InterruptedException e) {
   >         e.printStackTrace();
   >     }
   > }
   > 
   > 
   > public static DefaultMQProducer createMqProducer() {
   >     DefaultMQProducer producer = new DefaultMQProducer("w222");
   >     producer.setNamesrvAddr("192.168.0.22:9876");
   >     producer.setRetryTimesWhenSendAsyncFailed(2);
   >     producer.setSendMsgTimeout(50000);
   > 
   >     ThreadPoolExecutor asyncSenderExecutor = new ThreadPoolExecutor(
   >             8, 8, 1000 * 60, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(500000),
   >             new ThreadFactory() {
   >                 private AtomicInteger threadIndex = new AtomicInteger(0);
   > 
   >                 @Override
   >                 public Thread newThread(Runnable r) {
   >                     return new Thread(r, "aaaAsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
   >                 }
   >             });
   >     producer.setAsyncSenderExecutor(asyncSenderExecutor);
   >     producer.setRetryAnotherBrokerWhenNotStoreOK(true);
   >     try {
   >         producer.start();
   >         return producer;
   >     } catch (Exception e) {
   >         e.printStackTrace();
   >     } finally {
   >         try {
   >             producer.shutdown();
   >         } finally {
   >             producer = null;
   >         }
   >     }
   >     return null;
   > }
   > ```
   > 
   > ![image](https://user-images.githubusercontent.com/16188008/97858751-93ce1f80-1d3a-11eb-9288-2dbffe211eec.png)
   > ![image](https://user-images.githubusercontent.com/16188008/97858825-b06a5780-1d3a-11eb-9e0f-3a505ee7f965.png)
   > ![image](https://user-images.githubusercontent.com/16188008/97858862-be1fdd00-1d3a-11eb-93e4-0dd6d182574c.png)
   
   In RocketMQ 4.8.0, thread RequestHouseKeepingService is daemon. Can u supply the detailed version in ur environment?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org