You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by ro...@apache.org on 2017/10/18 23:22:55 UTC

[sling-org-apache-sling-jms] 03/14: SLING-5647 - Provide ActiveMQ implementation of the MoM API in SLING-5646

This is an automated email from the ASF dual-hosted git repository.

rombert pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-jms.git

commit 2621716131c6af5fb08fa5cd28360c73fa129d5a
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Wed Sep 21 13:28:18 2016 +0000

    SLING-5647 - Provide ActiveMQ implementation of the MoM API in SLING-5646
    
    Map passed in add call may be immutable so add internal state on a copy and then ensure that such internal props are not passed to reader.
    
    Also fix the case around casting of numtries
    
    git-svn-id: https://svn.apache.org/repos/asf/sling/trunk@1761729 13f79535-47bb-0310-9956-ffa450edef68
---
 .../java/org/apache/sling/jms/JMSQueueManager.java    | 19 +++++++++++++++----
 .../org/apache/sling/jms/JMSQueueManagerTest.java     |  3 ++-
 2 files changed, 17 insertions(+), 5 deletions(-)

diff --git a/src/main/java/org/apache/sling/jms/JMSQueueManager.java b/src/main/java/org/apache/sling/jms/JMSQueueManager.java
index 7c7b83b..4fcc904 100644
--- a/src/main/java/org/apache/sling/jms/JMSQueueManager.java
+++ b/src/main/java/org/apache/sling/jms/JMSQueueManager.java
@@ -30,6 +30,7 @@ import java.io.Closeable;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -46,6 +47,7 @@ public class JMSQueueManager implements QueueManager {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(JMSQueueManager.class);
     private static final String NRETRIES = "_nr";
+    private static final Set<String> INTERNAL_PROPS = Collections.singleton(NRETRIES);
 
     @Reference
     private ConnectionFactoryService connectionFactoryService;
@@ -92,8 +94,10 @@ public class JMSQueueManager implements QueueManager {
         Session session = null;
         try {
             session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-            message.put(NRETRIES, 0L); // set the number of retries to 0.
-            TextMessage textMessage = session.createTextMessage(Json.toJson(message));
+            //TODO Instead of copy do addition at JSON writer level
+            Map<String, Object> msgCopy = new HashMap<>(message);
+            msgCopy.put(NRETRIES, 0L); // set the number of retries to 0.
+            TextMessage textMessage = session.createTextMessage(Json.toJson(msgCopy));
             textMessage.setJMSType(JMSMessageTypes.JSON.toString());
             LOGGER.info("Sending to {} message {} ", name, textMessage);
             session.createProducer(session.createQueue(name.toString())).send(textMessage);
@@ -189,6 +193,13 @@ public class JMSQueueManager implements QueueManager {
         }
     }
 
+    private static Map<String,Object> filter(Map<String, Object> map) {
+        //Filter out internal properties
+        for (String internalKey : INTERNAL_PROPS){
+            map.remove(internalKey);
+        }
+        return map;
+    }
 
     public static class JMSQueueSession implements Closeable, MessageListener {
         private static final Logger LOGGER = LoggerFactory.getLogger(JMSQueueSession.class);
@@ -248,7 +259,7 @@ public class JMSQueueManager implements QueueManager {
                             final Map<String, Object> mapMessage = Json.toMap(textMessage.getText());
                             Types.QueueName queueName = Types.queueName(queue.getQueueName());
                             if (queueName.equals(name) && messageFilter.accept(queueName, mapMessage)) {
-                                queueReader.onMessage(queueName, mapMessage);
+                                queueReader.onMessage(queueName, filter(mapMessage));
                                 session.commit();
                                 // all ok.
                                 committed = true;
@@ -260,7 +271,7 @@ public class JMSQueueManager implements QueueManager {
                     LOGGER.info("QueueReader requested requeue of message ", e);
                     if (retryByRequeue && textMessage != null) {
                         Map<String, Object> mapMessage = Json.toMap(textMessage.getText());
-                        if ((int)mapMessage.get(NRETRIES) < maxRetries) {
+                        if ((long)mapMessage.get(NRETRIES) < maxRetries) {
                             mapMessage.put(NRETRIES, ((long) mapMessage.get(NRETRIES)) + 1);
                             TextMessage retryMessage = session.createTextMessage(Json.toJson(mapMessage));
                             retryMessage.setJMSType(JMSMessageTypes.JSON.toString());
diff --git a/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java b/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java
index ddf20a7..4c3aa36 100644
--- a/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java
+++ b/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.jms.*;
 import java.lang.reflect.Field;
+import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Map;
@@ -204,7 +205,7 @@ public class JMSQueueManagerTest {
         // make the test map unique, if the dequeue fails, then the message wont be the first.
         testMap.put("testing", queueName + System.currentTimeMillis());
         LOGGER.info("Sending message to queue");
-        jmsQueueManager.add(Types.queueName(queueName), testMap);
+        jmsQueueManager.add(Types.queueName(queueName), Collections.unmodifiableMap(testMap));
         LOGGER.info("Sent message to queue ... receiving from queue");
 
         checkMessagesInQueue(queueName, 1);

-- 
To stop receiving notification emails like this one, please contact
"commits@sling.apache.org" <co...@sling.apache.org>.