You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2021/07/30 18:07:02 UTC
[rocketmq] branch develop updated: [ISSUE #3199] Two timed task for
RequestFutureTable (#3202)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 94abced [ISSUE #3199] Two timed task for RequestFutureTable (#3202)
94abced is described below
commit 94abced3f7b391a560421247ad0c70401d4bd6a8
Author: panzhi <pa...@qq.com>
AuthorDate: Sat Jul 31 02:06:50 2021 +0800
[ISSUE #3199] Two timed task for RequestFutureTable (#3202)
Co-authored-by: panzhi33 <wb...@alibaba-inc.com>
---
.../impl/producer/DefaultMQProducerImpl.java | 40 +++++++++++++++-------
.../client/producer/RequestFutureTable.java | 7 ++++
2 files changed, 34 insertions(+), 13 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index fac3ed3..00ee3b0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -24,14 +24,14 @@ import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -107,7 +107,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
private final RPCHook rpcHook;
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
private final ExecutorService defaultAsyncSenderExecutor;
- private final Timer timer = new Timer("RequestHouseKeepingService", true);
+ private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "RequestHouseKeepingService");
+ }
+ });
protected BlockingQueue<Runnable> checkRequestQueue;
protected ExecutorService checkExecutor;
private ServiceState serviceState = ServiceState.CREATE_JUST;
@@ -227,16 +232,23 @@ public class DefaultMQProducerImpl implements MQProducerInner {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
- this.timer.scheduleAtFixedRate(new TimerTask() {
- @Override
- public void run() {
- try {
- RequestFutureTable.scanExpiredRequest();
- } catch (Throwable e) {
- log.error("scan RequestFutureTable exception", e);
+ this.startScheduledTask();
+
+ }
+
+ private void startScheduledTask() {
+ if (RequestFutureTable.getProducerNum().incrementAndGet() == 1) {
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ RequestFutureTable.scanExpiredRequest();
+ } catch (Throwable e) {
+ log.error("scan RequestFutureTable exception", e);
+ }
}
- }
- }, 1000 * 3, 1000);
+ }, 1000 * 3, 1000, TimeUnit.MILLISECONDS);
+ }
}
private void checkConfig() throws MQClientException {
@@ -266,7 +278,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (shutdownFactory) {
this.mQClientFactory.shutdown();
}
- this.timer.cancel();
+ if (RequestFutureTable.getProducerNum().decrementAndGet() == 0) {
+ scheduledExecutorService.shutdown();
+ }
log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup());
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
break;
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java
index 3d4caa2..52cda3e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java
@@ -22,6 +22,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.rocketmq.client.common.ClientErrorCode;
import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.client.log.ClientLogger;
@@ -30,6 +32,7 @@ import org.apache.rocketmq.logging.InternalLogger;
public class RequestFutureTable {
private static InternalLogger log = ClientLogger.getLog();
private static ConcurrentHashMap<String, RequestResponseFuture> requestFutureTable = new ConcurrentHashMap<String, RequestResponseFuture>();
+ private static final AtomicInteger producerNum = new AtomicInteger(0);
public static ConcurrentHashMap<String, RequestResponseFuture> getRequestFutureTable() {
return requestFutureTable;
@@ -59,4 +62,8 @@ public class RequestFutureTable {
}
}
}
+
+ public static AtomicInteger getProducerNum() {
+ return producerNum;
+ }
}