You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/05/26 19:58:34 UTC
activemq-artemis git commit: ARTEMIS-1175 fix memory leak
Repository: activemq-artemis
Updated Branches:
refs/heads/1.x 1cbadb08d -> ee3669e42
ARTEMIS-1175 fix memory leak
(cherry picked from commit b136fed48f756071bbf9542e94ae6448c37881f2)
(cherry picked from commit f63ffc7af5d96fd1a6785a12d5ce3ae504831269)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ee3669e4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ee3669e4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ee3669e4
Branch: refs/heads/1.x
Commit: ee3669e422d9e0c5df66c29d44ff4d78ee76ba24
Parents: 1cbadb0
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri May 26 15:58:20 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri May 26 15:58:20 2017 -0400
----------------------------------------------------------------------
.../core/protocol/mqtt/MQTTSessionState.java | 22 +++++++++++++-------
1 file changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ee3669e4/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index 67c2749..b1c83f2 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -40,7 +40,7 @@ public class MQTTSessionState {
private final ConcurrentMap<String, Map<Long, Integer>> addressMessageMap = new ConcurrentHashMap<>();
- private final Set<Integer> pubRec = new HashSet<>();
+ private final Set<Integer> pubRec = new HashSet<>();
private boolean attached = false;
@@ -127,7 +127,7 @@ public class MQTTSessionState {
public class OutboundStore {
- private HashMap<String, Integer> artemisToMqttMessageMap = new HashMap<>();
+ private HashMap<Pair<Long, Long>, Integer> artemisToMqttMessageMap = new HashMap<>();
private HashMap<Integer, Pair<Long, Long>> mqttToServerIds = new HashMap<>();
@@ -135,9 +135,13 @@ public class MQTTSessionState {
private final AtomicInteger ids = new AtomicInteger(0);
- public int generateMqttId(long serverId, long consumerId) {
+ private Pair<Long, Long> generateKey(long messageId, long consumerID) {
+ return new Pair<>(messageId, consumerID);
+ }
+
+ public int generateMqttId(long messageId, long consumerId) {
synchronized (dataStoreLock) {
- Integer id = artemisToMqttMessageMap.get(consumerId + ":" + serverId);
+ Integer id = artemisToMqttMessageMap.get(generateKey(messageId, consumerId));
if (id == null) {
ids.compareAndSet(Short.MAX_VALUE, 1);
id = ids.addAndGet(1);
@@ -146,18 +150,20 @@ public class MQTTSessionState {
}
}
- public void publish(int mqtt, long serverId, long consumerId) {
+ public void publish(int mqtt, long messageId, long consumerId) {
synchronized (dataStoreLock) {
- artemisToMqttMessageMap.put(consumerId + ":" + serverId, mqtt);
- mqttToServerIds.put(mqtt, new Pair(serverId, consumerId));
+ Pair<Long, Long> key = generateKey(messageId, consumerId);
+ artemisToMqttMessageMap.put(key, mqtt);
+ mqttToServerIds.put(mqtt, key);
}
}
public Pair<Long, Long> publishAckd(int mqtt) {
synchronized (dataStoreLock) {
- Pair p = mqttToServerIds.remove(mqtt);
+ Pair p = mqttToServerIds.remove(mqtt);
if (p != null) {
mqttToServerIds.remove(p.getA());
+ artemisToMqttMessageMap.remove(p);
}
return p;
}