You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2021/12/15 12:00:41 UTC
[rocketmq] branch develop updated: [ISSUE #3624]Fix producer/consumer re-start may fail introduced in #3454 (#3639)
This is an automated email from the ASF dual-hosted git repository.
huangli pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 64ad790 [ISSUE #3624]Fix producer/consumer re-start may fail introduced in #3454 (#3639)
64ad790 is described below
commit 64ad7908b89bec5190e14c80f044b7060ac2774d
Author: panzhi <pa...@qq.com>
AuthorDate: Wed Dec 15 20:00:21 2021 +0800
[ISSUE #3624]Fix producer/consumer re-start may fail introduced in #3454 (#3639)
Fix producer/consumer re-start may fail introduced in #3454
1. start a producer
2. shutdown producer
3. start the second producer, exception throws
---
.../impl/producer/DefaultMQProducerImpl.java | 21 +--------
.../client/producer/RequestFutureHolder.java | 50 ++++++++++++++--------
2 files changed, 35 insertions(+), 36 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index b27117f..cedbbdb 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -223,25 +223,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
- this.startScheduledTask();
+ RequestFutureHolder.getInstance().startScheduledTask(this);
}
- private void startScheduledTask() {
- if (RequestFutureHolder.getInstance().getProducerNum().incrementAndGet() == 1) {
- RequestFutureHolder.getInstance().getScheduledExecutorService().scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- RequestFutureHolder.getInstance().scanExpiredRequest();
- } catch (Throwable e) {
- log.error("scan RequestFutureTable exception", e);
- }
- }
- }, 1000 * 3, 1000, TimeUnit.MILLISECONDS);
- }
- }
-
private void checkConfig() throws MQClientException {
Validators.checkGroup(this.defaultMQProducer.getProducerGroup());
@@ -269,9 +254,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (shutdownFactory) {
this.mQClientFactory.shutdown();
}
- if (RequestFutureHolder.getInstance().getProducerNum().decrementAndGet() == 0) {
- RequestFutureHolder.getInstance().getScheduledExecutorService().shutdown();
- }
+ RequestFutureHolder.getInstance().shutdown(this);
log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup());
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
break;
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java
index 24b3a90..8fe9abc 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java
@@ -17,39 +17,36 @@
package org.apache.rocketmq.client.producer;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.common.ClientErrorCode;
import org.apache.rocketmq.client.exception.RequestTimeoutException;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.logging.InternalLogger;
public class RequestFutureHolder {
private static InternalLogger log = ClientLogger.getLog();
private static final RequestFutureHolder INSTANCE = new RequestFutureHolder();
private ConcurrentHashMap<String, RequestResponseFuture> requestFutureTable = new ConcurrentHashMap<String, RequestResponseFuture>();
- private final AtomicInteger producerNum = new AtomicInteger(0);
- private final ScheduledExecutorService scheduledExecutorService = Executors
- .newSingleThreadScheduledExecutor(new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "RequestHouseKeepingService");
- }
- });
+ private final Set<DefaultMQProducerImpl> producerSet = new HashSet<>();
+ private ScheduledExecutorService scheduledExecutorService = null;
public ConcurrentHashMap<String, RequestResponseFuture> getRequestFutureTable() {
return requestFutureTable;
}
- public void scanExpiredRequest() {
+ private void scanExpiredRequest() {
final List<RequestResponseFuture> rfList = new LinkedList<RequestResponseFuture>();
Iterator<Map.Entry<String, RequestResponseFuture>> it = requestFutureTable.entrySet().iterator();
while (it.hasNext()) {
@@ -74,17 +71,36 @@ public class RequestFutureHolder {
}
}
- private RequestFutureHolder() {
- }
+ public synchronized void startScheduledTask(DefaultMQProducerImpl producer) {
+ this.producerSet.add(producer);
+ if (null == scheduledExecutorService) {
+ this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RequestHouseKeepingService"));
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ RequestFutureHolder.getInstance().scanExpiredRequest();
+ } catch (Throwable e) {
+ log.error("scan RequestFutureTable exception", e);
+ }
+ }
+ }, 1000 * 3, 1000, TimeUnit.MILLISECONDS);
- public AtomicInteger getProducerNum() {
- return producerNum;
+ }
}
- public ScheduledExecutorService getScheduledExecutorService() {
- return scheduledExecutorService;
+ public synchronized void shutdown(DefaultMQProducerImpl producer) {
+ this.producerSet.remove(producer);
+ if (this.producerSet.size() <= 0 && null != this.scheduledExecutorService) {
+ ScheduledExecutorService executorService = this.scheduledExecutorService;
+ this.scheduledExecutorService = null;
+ executorService.shutdown();
+ }
}
+ private RequestFutureHolder() {}
+
public static RequestFutureHolder getInstance() {
return INSTANCE;
}