You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by ro...@apache.org on 2017/10/18 23:22:55 UTC
[sling-org-apache-sling-jms] 03/14: SLING-5647 - Provide ActiveMQ
implementation of the MoM API in SLING-5646
This is an automated email from the ASF dual-hosted git repository.
rombert pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-jms.git
commit 2621716131c6af5fb08fa5cd28360c73fa129d5a
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Wed Sep 21 13:28:18 2016 +0000
SLING-5647 - Provide ActiveMQ implementation of the MoM API in SLING-5646
Map passed in add call may be immutable so add internal state on a copy and then ensure that such internal props are not passed to reader.
Also fix the case around casting of numtries
git-svn-id: https://svn.apache.org/repos/asf/sling/trunk@1761729 13f79535-47bb-0310-9956-ffa450edef68
---
.../java/org/apache/sling/jms/JMSQueueManager.java | 19 +++++++++++++++----
.../org/apache/sling/jms/JMSQueueManagerTest.java | 3 ++-
2 files changed, 17 insertions(+), 5 deletions(-)
diff --git a/src/main/java/org/apache/sling/jms/JMSQueueManager.java b/src/main/java/org/apache/sling/jms/JMSQueueManager.java
index 7c7b83b..4fcc904 100644
--- a/src/main/java/org/apache/sling/jms/JMSQueueManager.java
+++ b/src/main/java/org/apache/sling/jms/JMSQueueManager.java
@@ -30,6 +30,7 @@ import java.io.Closeable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -46,6 +47,7 @@ public class JMSQueueManager implements QueueManager {
private static final Logger LOGGER = LoggerFactory.getLogger(JMSQueueManager.class);
private static final String NRETRIES = "_nr";
+ private static final Set<String> INTERNAL_PROPS = Collections.singleton(NRETRIES);
@Reference
private ConnectionFactoryService connectionFactoryService;
@@ -92,8 +94,10 @@ public class JMSQueueManager implements QueueManager {
Session session = null;
try {
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- message.put(NRETRIES, 0L); // set the number of retries to 0.
- TextMessage textMessage = session.createTextMessage(Json.toJson(message));
+ //TODO Instead of copy do addition at JSON writer level
+ Map<String, Object> msgCopy = new HashMap<>(message);
+ msgCopy.put(NRETRIES, 0L); // set the number of retries to 0.
+ TextMessage textMessage = session.createTextMessage(Json.toJson(msgCopy));
textMessage.setJMSType(JMSMessageTypes.JSON.toString());
LOGGER.info("Sending to {} message {} ", name, textMessage);
session.createProducer(session.createQueue(name.toString())).send(textMessage);
@@ -189,6 +193,13 @@ public class JMSQueueManager implements QueueManager {
}
}
+ private static Map<String,Object> filter(Map<String, Object> map) {
+ //Filter out internal properties
+ for (String internalKey : INTERNAL_PROPS){
+ map.remove(internalKey);
+ }
+ return map;
+ }
public static class JMSQueueSession implements Closeable, MessageListener {
private static final Logger LOGGER = LoggerFactory.getLogger(JMSQueueSession.class);
@@ -248,7 +259,7 @@ public class JMSQueueManager implements QueueManager {
final Map<String, Object> mapMessage = Json.toMap(textMessage.getText());
Types.QueueName queueName = Types.queueName(queue.getQueueName());
if (queueName.equals(name) && messageFilter.accept(queueName, mapMessage)) {
- queueReader.onMessage(queueName, mapMessage);
+ queueReader.onMessage(queueName, filter(mapMessage));
session.commit();
// all ok.
committed = true;
@@ -260,7 +271,7 @@ public class JMSQueueManager implements QueueManager {
LOGGER.info("QueueReader requested requeue of message ", e);
if (retryByRequeue && textMessage != null) {
Map<String, Object> mapMessage = Json.toMap(textMessage.getText());
- if ((int)mapMessage.get(NRETRIES) < maxRetries) {
+ if ((long)mapMessage.get(NRETRIES) < maxRetries) {
mapMessage.put(NRETRIES, ((long) mapMessage.get(NRETRIES)) + 1);
TextMessage retryMessage = session.createTextMessage(Json.toJson(mapMessage));
retryMessage.setJMSType(JMSMessageTypes.JSON.toString());
diff --git a/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java b/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java
index ddf20a7..4c3aa36 100644
--- a/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java
+++ b/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
import javax.jms.*;
import java.lang.reflect.Field;
+import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
@@ -204,7 +205,7 @@ public class JMSQueueManagerTest {
// make the test map unique, if the dequeue fails, then the message wont be the first.
testMap.put("testing", queueName + System.currentTimeMillis());
LOGGER.info("Sending message to queue");
- jmsQueueManager.add(Types.queueName(queueName), testMap);
+ jmsQueueManager.add(Types.queueName(queueName), Collections.unmodifiableMap(testMap));
LOGGER.info("Sent message to queue ... receiving from queue");
checkMessagesInQueue(queueName, 1);
--
To stop receiving notification emails like this one, please contact
"commits@sling.apache.org" <co...@sling.apache.org>.