You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2021/12/15 12:00:41 UTC

[rocketmq] branch develop updated: [ISSUE #3624]Fix producer/consumer re-start may fail introduced in #3454 (#3639)

This is an automated email from the ASF dual-hosted git repository.

huangli pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 64ad790  [ISSUE #3624]Fix producer/consumer re-start may fail introduced in #3454 (#3639)
64ad790 is described below

commit 64ad7908b89bec5190e14c80f044b7060ac2774d
Author: panzhi <pa...@qq.com>
AuthorDate: Wed Dec 15 20:00:21 2021 +0800

    [ISSUE #3624]Fix producer/consumer re-start may fail introduced in #3454 (#3639)
    
    Fix a regression introduced in #3454 that may cause restarting a producer/consumer to fail.
    
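    Steps to reproduce:
    1. Start a producer.
    2. Shut down the producer.
    3. Start a second producer; an exception is thrown because the shared
       RequestFutureHolder executor was shut down in step 2 and is reused.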
---
 .../impl/producer/DefaultMQProducerImpl.java       | 21 +--------
 .../client/producer/RequestFutureHolder.java       | 50 ++++++++++++++--------
 2 files changed, 35 insertions(+), 36 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index b27117f..cedbbdb 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -223,25 +223,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
 
         this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
 
-        this.startScheduledTask();
+        RequestFutureHolder.getInstance().startScheduledTask(this);
 
     }
 
-    private void startScheduledTask() {
-        if (RequestFutureHolder.getInstance().getProducerNum().incrementAndGet() == 1) {
-            RequestFutureHolder.getInstance().getScheduledExecutorService().scheduleAtFixedRate(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        RequestFutureHolder.getInstance().scanExpiredRequest();
-                    } catch (Throwable e) {
-                        log.error("scan RequestFutureTable exception", e);
-                    }
-                }
-            }, 1000 * 3, 1000, TimeUnit.MILLISECONDS);
-        }
-    }
-
     private void checkConfig() throws MQClientException {
         Validators.checkGroup(this.defaultMQProducer.getProducerGroup());
 
@@ -269,9 +254,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                 if (shutdownFactory) {
                     this.mQClientFactory.shutdown();
                 }
-                if (RequestFutureHolder.getInstance().getProducerNum().decrementAndGet() == 0) {
-                    RequestFutureHolder.getInstance().getScheduledExecutorService().shutdown();
-                }
+                RequestFutureHolder.getInstance().shutdown(this);
                 log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup());
                 this.serviceState = ServiceState.SHUTDOWN_ALREADY;
                 break;
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java
index 24b3a90..8fe9abc 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java
@@ -17,39 +17,36 @@
 
 package org.apache.rocketmq.client.producer;
 
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.rocketmq.client.common.ClientErrorCode;
 import org.apache.rocketmq.client.exception.RequestTimeoutException;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
 import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.logging.InternalLogger;
 
 public class RequestFutureHolder {
     private static InternalLogger log = ClientLogger.getLog();
     private static final RequestFutureHolder INSTANCE = new RequestFutureHolder();
     private ConcurrentHashMap<String, RequestResponseFuture> requestFutureTable = new ConcurrentHashMap<String, RequestResponseFuture>();
-    private final AtomicInteger producerNum = new AtomicInteger(0);
-    private final ScheduledExecutorService scheduledExecutorService = Executors
-        .newSingleThreadScheduledExecutor(new ThreadFactory() {
-            @Override
-            public Thread newThread(Runnable r) {
-                return new Thread(r, "RequestHouseKeepingService");
-            }
-        });
+    private final Set<DefaultMQProducerImpl> producerSet = new HashSet<>();
+    private ScheduledExecutorService scheduledExecutorService = null;
 
     public ConcurrentHashMap<String, RequestResponseFuture> getRequestFutureTable() {
         return requestFutureTable;
     }
 
-    public void scanExpiredRequest() {
+    private void scanExpiredRequest() {
         final List<RequestResponseFuture> rfList = new LinkedList<RequestResponseFuture>();
         Iterator<Map.Entry<String, RequestResponseFuture>> it = requestFutureTable.entrySet().iterator();
         while (it.hasNext()) {
@@ -74,17 +71,36 @@ public class RequestFutureHolder {
         }
     }
 
-    private RequestFutureHolder() {
-    }
+    public synchronized void startScheduledTask(DefaultMQProducerImpl producer) {
+        this.producerSet.add(producer);
+        if (null == scheduledExecutorService) {
+            this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RequestHouseKeepingService"));
+
+            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        RequestFutureHolder.getInstance().scanExpiredRequest();
+                    } catch (Throwable e) {
+                        log.error("scan RequestFutureTable exception", e);
+                    }
+                }
+            }, 1000 * 3, 1000, TimeUnit.MILLISECONDS);
 
-    public AtomicInteger getProducerNum() {
-        return producerNum;
+        }
     }
 
-    public ScheduledExecutorService getScheduledExecutorService() {
-        return scheduledExecutorService;
+    public synchronized void shutdown(DefaultMQProducerImpl producer) {
+        this.producerSet.remove(producer);
+        if (this.producerSet.size() <= 0 && null != this.scheduledExecutorService) {
+            ScheduledExecutorService executorService = this.scheduledExecutorService;
+            this.scheduledExecutorService = null;
+            executorService.shutdown();
+        }
     }
 
+    private RequestFutureHolder() {}
+
     public static RequestFutureHolder getInstance() {
         return INSTANCE;
     }