You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2019/12/27 07:09:42 UTC

[GitHub] [rocketmq] areyouok commented on a change in pull request #1627: [for 4.6.0] Fix concurrent problem in ProducerManager.getAvaliableCha…

areyouok commented on a change in pull request #1627: [for 4.6.0] Fix concurrent problem in ProducerManager.getAvaliableCha…
URL: https://github.com/apache/rocketmq/pull/1627#discussion_r361597823
 
 

 ##########
 File path: broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
 ##########
 @@ -36,205 +32,145 @@
 
 public class ProducerManager {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-    private static final long LOCK_TIMEOUT_MILLIS = 3000;
     private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
     private static final int GET_AVALIABLE_CHANNEL_RETRY_COUNT = 3;
-    private final Lock groupChannelLock = new ReentrantLock();
-    private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
-        new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
+    private final ConcurrentHashMap<String /* group name */, ConcurrentHashMap<Channel, ClientChannelInfo>> groupChannelTable =
+        new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();
     private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
 
     public ProducerManager() {
     }
 
-    public HashMap<String, HashMap<Channel, ClientChannelInfo>> getGroupChannelTable() {
-        HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> newGroupChannelTable =
-            new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
-        try {
-            if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
-                try {
-                    Iterator<Map.Entry<String, HashMap<Channel, ClientChannelInfo>>> iter = groupChannelTable.entrySet().iterator();
-                    while (iter.hasNext()) {
-                        Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry = iter.next();
-                        String key = entry.getKey();
-                        HashMap<Channel, ClientChannelInfo> val = entry.getValue();
-                        HashMap<Channel, ClientChannelInfo> tmp = new HashMap<Channel, ClientChannelInfo>();
-                        tmp.putAll(val);
-                        newGroupChannelTable.put(key, tmp);
-                    }
-                } finally {
-                    groupChannelLock.unlock();
-                }
-            }
-        } catch (InterruptedException e) {
-            log.error("", e);
-        }
-        return newGroupChannelTable;
+    public ConcurrentHashMap<String, ConcurrentHashMap<Channel, ClientChannelInfo>> getGroupChannelTable() {
+        return groupChannelTable;
     }
 
     public void scanNotActiveChannel() {
-        try {
-            if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
-                try {
-                    for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
-                        .entrySet()) {
-                        final String group = entry.getKey();
-                        final HashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();
-
-                        Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();
-                        while (it.hasNext()) {
-                            Entry<Channel, ClientChannelInfo> item = it.next();
-                            // final Integer id = item.getKey();
-                            final ClientChannelInfo info = item.getValue();
-
-                            long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
-                            if (diff > CHANNEL_EXPIRED_TIMEOUT) {
-                                it.remove();
-                                clientChannelTable.remove(info.getClientId());
-                                log.warn(
-                                    "SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
-                                    RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
-                                RemotingUtil.closeChannel(info.getChannel());
-                            }
-                        }
-                    }
-                } finally {
-                    this.groupChannelLock.unlock();
+        for (final Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
+                .entrySet()) {
+            final String group = entry.getKey();
+            final ConcurrentHashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();
+
+            Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();
+            while (it.hasNext()) {
+                Entry<Channel, ClientChannelInfo> item = it.next();
+                // final Integer id = item.getKey();
+                final ClientChannelInfo info = item.getValue();
+
+                long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
+                if (diff > CHANNEL_EXPIRED_TIMEOUT) {
+                    it.remove();
+                    clientChannelTable.remove(info.getClientId());
+                    log.warn(
+                            "SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
+                            RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
+                    RemotingUtil.closeChannel(info.getChannel());
                 }
-            } else {
-                log.warn("ProducerManager scanNotActiveChannel lock timeout");
             }
-        } catch (InterruptedException e) {
-            log.error("", e);
         }
     }
 
-    public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
+    public synchronized void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
         if (channel != null) {
-            try {
-                if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
-                    try {
-                        for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
-                            .entrySet()) {
-                            final String group = entry.getKey();
-                            final HashMap<Channel, ClientChannelInfo> clientChannelInfoTable =
-                                entry.getValue();
-                            final ClientChannelInfo clientChannelInfo =
-                                clientChannelInfoTable.remove(channel);
-                            if (clientChannelInfo != null) {
-                                clientChannelTable.remove(clientChannelInfo.getClientId());
-                                log.info(
-                                    "NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
-                                    clientChannelInfo.toString(), remoteAddr, group);
-                            }
-
-                        }
-                    } finally {
-                        this.groupChannelLock.unlock();
-                    }
-                } else {
-                    log.warn("ProducerManager doChannelCloseEvent lock timeout");
+            for (final Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
+                    .entrySet()) {
+                final String group = entry.getKey();
+                final ConcurrentHashMap<Channel, ClientChannelInfo> clientChannelInfoTable =
+                        entry.getValue();
+                final ClientChannelInfo clientChannelInfo =
+                        clientChannelInfoTable.remove(channel);
+                if (clientChannelInfo != null) {
+                    clientChannelTable.remove(clientChannelInfo.getClientId());
+                    log.info(
+                            "NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
+                            clientChannelInfo.toString(), remoteAddr, group);
                 }
-            } catch (InterruptedException e) {
-                log.error("", e);
+
             }
         }
     }
 
-    public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
-        try {
-            ClientChannelInfo clientChannelInfoFound = null;
-
-            if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
-                try {
-                    HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
-                    if (null == channelTable) {
-                        channelTable = new HashMap<>();
-                        this.groupChannelTable.put(group, channelTable);
-                    }
-
-                    clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
-                    if (null == clientChannelInfoFound) {
-                        channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
-                        clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
-                        log.info("new producer connected, group: {} channel: {}", group,
-                            clientChannelInfo.toString());
-                    }
-                } finally {
-                    this.groupChannelLock.unlock();
-                }
+    public synchronized void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
+        ClientChannelInfo clientChannelInfoFound = null;
 
-                if (clientChannelInfoFound != null) {
-                    clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
-                }
-            } else {
-                log.warn("ProducerManager registerProducer lock timeout");
-            }
-        } catch (InterruptedException e) {
-            log.error("", e);
+        ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
+        if (null == channelTable) {
+            channelTable = new ConcurrentHashMap<>();
+            this.groupChannelTable.put(group, channelTable);
+        }
+
+        clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
+        if (null == clientChannelInfoFound) {
+            channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
+            clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
+            log.info("new producer connected, group: {} channel: {}", group,
+                    clientChannelInfo.toString());
+        }
+
+
+        if (clientChannelInfoFound != null) {
+            clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
         }
     }
 
-    public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
-        try {
-            if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
-                try {
-                    HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
-                    if (null != channelTable && !channelTable.isEmpty()) {
-                        ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
-                        clientChannelTable.remove(clientChannelInfo.getClientId());
-                        if (old != null) {
-                            log.info("unregister a producer[{}] from groupChannelTable {}", group,
-                                clientChannelInfo.toString());
-                        }
-
-                        if (channelTable.isEmpty()) {
-                            this.groupChannelTable.remove(group);
-                            log.info("unregister a producer group[{}] from groupChannelTable", group);
-                        }
-                    }
-                } finally {
-                    this.groupChannelLock.unlock();
-                }
-            } else {
-                log.warn("ProducerManager unregisterProducer lock timeout");
+    public synchronized void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
+        ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
+        if (null != channelTable && !channelTable.isEmpty()) {
+            ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
+            clientChannelTable.remove(clientChannelInfo.getClientId());
+            if (old != null) {
+                log.info("unregister a producer[{}] from groupChannelTable {}", group,
+                        clientChannelInfo.toString());
+            }
+
+            if (channelTable.isEmpty()) {
+                this.groupChannelTable.remove(group);
+                log.info("unregister a producer group[{}] from groupChannelTable", group);
             }
-        } catch (InterruptedException e) {
-            log.error("", e);
         }
     }
 
     public Channel getAvaliableChannel(String groupId) {
 
 Review comment:
   This method is public; changing its name may cause incompatibility.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services