You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2021/07/30 18:07:02 UTC

[rocketmq] branch develop updated: [ISSUE #3199] Two timed task for RequestFutureTable (#3202)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 94abced  [ISSUE #3199] Two timed task for RequestFutureTable (#3202)
94abced is described below

commit 94abced3f7b391a560421247ad0c70401d4bd6a8
Author: panzhi <pa...@qq.com>
AuthorDate: Sat Jul 31 02:06:50 2021 +0800

    [ISSUE #3199] Two timed task for RequestFutureTable (#3202)
    
    Co-authored-by: panzhi33 <wb...@alibaba-inc.com>
---
 .../impl/producer/DefaultMQProducerImpl.java       | 40 +++++++++++++++-------
 .../client/producer/RequestFutureTable.java        |  7 ++++
 2 files changed, 34 insertions(+), 13 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index fac3ed3..00ee3b0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -24,14 +24,14 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -107,7 +107,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
     private final RPCHook rpcHook;
     private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
     private final ExecutorService defaultAsyncSenderExecutor;
-    private final Timer timer = new Timer("RequestHouseKeepingService", true);
+    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+        @Override
+        public Thread newThread(Runnable r) {
+            return new Thread(r, "RequestHouseKeepingService");
+        }
+    });
     protected BlockingQueue<Runnable> checkRequestQueue;
     protected ExecutorService checkExecutor;
     private ServiceState serviceState = ServiceState.CREATE_JUST;
@@ -227,16 +232,23 @@ public class DefaultMQProducerImpl implements MQProducerInner {
 
         this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
 
-        this.timer.scheduleAtFixedRate(new TimerTask() {
-            @Override
-            public void run() {
-                try {
-                    RequestFutureTable.scanExpiredRequest();
-                } catch (Throwable e) {
-                    log.error("scan RequestFutureTable exception", e);
+        this.startScheduledTask();
+
+    }
+
+    private void startScheduledTask() {
+        if (RequestFutureTable.getProducerNum().incrementAndGet() == 1) {
+            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        RequestFutureTable.scanExpiredRequest();
+                    } catch (Throwable e) {
+                        log.error("scan RequestFutureTable exception", e);
+                    }
                 }
-            }
-        }, 1000 * 3, 1000);
+            }, 1000 * 3, 1000, TimeUnit.MILLISECONDS);
+        }
     }
 
     private void checkConfig() throws MQClientException {
@@ -266,7 +278,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                 if (shutdownFactory) {
                     this.mQClientFactory.shutdown();
                 }
-                this.timer.cancel();
+                if (RequestFutureTable.getProducerNum().decrementAndGet() == 0) {
+                    scheduledExecutorService.shutdown();
+                }
                 log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup());
                 this.serviceState = ServiceState.SHUTDOWN_ALREADY;
                 break;
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java
index 3d4caa2..52cda3e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java
@@ -22,6 +22,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.rocketmq.client.common.ClientErrorCode;
 import org.apache.rocketmq.client.exception.RequestTimeoutException;
 import org.apache.rocketmq.client.log.ClientLogger;
@@ -30,6 +32,7 @@ import org.apache.rocketmq.logging.InternalLogger;
 public class RequestFutureTable {
     private static InternalLogger log = ClientLogger.getLog();
     private static ConcurrentHashMap<String, RequestResponseFuture> requestFutureTable = new ConcurrentHashMap<String, RequestResponseFuture>();
+    private static final AtomicInteger producerNum = new AtomicInteger(0);
 
     public static ConcurrentHashMap<String, RequestResponseFuture> getRequestFutureTable() {
         return requestFutureTable;
@@ -59,4 +62,8 @@ public class RequestFutureTable {
             }
         }
     }
+
+    public static AtomicInteger getProducerNum() {
+        return producerNum;
+    }
 }