You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/05/26 19:58:34 UTC

activemq-artemis git commit: ARTEMIS-1175 fix memory leak

Repository: activemq-artemis
Updated Branches:
  refs/heads/1.x 1cbadb08d -> ee3669e42


ARTEMIS-1175 fix memory leak

(cherry picked from commit b136fed48f756071bbf9542e94ae6448c37881f2)
(cherry picked from commit f63ffc7af5d96fd1a6785a12d5ce3ae504831269)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ee3669e4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ee3669e4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ee3669e4

Branch: refs/heads/1.x
Commit: ee3669e422d9e0c5df66c29d44ff4d78ee76ba24
Parents: 1cbadb0
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri May 26 15:58:20 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri May 26 15:58:20 2017 -0400

----------------------------------------------------------------------
 .../core/protocol/mqtt/MQTTSessionState.java    | 22 +++++++++++++-------
 1 file changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ee3669e4/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index 67c2749..b1c83f2 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -40,7 +40,7 @@ public class MQTTSessionState {
 
    private final ConcurrentMap<String, Map<Long, Integer>> addressMessageMap = new ConcurrentHashMap<>();
 
-   private final Set<Integer>  pubRec = new HashSet<>();
+   private final Set<Integer> pubRec = new HashSet<>();
 
    private boolean attached = false;
 
@@ -127,7 +127,7 @@ public class MQTTSessionState {
 
    public class OutboundStore {
 
-      private HashMap<String, Integer> artemisToMqttMessageMap = new HashMap<>();
+      private HashMap<Pair<Long, Long>, Integer> artemisToMqttMessageMap = new HashMap<>();
 
       private HashMap<Integer, Pair<Long, Long>> mqttToServerIds = new HashMap<>();
 
@@ -135,9 +135,13 @@ public class MQTTSessionState {
 
       private final AtomicInteger ids = new AtomicInteger(0);
 
-      public int generateMqttId(long serverId, long consumerId) {
+      private Pair<Long, Long> generateKey(long messageId, long consumerID) {
+         return new Pair<>(messageId, consumerID);
+      }
+
+      public int generateMqttId(long messageId, long consumerId) {
          synchronized (dataStoreLock) {
-            Integer id = artemisToMqttMessageMap.get(consumerId + ":" + serverId);
+            Integer id = artemisToMqttMessageMap.get(generateKey(messageId, consumerId));
             if (id == null) {
                ids.compareAndSet(Short.MAX_VALUE, 1);
                id = ids.addAndGet(1);
@@ -146,18 +150,20 @@ public class MQTTSessionState {
          }
       }
 
-      public void publish(int mqtt, long serverId, long consumerId) {
+      public void publish(int mqtt, long messageId, long consumerId) {
          synchronized (dataStoreLock) {
-            artemisToMqttMessageMap.put(consumerId + ":" + serverId, mqtt);
-            mqttToServerIds.put(mqtt, new Pair(serverId, consumerId));
+            Pair<Long, Long> key = generateKey(messageId, consumerId);
+            artemisToMqttMessageMap.put(key, mqtt);
+            mqttToServerIds.put(mqtt, key);
          }
       }
 
       public Pair<Long, Long> publishAckd(int mqtt) {
          synchronized (dataStoreLock) {
-            Pair p =  mqttToServerIds.remove(mqtt);
+            Pair p = mqttToServerIds.remove(mqtt);
             if (p != null) {
                mqttToServerIds.remove(p.getA());
+               artemisToMqttMessageMap.remove(p);
             }
             return p;
          }