Posted to commits@sling.apache.org by ro...@apache.org on 2017/11/07 09:50:20 UTC

[sling-org-apache-sling-jms] annotated tag org.apache.sling.jms-1.0.0 created (now 93dbe58)

This is an automated email from the ASF dual-hosted git repository.

rombert pushed a change to annotated tag org.apache.sling.jms-1.0.0
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-jms.git.


      at 93dbe58  (tag)
 tagging d346f99d91028874e85d6bfedd9b1acd758fda25 (commit)
      by Ian Boston
      on Wed Oct 5 10:03:27 2016 +0000

- Log -----------------------------------------------------------------
org.apache.sling.jms-1.0.0
-----------------------------------------------------------------------

This annotated tag includes the following new commits:

     new 0d2ab66  SLING-5646 MoM API and JMS implementation with example usage by Jobs implementation. Squashes 27 commits from https://github.com/ieb/sling/tree/jobs_28 as follows. Added first stab at a message oriented job subsystem Added basic implementation for the manager keeping queue implementation details abstracted Added ActiveMQ implementation for queues and topics, and fixed a number of the SPI interfaces in the process Basic Test coverage for OOT JMS Broker Extracted a MOM [...]
     new a41fae5  fixing potential race condition causing a NPE inside the JMS provider, failing tests intermittently
     new 47f7bfd  SLING-5647 - Provide ActiveMQ implementation of the MoM API in SLING-5646
     new d62b783  SLING-5645 Removed my signature from all source code
     new f1a752c  SLING-5645 Embedded gson added default exclude all exports to jms and services-it bundles
     new 69841da  SLING-5645 Made the ActiveMQConfigurationFactory require config to enable other JMS implementations to be used
     new fc3453a  [maven-release-plugin] prepare release org.apache.sling.jms-1.0.0
     new d346f99  [maven-release-plugin] copy for tag org.apache.sling.jms-1.0.0

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


-- 
To stop receiving notification emails like this one, please contact
"commits@sling.apache.org" <co...@sling.apache.org>.

[sling-org-apache-sling-jms] 02/08: fixing potential race condition causing a NPE inside the JMS provider, failing tests intermittently

Posted by ro...@apache.org.

rombert pushed a commit to annotated tag org.apache.sling.jms-1.0.0
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-jms.git

commit a41fae55c3bdbdf2901a4946287e320af4ac5aaa
Author: Ian Boston <ie...@apache.org>
AuthorDate: Wed Sep 21 13:14:51 2016 +0000

    fixing potential race condition causing a NPE inside the JMS provider, failing tests intermittently
    
    git-svn-id: https://svn.apache.org/repos/asf/sling/trunk/contrib/commons/mom/jms@1761725 13f79535-47bb-0310-9956-ffa450edef68
---
 src/main/java/org/apache/sling/jms/JMSQueueManager.java | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/src/main/java/org/apache/sling/jms/JMSQueueManager.java b/src/main/java/org/apache/sling/jms/JMSQueueManager.java
index 59d888d..7c7b83b 100644
--- a/src/main/java/org/apache/sling/jms/JMSQueueManager.java
+++ b/src/main/java/org/apache/sling/jms/JMSQueueManager.java
@@ -175,7 +175,11 @@ public class JMSQueueManager implements QueueManager {
         }
 
         public void close() {
-            session.close();
+            try {
+                session.close();
+            } catch ( Exception e ) {
+                LOGGER.debug("Exception when closing session.",e);
+            }
         }
     }
 

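The change above wraps session.close() so that a failure during shutdown is logged instead of propagating to the caller. The same pattern can be sketched without a JMS dependency. This is a minimal illustration, not the code from the commit: the class name QuietClose, the helper closeQuietly, the extra null check, and the use of java.util.logging in place of the SLF4J LOGGER are all assumptions made for the example.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class QuietClose {
    // Stand-in for the SLF4J LOGGER used in JMSQueueManager (assumption for this sketch).
    private static final Logger LOGGER = Logger.getLogger(QuietClose.class.getName());

    /**
     * Hypothetical helper: closes the resource and logs, rather than rethrows,
     * any exception. Returns true only if close() completed normally.
     */
    static boolean closeQuietly(AutoCloseable resource) {
        if (resource == null) {
            // The commit relies on catch (Exception) to absorb a null session;
            // the explicit check here is an addition for clarity.
            return false;
        }
        try {
            resource.close();
            return true;
        } catch (Exception e) {
            // Low log level, mirroring the debug-level log in the commit.
            LOGGER.log(Level.FINE, "Exception when closing session.", e);
            return false;
        }
    }

    public static void main(String[] args) {
        AutoCloseable healthy = () -> { };
        AutoCloseable broken = () -> { throw new IllegalStateException("already closed"); };
        System.out.println(closeQuietly(healthy));
        System.out.println(closeQuietly(broken));
        System.out.println(closeQuietly(null));
    }
}
```

Swallowing the exception is a reasonable trade-off only on a shutdown path such as close(), where there is nothing useful the caller can do with the failure; elsewhere it would hide real errors.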

[sling-org-apache-sling-jms] 04/08: SLING-5645 Removed my signature from all source code

Posted by ro...@apache.org.

rombert pushed a commit to annotated tag org.apache.sling.jms-1.0.0
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-jms.git

commit d62b783dc0dc30805cf479c464ca79a148875dca
Author: Ian Boston <ie...@apache.org>
AuthorDate: Mon Oct 3 15:51:56 2016 +0000

    SLING-5645 Removed my signature from all source code
    
    git-svn-id: https://svn.apache.org/repos/asf/sling/trunk/contrib/commons/mom/jms@1763175 13f79535-47bb-0310-9956-ffa450edef68
---
 src/main/java/org/apache/sling/jms/ConnectionFactoryService.java         | 1 -
 src/main/java/org/apache/sling/jms/JMSMessageTypes.java                  | 1 -
 src/main/java/org/apache/sling/jms/JMSQueueManager.java                  | 1 -
 src/main/java/org/apache/sling/jms/JMSTopicManager.java                  | 1 -
 src/main/java/org/apache/sling/jms/Json.java                             | 1 -
 .../java/org/apache/sling/amq/ActiveMQConnectionFactoryServiceTest.java  | 1 -
 src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java              | 1 -
 src/test/java/org/apache/sling/jms/JMSTopicManagerTest.java              | 1 -
 src/test/java/org/apache/sling/jms/JsonTest.java                         | 1 -
 9 files changed, 9 deletions(-)

diff --git a/src/main/java/org/apache/sling/jms/ConnectionFactoryService.java b/src/main/java/org/apache/sling/jms/ConnectionFactoryService.java
index e14a3f7..396896c 100644
--- a/src/main/java/org/apache/sling/jms/ConnectionFactoryService.java
+++ b/src/main/java/org/apache/sling/jms/ConnectionFactoryService.java
@@ -21,7 +21,6 @@ package org.apache.sling.jms;
 import javax.jms.ConnectionFactory;
 
 /**
- * Created by ieb on 30/03/2016.
  * Implementations of this service provide JMS Connection factories. In general implementations should work OOTB with no
  * further configuration and provide an efficient JMS Connection Factory that allows sessions to be created with minimal
  * overhead.
diff --git a/src/main/java/org/apache/sling/jms/JMSMessageTypes.java b/src/main/java/org/apache/sling/jms/JMSMessageTypes.java
index a6fcf29..b560a5a 100644
--- a/src/main/java/org/apache/sling/jms/JMSMessageTypes.java
+++ b/src/main/java/org/apache/sling/jms/JMSMessageTypes.java
@@ -19,7 +19,6 @@
 package org.apache.sling.jms;
 
 /**
- * Created by ieb on 04/04/2016.
  */
 public enum JMSMessageTypes {
     /**
diff --git a/src/main/java/org/apache/sling/jms/JMSQueueManager.java b/src/main/java/org/apache/sling/jms/JMSQueueManager.java
index 4fcc904..ff9bfbc 100644
--- a/src/main/java/org/apache/sling/jms/JMSQueueManager.java
+++ b/src/main/java/org/apache/sling/jms/JMSQueueManager.java
@@ -34,7 +34,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * Created by ieb on 30/03/2016.
  * A JMS implementation of a QueueManager. It will allow callers to add messages to named queues, and consumers to read
  * messages from named queues in order. The component uses a single connection to the JMS broker, but dedicated sessions
  * for each send and for each Queue reader.
diff --git a/src/main/java/org/apache/sling/jms/JMSTopicManager.java b/src/main/java/org/apache/sling/jms/JMSTopicManager.java
index e3b0588..e9b7af8 100644
--- a/src/main/java/org/apache/sling/jms/JMSTopicManager.java
+++ b/src/main/java/org/apache/sling/jms/JMSTopicManager.java
@@ -40,7 +40,6 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * Created by ieb on 30/03/2016.
  * This class provides support for sending messages to topics over JMS and subscribing to topics. It uses the ConnectionFactoryService
  * to interact with JMS. There is nothing in
  */
diff --git a/src/main/java/org/apache/sling/jms/Json.java b/src/main/java/org/apache/sling/jms/Json.java
index b4715d2..c769a89 100644
--- a/src/main/java/org/apache/sling/jms/Json.java
+++ b/src/main/java/org/apache/sling/jms/Json.java
@@ -31,7 +31,6 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Created by ieb on 31/03/2016.
  */
 public class Json {
 
diff --git a/src/test/java/org/apache/sling/amq/ActiveMQConnectionFactoryServiceTest.java b/src/test/java/org/apache/sling/amq/ActiveMQConnectionFactoryServiceTest.java
index 0a1da45..479f036 100644
--- a/src/test/java/org/apache/sling/amq/ActiveMQConnectionFactoryServiceTest.java
+++ b/src/test/java/org/apache/sling/amq/ActiveMQConnectionFactoryServiceTest.java
@@ -32,7 +32,6 @@ import java.util.Map;
 import static org.junit.Assert.*;
 
 /**
- * Created by ieb on 31/03/2016.
  */
 public class ActiveMQConnectionFactoryServiceTest {
 
diff --git a/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java b/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java
index 4c3aa36..c391004 100644
--- a/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java
+++ b/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java
@@ -46,7 +46,6 @@ import java.util.Map;
 import static org.junit.Assert.*;
 
 /**
- * Created by ieb on 01/04/2016.
  */
 public class JMSQueueManagerTest {
 
diff --git a/src/test/java/org/apache/sling/jms/JMSTopicManagerTest.java b/src/test/java/org/apache/sling/jms/JMSTopicManagerTest.java
index 71004e7..b984c5d 100644
--- a/src/test/java/org/apache/sling/jms/JMSTopicManagerTest.java
+++ b/src/test/java/org/apache/sling/jms/JMSTopicManagerTest.java
@@ -45,7 +45,6 @@ import java.util.Set;
 import static org.junit.Assert.*;
 
 /**
- * Created by ieb on 31/03/2016.
  */
 public class JMSTopicManagerTest {
 
diff --git a/src/test/java/org/apache/sling/jms/JsonTest.java b/src/test/java/org/apache/sling/jms/JsonTest.java
index 6880c5b..919f527 100644
--- a/src/test/java/org/apache/sling/jms/JsonTest.java
+++ b/src/test/java/org/apache/sling/jms/JsonTest.java
@@ -29,7 +29,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Created by ieb on 31/03/2016.
  */
 public class JsonTest {
 


[sling-org-apache-sling-jms] 07/08: [maven-release-plugin] prepare release org.apache.sling.jms-1.0.0

Posted by ro...@apache.org.

rombert pushed a commit to annotated tag org.apache.sling.jms-1.0.0
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-jms.git

commit fc3453a712e5b81ca718cc6e33fbcabf98fcee84
Author: Ian Boston <ie...@apache.org>
AuthorDate: Wed Oct 5 10:03:14 2016 +0000

    [maven-release-plugin] prepare release org.apache.sling.jms-1.0.0
    
    git-svn-id: https://svn.apache.org/repos/asf/sling/trunk/contrib/commons/mom/jms@1763383 13f79535-47bb-0310-9956-ffa450edef68
---
 pom.xml | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/pom.xml b/pom.xml
index 715ae96..a95d1f0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,7 +29,7 @@
 
     <artifactId>org.apache.sling.jms</artifactId>
     <packaging>bundle</packaging>
-    <version>0.0.1-SNAPSHOT</version>
+    <version>1.0.0</version>
 
     <name>Apache Sling MoM Implementation using JMS with AMQ</name>
     <description>
@@ -37,9 +37,9 @@
     </description>
 
     <scm>
-        <connection>scm:svn:http://svn.apache.org/repos/asf/sling/trunk/contrib/commons/mom/jms</connection>
-        <developerConnection>scm:svn:https://svn.apache.org/repos/asf/sling/trunk/contrib/commons/mom/jms</developerConnection>
-        <url>http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/jms</url>
+        <connection>scm:svn:http://svn.apache.org/repos/asf/sling/tags/org.apache.sling.jms-1.0.0</connection>
+        <developerConnection>scm:svn:https://svn.apache.org/repos/asf/sling/tags/org.apache.sling.jms-1.0.0</developerConnection>
+        <url>http://svn.apache.org/viewvc/sling/tags/org.apache.sling.jms-1.0.0</url>
     </scm>
 
     <properties>
@@ -106,7 +106,7 @@
                             *
                         </Import-Package>
                         -->
-                        <Export-Package></Export-Package>
+                        <Export-Package />
                     </instructions>
                 </configuration>
             </plugin>
@@ -137,7 +137,7 @@
         <dependency>
             <groupId>org.apache.sling</groupId>
             <artifactId>org.apache.sling.mom</artifactId>
-            <version>0.0.1-SNAPSHOT</version>
+            <version>1.0.0</version>
             <scope>provided</scope>
         </dependency>
 


[sling-org-apache-sling-jms] 08/08: [maven-release-plugin] copy for tag org.apache.sling.jms-1.0.0

Posted by ro...@apache.org.

rombert pushed a commit to annotated tag org.apache.sling.jms-1.0.0
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-jms.git

commit d346f99d91028874e85d6bfedd9b1acd758fda25
Author: Ian Boston <ie...@apache.org>
AuthorDate: Wed Oct 5 10:03:27 2016 +0000

    [maven-release-plugin] copy for tag org.apache.sling.jms-1.0.0
    
    git-svn-id: https://svn.apache.org/repos/asf/sling/tags/org.apache.sling.jms-1.0.0@1763384 13f79535-47bb-0310-9956-ffa450edef68


[sling-org-apache-sling-jms] 06/08: SLING-5645 Made the ActiveMQConfigurationFactory require config to enable other JMS implementations to be used

Posted by ro...@apache.org.

rombert pushed a commit to annotated tag org.apache.sling.jms-1.0.0
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-jms.git

commit 69841da26f4e401b07cd1c821c1d2742bdf8e659
Author: Ian Boston <ie...@apache.org>
AuthorDate: Tue Oct 4 15:47:55 2016 +0000

    SLING-5645 Made the ActiveMQConfigurationFactory require config to enable other JMS implementations to be used
    
    git-svn-id: https://svn.apache.org/repos/asf/sling/trunk/contrib/commons/mom/jms@1763291 13f79535-47bb-0310-9956-ffa450edef68
---
 .../apache/sling/amq/ActiveMQConnectionFactoryService.java   |  8 ++------
 .../org/apache/sling/jms/{ => impl}/JMSMessageTypes.java     |  2 +-
 .../org/apache/sling/jms/{ => impl}/JMSQueueManager.java     |  3 ++-
 .../org/apache/sling/jms/{ => impl}/JMSTopicManager.java     |  3 ++-
 src/main/java/org/apache/sling/jms/{ => impl}/Json.java      |  2 +-
 .../sling/jms/{JMSMessageTypes.java => package-info.java}    | 12 +++++-------
 src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java  |  1 +
 src/test/java/org/apache/sling/jms/JMSTopicManagerTest.java  |  1 +
 src/test/java/org/apache/sling/jms/JsonTest.java             |  1 +
 9 files changed, 16 insertions(+), 17 deletions(-)

diff --git a/src/main/java/org/apache/sling/amq/ActiveMQConnectionFactoryService.java b/src/main/java/org/apache/sling/amq/ActiveMQConnectionFactoryService.java
index bb3798e..0fdca79 100644
--- a/src/main/java/org/apache/sling/amq/ActiveMQConnectionFactoryService.java
+++ b/src/main/java/org/apache/sling/amq/ActiveMQConnectionFactoryService.java
@@ -19,11 +19,7 @@
 package org.apache.sling.amq;
 
 import org.apache.activemq.pool.PooledConnectionFactory;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Property;
-import org.apache.felix.scr.annotations.Service;
+import org.apache.felix.scr.annotations.*;
 import org.apache.sling.jms.ConnectionFactoryService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,7 +56,7 @@ import javax.jms.ConnectionFactory;
  * properties:/foo/bar.properties uses a properties file as per http://activemq.apache.org/broker-properties-uri.html
  *
  */
-@Component(immediate = true, metatype = true)
+@Component(immediate = true, metatype = true, policy = ConfigurationPolicy.REQUIRE)
 @Service(value=ConnectionFactoryService.class)
 public class ActiveMQConnectionFactoryService implements ConnectionFactoryService {
 
diff --git a/src/main/java/org/apache/sling/jms/JMSMessageTypes.java b/src/main/java/org/apache/sling/jms/impl/JMSMessageTypes.java
similarity index 96%
copy from src/main/java/org/apache/sling/jms/JMSMessageTypes.java
copy to src/main/java/org/apache/sling/jms/impl/JMSMessageTypes.java
index b560a5a..3f12bd9 100644
--- a/src/main/java/org/apache/sling/jms/JMSMessageTypes.java
+++ b/src/main/java/org/apache/sling/jms/impl/JMSMessageTypes.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.jms;
+package org.apache.sling.jms.impl;
 
 /**
  */
diff --git a/src/main/java/org/apache/sling/jms/JMSQueueManager.java b/src/main/java/org/apache/sling/jms/impl/JMSQueueManager.java
similarity index 99%
rename from src/main/java/org/apache/sling/jms/JMSQueueManager.java
rename to src/main/java/org/apache/sling/jms/impl/JMSQueueManager.java
index ff9bfbc..37edecf 100644
--- a/src/main/java/org/apache/sling/jms/JMSQueueManager.java
+++ b/src/main/java/org/apache/sling/jms/impl/JMSQueueManager.java
@@ -16,9 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.jms;
+package org.apache.sling.jms.impl;
 
 import org.apache.felix.scr.annotations.*;
+import org.apache.sling.jms.ConnectionFactoryService;
 import org.apache.sling.mom.*;
 import org.osgi.framework.ServiceReference;
 import org.slf4j.Logger;
diff --git a/src/main/java/org/apache/sling/jms/JMSTopicManager.java b/src/main/java/org/apache/sling/jms/impl/JMSTopicManager.java
similarity index 99%
rename from src/main/java/org/apache/sling/jms/JMSTopicManager.java
rename to src/main/java/org/apache/sling/jms/impl/JMSTopicManager.java
index e9b7af8..ebc2cc8 100644
--- a/src/main/java/org/apache/sling/jms/JMSTopicManager.java
+++ b/src/main/java/org/apache/sling/jms/impl/JMSTopicManager.java
@@ -16,9 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.jms;
+package org.apache.sling.jms.impl;
 
 import org.apache.felix.scr.annotations.*;
+import org.apache.sling.jms.ConnectionFactoryService;
 import org.apache.sling.mom.*;
 import org.osgi.framework.ServiceReference;
 import org.slf4j.Logger;
diff --git a/src/main/java/org/apache/sling/jms/Json.java b/src/main/java/org/apache/sling/jms/impl/Json.java
similarity index 98%
rename from src/main/java/org/apache/sling/jms/Json.java
rename to src/main/java/org/apache/sling/jms/impl/Json.java
index c769a89..b71e3a8 100644
--- a/src/main/java/org/apache/sling/jms/Json.java
+++ b/src/main/java/org/apache/sling/jms/impl/Json.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.jms;
+package org.apache.sling.jms.impl;
 
 import com.google.gson.Gson;
 import com.google.gson.JsonArray;
diff --git a/src/main/java/org/apache/sling/jms/JMSMessageTypes.java b/src/main/java/org/apache/sling/jms/package-info.java
similarity index 89%
rename from src/main/java/org/apache/sling/jms/JMSMessageTypes.java
rename to src/main/java/org/apache/sling/jms/package-info.java
index b560a5a..3b47c12 100644
--- a/src/main/java/org/apache/sling/jms/JMSMessageTypes.java
+++ b/src/main/java/org/apache/sling/jms/package-info.java
@@ -16,13 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.jms;
 
 /**
  */
-public enum JMSMessageTypes {
-    /**
-     * A text message containing json.
-     */
-    JSON
-}
+
+@Version("1.0.0")
+package org.apache.sling.jms;
+
+import aQute.bnd.annotation.Version;
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java b/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java
index c391004..b0e2744 100644
--- a/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java
+++ b/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.sling.jms;
 
 import org.apache.sling.amq.ActiveMQConnectionFactoryService;
 import org.apache.sling.amq.ActiveMQConnectionFactoryServiceTest;
+import org.apache.sling.jms.impl.JMSQueueManager;
 import org.apache.sling.mom.*;
 import org.junit.After;
 import org.junit.Before;
diff --git a/src/test/java/org/apache/sling/jms/JMSTopicManagerTest.java b/src/test/java/org/apache/sling/jms/JMSTopicManagerTest.java
index b984c5d..59bca13 100644
--- a/src/test/java/org/apache/sling/jms/JMSTopicManagerTest.java
+++ b/src/test/java/org/apache/sling/jms/JMSTopicManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.sling.jms;
 
 import org.apache.sling.amq.ActiveMQConnectionFactoryService;
 import org.apache.sling.amq.ActiveMQConnectionFactoryServiceTest;
+import org.apache.sling.jms.impl.JMSTopicManager;
 import org.apache.sling.mom.*;
 import org.junit.After;
 import org.junit.Before;
diff --git a/src/test/java/org/apache/sling/jms/JsonTest.java b/src/test/java/org/apache/sling/jms/JsonTest.java
index 919f527..12dda05 100644
--- a/src/test/java/org/apache/sling/jms/JsonTest.java
+++ b/src/test/java/org/apache/sling/jms/JsonTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.sling.jms;
 
+import org.apache.sling.jms.impl.Json;
 import org.junit.Before;
 import org.junit.Test;
 


[sling-org-apache-sling-jms] 05/08: SLING-5645 Embedded gson added default exclude all exports to jms and services-it bundles

Posted by ro...@apache.org.

rombert pushed a commit to annotated tag org.apache.sling.jms-1.0.0
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-jms.git

commit f1a752c7a235179ad990749e76209d7685704ed0
Author: Ian Boston <ie...@apache.org>
AuthorDate: Mon Oct 3 15:52:05 2016 +0000

    SLING-5645 Embedded gson added default exclude all exports to jms and services-it bundles
    
    git-svn-id: https://svn.apache.org/repos/asf/sling/trunk/contrib/commons/mom/jms@1763176 13f79535-47bb-0310-9956-ffa450edef68
---
 pom.xml | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/pom.xml b/pom.xml
index 95a7a87..715ae96 100644
--- a/pom.xml
+++ b/pom.xml
@@ -73,7 +73,8 @@
                             activemq-protobuf,
                             activemq-kahadb-store,
                             activemq-jms-pool,
-                            hawtbuf</Embed-Dependency>
+                            hawtbuf,
+                            gson</Embed-Dependency>
                         <Import-Package>
                             org.apache.activemq.jaas*;resolution:=optional,
                             org.apache.commons.net.ftp*;resolution:=optional,
@@ -83,7 +84,6 @@
                             javax.jmdns*;resolution:=optional,
                             !org.apache.maven*,
                             *
-
                         </Import-Package>
                         <!--
                         <Import-Package>com.thoughtworks.xstream*;resolution:=optional,
@@ -106,6 +106,7 @@
                             *
                         </Import-Package>
                         -->
+                        <Export-Package></Export-Package>
                     </instructions>
                 </configuration>
             </plugin>


[sling-org-apache-sling-jms] 01/08: SLING-5646 MoM API and JMS implementation with example usage by Jobs implementation. Squashes 27 commits from https://github.com/ieb/sling/tree/jobs_28 as follows. Added first stab at a message oriented job subsystem Added basic implementation for the manager keeping queue implementation details abstracted Added ActiveMQ implementation for queues and topics, and fixed a number of the SPI interfaces in the process Basic Test coverage for OOT JMS Broker Extracted a MOM API with no Jobs o [...]

Posted by ro...@apache.org.

rombert pushed a commit to annotated tag org.apache.sling.jms-1.0.0
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-jms.git

commit 0d2ab666035ea47563339bfce27ed914ae8c2a16
Author: Ian Boston <ie...@apache.org>
AuthorDate: Wed Jul 27 12:10:12 2016 +0000

    SLING-5646 MoM API and JMS implementation with example usage by Jobs implementation.
    Squashes 27 commits from https://github.com/ieb/sling/tree/jobs_28 as follows.
    Added first stab at a message oriented job subsystem
    Added basic implementation for the manager keeping queue implementation details abstracted
    Added ActiveMQ implementation for queues and topics, and fixed a number of the SPI interfaces in the process
    Basic Test coverage for OOT JMS Broker
    Extracted a MOM API with no Jobs or JMS references, test coverage for ActiveMQ impl is 100% class, 93% method, 75% line
    Added missing license headers, documentation and cleaned out unused  interfaces
    Fixed JMS Transaction issue found by @tmaret
    Coverage for the majority of the jobs code is complete
    Basic unit test coverage complete, core has 94% by lines, AMQ 74%, 100% classes and methods
    Added testing environment for a running server, Not working yet
    Added ability to detect when the OSGi container has completed bundle startup without having to perform http requests
    Start at IT testing with Crankstart
    Fixed issues with shutdown inside a Crankstart container
    Working Crankstart IT framework
    Version that uses Q->Jobs->JobConsumer pattern
    Added a Queue Factory to allow configuration of multiple queues between the MOM API and Job Subsystem and move JobConsumers to register with a Job type
    Migrated Subscribers and QueueReaders to a OSGi whiteboard pattern after discussion on Sling Dev
    Changes the JobConsumer to use a Callback rather than return a job. This was suggested offlist by others in Adobe as a way of improving resource consumption
    Added Types to improve type safety in certain areas after suggestions offlist
    Fixed issue with OSGi startup in IntelliJ caused by version 4 of the
    Felix framework bundle being present inside the maven pom. Strangely a
    command line build was not impacted.
    Added integration test bundle to test service. Adjusted some of the APIs to make using the Job Sub System easier
    Integration tests now starting jobs from messages
    Fixed Startup to work in real Sling/AEM container. The Active MQ OSGi bundle contains additional dependencies that cause all sorts of problems, AMQ is now being embedded into the AMQ MOM Impl bundle.
    Fixed Queue expiry bug and added AEM Fiddle to run jobs
    Added Documentation for configuration and default sample configuration
    Added Explicit requeue mechanism rather than relying on AMQ's requeue capabilities
    Moved MoM to new Home
    
    git-svn-id: https://svn.apache.org/repos/asf/sling/trunk/contrib/commons/mom/jms@1754255 13f79535-47bb-0310-9956-ffa450edef68
---
 .gitignore                                         |   1 +
 README.md                                          |  79 ++++++
 pom.xml                                            | 237 ++++++++++++++++
 .../amq/ActiveMQConnectionFactoryService.java      | 106 ++++++++
 .../apache/sling/jms/ConnectionFactoryService.java |  36 +++
 .../java/org/apache/sling/jms/JMSMessageTypes.java |  29 ++
 .../java/org/apache/sling/jms/JMSQueueManager.java | 289 ++++++++++++++++++++
 .../java/org/apache/sling/jms/JMSTopicManager.java | 248 +++++++++++++++++
 src/main/java/org/apache/sling/jms/Json.java       |  92 +++++++
 .../resources/org/apache/sling/amq/activemq.xml    | 170 ++++++++++++
 .../org/apache/sling/amq/credentials.properties    |  22 ++
 src/main/resources/org/apache/sling/amq/jetty.xml  | 144 ++++++++++
 .../amq/ActiveMQConnectionFactoryServiceTest.java  |  81 ++++++
 .../org/apache/sling/jms/JMSQueueManagerTest.java  | 301 +++++++++++++++++++++
 .../org/apache/sling/jms/JMSTopicManagerTest.java  | 235 ++++++++++++++++
 src/test/java/org/apache/sling/jms/JsonTest.java   | 118 ++++++++
 16 files changed, 2188 insertions(+)

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..0d18955
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+activemq-data
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..d95f6d2
--- /dev/null
+++ b/README.md
@@ -0,0 +1,79 @@
+# Message Oriented Middleware API implementation using Active MQ.
+
+This bundle implements the MoM API using Active MQ. It supports both Pub/Sub and Queue patterns and will run out of the
+box, embedded or connected to a dedicated cluster.
+
+## Out of the Box.
+
+As the name suggests, no configuration is required: the components will start, create an ActiveMQ instance embedded inside
+the OSGi container and run. On restart, normal or after a crash, provided the working directory is not changed or modified, 
+the ActiveMQ embedded server will restart and resume operations. In the event the JVM crashes, ActiveMQ will perform recovery
+ by replaying its journal. The embedded server uses KahaDB to store data on local disk.
+ 
+## Embedded with custom configuration.
+
+AMQ can be run embedded with custom configuration to allow a cluster of Sling instances to form a multi-master AMQ cluster with each 
+Sling instance embedding its own AMQ broker. This is achieved via OSGi configuration. (ToDo: Config + Doc)
+
+## External AMQ Broker cluster.
+
+The bundle can be run to use an external AMQ Broker cluster, maintained and setup separately from the Sling cluster. To do this, modify the
+Broker URL via OSGi configuration.
+
+# Implementation details.
+
+## AMQ ConnectionFactory.
+
+Running AMQ inside OSGi is very simple. All that is required is the AMQ dependencies, and instantiating an AMQ PooledConnectionFactory with a
+default localhost URL, vm://localhost:61616. The PooledConnectionFactory will trigger the creation of an AMQ Broker if one is not present
+and AMQ will run normally. This service implements an internal service API org.apache.sling.jms.ConnectionFactoryService, which enables 
+consumers to get a JMS ConnectionFactory.
+
+## MoM API implementation
+
+The MoM API implementation uses the ConnectionFactory service to get a JMS connection. It opens JMS sessions using that connection factory
+and implements the methods in the API. JMS supports both Pub/Sub and Queue patterns in the MoM API without much additional work. 
+The JMS sessions are single threaded, so care is taken not to share between threads or cause throughput issues with synchronization.
+
+The Map of Map messages in the MoM API are serialised to Json using the Gson library and transmitted as Text messages. JMS Headers are currently
+not used other than to identify the JSON encoding of the text payload.
+
+The delivery of messages on Topics and Queues is entirely managed by AMQ with no additional code. The retry semantics of the QueueReader API
+are achieved by dispatching JMS messages from within a JMS MessageListener onMessage method, and throwing an IllegalArgumentException to JMS
+when a message needs to be re-queued. How retries work and the backoff algorithm used for messages that need to be retried is managed 
+by ActiveMQ configuration which supports many scenarios for retrying messages.
+
+## Delivery Retry for Queued messages
+
+The semantics of the MoM API are that a consumer may throw an exception when its QueueReader.onMessage method is called. That indicates that the
+message could not be consumed at this time and should be retried. There are several ways that this can be achieved in general, and some
+AMQ-specific ways. By default JMS ensures delivery order. Hence a message on the queue that is not dequeued will block other messages on the queue
+until it is dequeued. AMQ deals with this by allowing a deployer to configure queues to retry at the broker rather than attempting to redeliver in
+order to the same JMS consumer. This is not the default behaviour and has to be enabled by configuring the AMQ broker.
+
+    <broker xmlns="http://activemq.apache.org/schema/core"    schedulerSupport="true" >
+        .... 
+        <plugins>
+            <redeliveryPlugin fallbackToDeadLetter="true" sendToDlqIfMaxRetriesExceeded="true">
+                <redeliveryPolicyMap>
+                    <redeliveryPolicyMap>
+                        <redeliveryPolicyEntries>
+                            <!-- a destination specific policy -->
+                            <redeliveryPolicy queue="SpecialQueue" maximumRedeliveries="4" redeliveryDelay="10000" />
+                        </redeliveryPolicyEntries>
+                        <!-- the fallback policy for all other destinations -->
+                        <defaultEntry>
+                            <redeliveryPolicy maximumRedeliveries="4" initialRedeliveryDelay="5000" redeliveryDelay="10000" />
+                        </defaultEntry>
+                    </redeliveryPolicyMap>
+                </redeliveryPolicyMap>
+            </redeliveryPlugin>
+        </plugins>
+        
+This can also be achieved in code, by dequeuing all messages regardless of whether they fail or not. Those that generate an exception when dequeued get requeued. If the
+queue is so long that the delay in working through it significantly impacts processing, then an alternative approach is to requeue to a special retry queue, ensuring
+that retries get a higher level of priority. This may not be necessary: retries happen because a resource was unavailable, and if the queue is long, the resource will
+probably be available again by the time the requeued message is reached, so no further retry is needed. If the queue is short, then the re-queue time is minimal. The approach
+is quite similar to the approach used in AMQ 5.7 and later, although it will work with any JMS provider.
+
+The code base is currently configured to use an explicit dequeue and requeue approach that does not depend on features of the JMS provider.
\ No newline at end of file
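The explicit dequeue-and-requeue approach described at the end of the README above can be illustrated without a JMS provider. The following is a minimal sketch, not the bundle's implementation: it uses an in-memory `ArrayDeque` in place of a JMS queue, and the class name `RequeueSketch`, the `Handler` interface and the `drain` method are hypothetical names introduced for illustration. Only the `_nr` counter key is taken from JMSQueueManager.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

class RequeueSketch {
    static final String NRETRIES = "_nr";

    /** Hypothetical consumer callback; throwing signals "retry this message later". */
    interface Handler {
        void onMessage(Map<String, Object> message) throws Exception;
    }

    /**
     * Dequeues every message. A message whose handler throws is added back at the
     * tail of the queue with an incremented retry counter, until maxRetries is
     * reached; after that it is dropped. Returns the dropped messages.
     */
    static List<Map<String, Object>> drain(Queue<Map<String, Object>> queue,
                                           Handler handler, int maxRetries) {
        List<Map<String, Object>> dropped = new ArrayList<>();
        Map<String, Object> message;
        while ((message = queue.poll()) != null) {
            try {
                handler.onMessage(message);
            } catch (Exception e) {
                Object n = message.get(NRETRIES);
                int retries = n instanceof Number ? ((Number) n).intValue() : 0;
                if (retries < maxRetries) {
                    // Requeue a copy so the failed delivery is not mutated.
                    Map<String, Object> retry = new HashMap<>(message);
                    retry.put(NRETRIES, retries + 1);
                    queue.add(retry);
                } else {
                    dropped.add(message);
                }
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        Queue<Map<String, Object>> queue = new ArrayDeque<>();
        Map<String, Object> job = new HashMap<>();
        job.put("id", "job-1");
        job.put(NRETRIES, 0);
        queue.add(job);

        final int[] deliveries = {0};
        // Simulated consumer: fails on the first two deliveries, then succeeds.
        List<Map<String, Object>> dropped = drain(queue, msg -> {
            if (++deliveries[0] < 3) {
                throw new IllegalStateException("resource unavailable");
            }
        }, 5);
        System.out.println("deliveries=" + deliveries[0] + " dropped=" + dropped.size());
    }
}
```

Because a failed message goes to the tail of the queue, other messages are not blocked behind it, which is the property the README attributes to this approach. A real implementation would perform the dequeue and the requeue inside one transacted session so that a crash between the two steps cannot lose the message.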
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..95a7a87
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,237 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.sling</groupId>
+        <artifactId>sling</artifactId>
+        <version>26</version>
+        <relativePath />
+    </parent>
+
+    <artifactId>org.apache.sling.jms</artifactId>
+    <packaging>bundle</packaging>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <name>Apache Sling MoM Implementation using JMS with AMQ</name>
+    <description>
+        Provides a JMS Connection provider that works OOTB in a cluster or uses a pre-existing AMQ endpoint depending on the configuration.
+    </description>
+
+    <scm>
+        <connection>scm:svn:http://svn.apache.org/repos/asf/sling/trunk/contrib/commons/mom/jms</connection>
+        <developerConnection>scm:svn:https://svn.apache.org/repos/asf/sling/trunk/contrib/commons/mom/jms</developerConnection>
+        <url>http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/jms</url>
+    </scm>
+
+    <properties>
+        <site.jira.version.id>12315369</site.jira.version.id>
+        <sling.java.version>7</sling.java.version>
+        <exam.version>4.4.0</exam.version>
+        <url.version>2.4.5</url.version>
+        <bundle.build.dir>${basedir}/target</bundle.build.dir>
+        <bundle.file.name>${bundle.build.dir}/${project.build.finalName}.jar</bundle.file.name>
+        <min.port>37000</min.port>
+        <max.port>37999</max.port>
+    </properties>
+
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-scr-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <extensions>true</extensions>
+                <configuration>
+                    <instructions>
+                        <Embed-Dependency>
+                            activemq-broker,
+                            activemq-client,
+                            activemq-pool,
+                            activemq-protobuf,
+                            activemq-kahadb-store,
+                            activemq-jms-pool,
+                            hawtbuf</Embed-Dependency>
+                        <Import-Package>
+                            org.apache.activemq.jaas*;resolution:=optional,
+                            org.apache.commons.net.ftp*;resolution:=optional,
+                            org.apache.geronimo.transaction*;resolution:=optional,
+                            org.fusesource.hawtbuf*;resolution:=optional,
+                            org.apache.commons.pool2*;resolution:=optional,
+                            javax.jmdns*;resolution:=optional,
+                            !org.apache.maven*,
+                            *
+
+                        </Import-Package>
+                        <!--
+                        <Import-Package>com.thoughtworks.xstream*;resolution:=optional,
+                            org.apache.activeio*;resolution:=optional,
+                            org.apache.commons.pool*;resolution:=optional,
+                            org.apache.derby*;resolution:=optional,
+                            org.apache.tools.ant*;resolution:=optional,
+                            org.apache.maven*;resolution:=optional,
+                            org.apache.xbean*;resolution:=optional,
+                            '=org.apache.xbean.spring.context.v2;resolution:=optional',
+                            org.apache.xpath*;resolution:=optional,
+                            org.codehaus.jam*;resolution:=optional,
+                            org.springframework*;resolution:=optional,
+                            org.springframework.beans.factory.xml;resolution:=optional,
+                            org.w3c.dom.traversal*;resolution:=optional,
+                            org.apache.commons.net*;resolution:=optional,
+                            org.apache.kahadb*;resolution:=optional,
+                            org.apache.activemq.ra*;resolution:=optional,
+                            org.apache.geronimo.transaction.manager*;resolution:=optional,
+                            *
+                        </Import-Package>
+                        -->
+                    </instructions>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes>
+                        <exclude>activemq-data/**</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+    <reporting>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <configuration>
+                    <excludePackageNames>
+                    </excludePackageNames>
+                </configuration>
+            </plugin>
+        </plugins>
+    </reporting>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.mom</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- ActiveMQ -->
+        <!-- The released OSGi bundle contains a blueprint config with an invalid import that can't be satisfied without a custom
+         bundle, hence these will be embedded. -->
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-broker</artifactId>
+            <version>5.13.2</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-client</artifactId>
+            <version>5.13.2</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-pool</artifactId>
+            <version>5.13.2</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-jms-pool</artifactId>
+            <version>5.13.2</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-kahadb-store</artifactId>
+            <version>5.13.2</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq.protobuf</groupId>
+            <artifactId>activemq-protobuf</artifactId>
+            <version>1.1</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.fusesource.hawtbuf</groupId>
+            <artifactId>hawtbuf</artifactId>
+            <version>1.11</version>
+            <scope>provided</scope>
+        </dependency>
+
+
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>2.2.4</version>
+            <scope>provided</scope>
+        </dependency>
+
+
+
+        <dependency>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>org.apache.felix.scr.annotations</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>osgi.core</artifactId>
+            <version>6.0.0</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.api</artifactId>
+            <version>2.4.0</version>
+            <scope>provided</scope>
+        </dependency>
+      <!-- Testing -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>1.10.19</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/src/main/java/org/apache/sling/amq/ActiveMQConnectionFactoryService.java b/src/main/java/org/apache/sling/amq/ActiveMQConnectionFactoryService.java
new file mode 100644
index 0000000..bb3798e
--- /dev/null
+++ b/src/main/java/org/apache/sling/amq/ActiveMQConnectionFactoryService.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.amq;
+
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.jms.ConnectionFactoryService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+import javax.jms.ConnectionFactory;
+
+/**
+ * Creates a ConnectionFactoryService that makes a pooled JMS ConnectionFactory available to consumers. The implementation
+ * of JMS is provided by ActiveMQ in this instance. If the component is left unconfigured it will connect to vm://localhost:61616.
+ * If no broker is present at that address, the component will create a standalone ActiveMQ broker on startup. Without additional
+ * configuration that AMQ Broker will operate standalone. With configuration it is possible to configure the broker to become
+ * a member of a multi-master AMQ Broker network. Alternatively, if a dedicated AMQ Broker is required, the jms.brokerUri configuration
+ * setting should be adjusted to point to that broker.
+ *
+ * This component works OOTB and in Unit tests with no further action.
+ *
+ * The jms.brokerUri setting allows configuration of the broker in any way that ActiveMQ allows, including xbean and broker.
+ *
+ *
+ * Available URI patterns.
+ *
+ * xbean:org/apache/sling/amq/activemq.xml will result in the Broker searching for org/apache/sling/amq/activemq.xml in
+ * the classpath and using that to configure the Broker, see http://activemq.apache.org/xml-configuration.html for details
+ * of the format. See that location for an example of the default configuration.
+ *
+ *
+ *
+ * broker:tcp://localhost:61616 will create a broker on localhost port 61616 using the URI configuration format.
+ * See http://activemq.apache.org/broker-configuration-uri.html and http://activemq.apache.org/broker-uri.html for
+ * details of the format.
+ *
+ * properties:/foo/bar.properties uses a properties file as per http://activemq.apache.org/broker-properties-uri.html
+ *
+ */
+@Component(immediate = true, metatype = true)
+@Service(value=ConnectionFactoryService.class)
+public class ActiveMQConnectionFactoryService implements ConnectionFactoryService {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQConnectionFactoryService.class);
+    private PooledConnectionFactory pooledConnectionFactory;
+
+    // Where the broker is configured out of the box, the shutdown hook must be disabled
+    // so that the deactivate method can perform the shutdown.
+    // This assumes that OSGi shuts down properly.
+
+    public static final String DEFAULT_BROKER_URI = "vm://localhost:61616?broker.useShutdownHook=false";
+    @Property(value = DEFAULT_BROKER_URI)
+    public static final String BROKER_URI = "jms.brokerUri";
+
+
+
+
+    @Activate
+    public void activate(Map<String, Object> props) {
+
+        String brokerURL = (String) props.get(BROKER_URI);
+
+        pooledConnectionFactory = new PooledConnectionFactory(brokerURL);
+        pooledConnectionFactory.start();
+    }
+
+
+    @Deactivate
+    public void deactivate(Map<String, Object> props) {
+
+        LOGGER.info("Stopping ActiveMQ Pooled connection factory");
+        pooledConnectionFactory.stop();
+        pooledConnectionFactory = null;
+    }
+
+
+
+    @Override
+    public ConnectionFactory getConnectionFactory() {
+        return pooledConnectionFactory;
+    }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/jms/ConnectionFactoryService.java b/src/main/java/org/apache/sling/jms/ConnectionFactoryService.java
new file mode 100644
index 0000000..e14a3f7
--- /dev/null
+++ b/src/main/java/org/apache/sling/jms/ConnectionFactoryService.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.jms;
+
+import javax.jms.ConnectionFactory;
+
+/**
+ * Created by ieb on 30/03/2016.
+ * Implementations of this service provide JMS Connection factories. In general implementations should work OOTB with no
+ * further configuration and provide an efficient JMS Connection Factory that allows sessions to be created with minimal
+ * overhead.
+ */
+public interface ConnectionFactoryService {
+    /**
+     *
+     * Get a connection factory.
+     * @return the connection factory.
+     */
+    ConnectionFactory getConnectionFactory();
+}
diff --git a/src/main/java/org/apache/sling/jms/JMSMessageTypes.java b/src/main/java/org/apache/sling/jms/JMSMessageTypes.java
new file mode 100644
index 0000000..a6fcf29
--- /dev/null
+++ b/src/main/java/org/apache/sling/jms/JMSMessageTypes.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.jms;
+
+/**
+ * Created by ieb on 04/04/2016.
+ */
+public enum JMSMessageTypes {
+    /**
+     * A text message containing json.
+     */
+    JSON
+}
diff --git a/src/main/java/org/apache/sling/jms/JMSQueueManager.java b/src/main/java/org/apache/sling/jms/JMSQueueManager.java
new file mode 100644
index 0000000..59d888d
--- /dev/null
+++ b/src/main/java/org/apache/sling/jms/JMSQueueManager.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.jms;
+
+import org.apache.felix.scr.annotations.*;
+import org.apache.sling.mom.*;
+import org.osgi.framework.ServiceReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.jms.*;
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Created by ieb on 30/03/2016.
+ * A JMS implementation of a QueueManager. It will allow callers to add messages to named queues, and consumers to read
+ * messages from named queues in order. The component uses a single connection to the JMS broker, but dedicated sessions
+ * for each send and for each Queue reader.
+ */
+@Component(immediate = true)
+@Service(value = QueueManager.class)
+public class JMSQueueManager implements QueueManager {
+
+
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(JMSQueueManager.class);
+    private static final String NRETRIES = "_nr";
+
+    @Reference
+    private ConnectionFactoryService connectionFactoryService;
+
+
+    /**
+     * Holds all QueueReader registrations.
+     */
+    @Reference(referenceInterface = QueueReader.class,
+            cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE,
+            policy = ReferencePolicy.DYNAMIC,
+            bind="addReader",
+            unbind="removeReader")
+    private final Map<ServiceReference<QueueReader>, QueueReaderHolder> registrations =
+            new ConcurrentHashMap<ServiceReference<QueueReader>, QueueReaderHolder>();
+
+    private Connection connection;
+
+    @Activate
+    public synchronized void activate(Map<String, Object> properties) throws JMSException {
+        connection = connectionFactoryService.getConnectionFactory().createConnection();
+        connection.start();
+    }
+
+    @Deactivate
+    public synchronized void deactivate(Map<String, Object> properties) throws JMSException {
+        for ( Map.Entry<ServiceReference<QueueReader>, QueueReaderHolder> e : registrations.entrySet()) {
+            e.getValue().close();
+        }
+        registrations.clear();
+        connection.stop();
+        connection.close();
+    }
+
+
+
+    /**
+     * Add a message to the queue. The message is added to the queue transactionally and auto acknowledged.
+     * @param name the name of the queue.
+     * @param message the message to post to the queue.
+     */
+    @Override
+    public void add(@Nonnull Types.QueueName name, @Nonnull Map<String, Object> message) {
+        Session session = null;
+        try {
+            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            message.put(NRETRIES, 0L); // set the number of retries to 0.
+            TextMessage textMessage = session.createTextMessage(Json.toJson(message));
+            textMessage.setJMSType(JMSMessageTypes.JSON.toString());
+            LOGGER.info("Sending to {} message {} ", name, textMessage);
+            session.createProducer(session.createQueue(name.toString())).send(textMessage);
+            session.commit();
+            session.close();
+        } catch (JMSException e) {
+            LOGGER.error("Unable to send message to queue "+name, e);
+            close(session);
+
+        }
+
+    }
+
+
+    /**
+     * quietly close the session.
+     * @param session the session to close, may be null.
+     */
+    private void close(Session session) {
+        if(session != null) {
+            try {
+                session.close();
+            } catch (JMSException e) {
+                LOGGER.warn("Unable to close session ",e);
+            }
+        }
+    }
+
+
+    // Register Readers using OSGi Whiteboard pattern
+    public synchronized  void addReader(ServiceReference<QueueReader> serviceRef) {
+        if (registrations.containsKey(serviceRef)) {
+            LOGGER.error("Registration for service reference is already present {}",serviceRef);
+            return;
+        }
+        QueueReaderHolder queueReaderHolder = new QueueReaderHolder(connection, serviceRef.getBundle().getBundleContext().getService(serviceRef), getServiceProperties(serviceRef));
+        registrations.put(serviceRef, queueReaderHolder);
+    }
+
+    private Map<String, Object> getServiceProperties(ServiceReference<QueueReader> serviceRef) {
+        Map<String, Object> m = new HashMap<String, Object>();
+        for ( String k : serviceRef.getPropertyKeys()) {
+            m.put(k, serviceRef.getProperty(k));
+        }
+        return Collections.unmodifiableMap(m);
+    }
+
+    public synchronized void removeReader(ServiceReference<QueueReader> serviceRef) {
+        QueueReaderHolder queueReaderHolder = registrations.remove(serviceRef);
+        if ( queueReaderHolder != null) {
+            queueReaderHolder.close();
+        }
+    }
+
+    private static class QueueReaderHolder implements Closeable {
+        private final JMSQueueSession session;
+
+        public QueueReaderHolder(Connection connection, QueueReader queueReader, Map<String, Object> properties) {
+            try {
+                LOGGER.info("Creating Queue holder for {} ", queueReader.getClass());
+                String name = (String) properties.get(QueueReader.QUEUE_NAME_PROP);
+                checkNotNull(name, "A valid queue name as property " + QueueReader.QUEUE_NAME_PROP + " is required for QueueReader registration");
+                if (queueReader instanceof MessageFilter) {
+                    session = new JMSQueueSession(connection, queueReader, name, (MessageFilter) queueReader, true, 5);
+                } else {
+                    session = new JMSQueueSession(connection, queueReader, name, new MessageFilter() {
+                        @Override
+                        public boolean accept(Types.Name name, Map<String, Object> mapMessage) {
+                            return true;
+                        }
+
+                    }, true, 5);
+
+                }
+            } catch (JMSException e) {
+                throw new IllegalArgumentException("Unable to register QueueReader with JMS ",e);
+            }
+
+        }
+
+        public void close() {
+            session.close();
+        }
+    }
+
+    private static void checkNotNull(Object v, String message) {
+        if ( v == null) {
+            throw new IllegalArgumentException(message);
+        }
+    }
+
+
+    public static class JMSQueueSession implements Closeable, MessageListener {
+        private static final Logger LOGGER = LoggerFactory.getLogger(JMSQueueSession.class);
+        private final QueueReader queueReader;
+        private final String name;
+        private final MessageFilter messageFilter;
+        private final Session session;
+        private final MessageConsumer queueConsumer;
+        private final MessageProducer queueProducer;
+        private boolean retryByRequeue;
+        private int maxRetries;
+
+        public JMSQueueSession(Connection connection, QueueReader queueReader, String name,  MessageFilter messageFilter, boolean retryByRequeue, int maxRetries) throws JMSException {
+            this.queueReader = queueReader;
+            this.name = name;
+            this.messageFilter = messageFilter;
+            this.retryByRequeue = retryByRequeue;
+            this.maxRetries = maxRetries;
+            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue(name);
+            queueConsumer = session.createConsumer(queue);
+            queueProducer = session.createProducer(queue);
+            queueConsumer.setMessageListener(this);
+        }
+
+        @Override
+        public void close() {
+            if ( queueConsumer != null) {
+                try {
+                    queueConsumer.close();
+                } catch (JMSException e) {
+                    LOGGER.warn("Failed to close queue consumer on "+name,e);
+                }
+            }
+            if ( session != null) {
+                try {
+                    session.close();
+                } catch (JMSException e) {
+                    LOGGER.warn("Failed to close queue session on " + name, e);
+                }
+            }
+
+        }
+
+        @Override
+        public void onMessage(Message message) {
+            boolean committed = false;
+            TextMessage textMessage = null;
+            try {
+                try {
+                    LOGGER.info("Got from {} message {} ", name, message);
+                    Destination destination = message.getJMSDestination();
+                    if (destination instanceof Queue) {
+                        Queue queue = (Queue) destination;
+                        if ( JMSMessageTypes.JSON.equals(JMSMessageTypes.valueOf(message.getJMSType()))) {
+                            textMessage = (TextMessage) message;
+                            final Map<String, Object> mapMessage = Json.toMap(textMessage.getText());
+                            Types.QueueName queueName = Types.queueName(queue.getQueueName());
+                            if (queueName.equals(name) && messageFilter.accept(queueName, mapMessage)) {
+                                queueReader.onMessage(queueName, mapMessage);
+                                session.commit();
+                                // all ok.
+                                committed = true;
+                                return;
+                            }
+                        }
+                    }
+                } catch (RequeueMessageException e) {
+                    LOGGER.info("QueueReader requested requeue of message ", e);
+                    if (retryByRequeue && textMessage != null) {
+                        Map<String, Object> mapMessage = Json.toMap(textMessage.getText());
+                        if (((Number) mapMessage.get(NRETRIES)).intValue() < maxRetries) {
+                            mapMessage.put(NRETRIES, ((Number) mapMessage.get(NRETRIES)).longValue() + 1);
+                            TextMessage retryMessage = session.createTextMessage(Json.toJson(mapMessage));
+                            retryMessage.setJMSType(JMSMessageTypes.JSON.toString());
+                            LOGGER.info("Requeueing for retry to {} message {} ", name, retryMessage);
+                            queueProducer.send(retryMessage);
+                            session.commit();
+                            committed = true;
+                            return;
+                        }
+                    }
+                }
+            } catch (JMSException e) {
+                LOGGER.info("Receive failed, leaving it to the provider to requeue if required. ", e);
+            } finally {
+                try {
+                    if (!committed) {
+                        session.rollback();
+                    }
+                } catch (JMSException e) {
+                    LOGGER.info("QueueReader rollback failed. ",e);
+                }
+            }
+            // If onMessage throws an exception JMS will put the message back onto the queue.
+            // the delay before it gets retried is a JMS server configuration.
+            throw new IllegalArgumentException("Unable to process message, requeue");
+        }
+
+    }
+
+}
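
Note on the retry path in JMSQueueManager above: a minimal sketch of the retry-by-requeue decision, under stated assumptions. The class name, the method name and the key "retries" are hypothetical stand-ins (the commit uses the NRETRIES constant, whose value is not shown here). The counter is read through Number because Json.toMap returns whole numbers as Long. Treating a missing counter as zero is a choice made for this sketch, not something the commit does.

```java
import java.util.HashMap;
import java.util.Map;

public class RetrySketch {
    // Hypothetical key; the commit uses the NRETRIES constant instead.
    static final String RETRIES_KEY = "retries";

    /**
     * Returns a copy of the message with the retry counter incremented,
     * or null when the retry budget is used up and the caller should
     * roll back rather than requeue.
     */
    static Map<String, Object> nextRetry(Map<String, Object> message, long maxRetries) {
        Object raw = message.get(RETRIES_KEY);
        // A missing or non-numeric counter is treated as "no retries so far".
        long retries = (raw instanceof Number) ? ((Number) raw).longValue() : 0L;
        if (retries >= maxRetries) {
            return null;
        }
        Map<String, Object> copy = new HashMap<String, Object>(message);
        copy.put(RETRIES_KEY, retries + 1);
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> m = new HashMap<String, Object>();
        m.put(RETRIES_KEY, 1L);
        System.out.println(nextRetry(m, 3)); // counter incremented
        System.out.println(nextRetry(m, 1)); // budget exhausted
    }
}
```

Returning a copy keeps the received message untouched, so a rollback after a failed send does not see a half-updated counter.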
diff --git a/src/main/java/org/apache/sling/jms/JMSTopicManager.java b/src/main/java/org/apache/sling/jms/JMSTopicManager.java
new file mode 100644
index 0000000..e3b0588
--- /dev/null
+++ b/src/main/java/org/apache/sling/jms/JMSTopicManager.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.jms;
+
+import org.apache.felix.scr.annotations.*;
+import org.apache.sling.mom.*;
+import org.osgi.framework.ServiceReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Created by ieb on 30/03/2016.
+ * This class provides support for sending messages to topics over JMS and subscribing to topics. It uses the ConnectionFactoryService
+ * to interact with JMS. There is nothing in
+ */
+@Component(immediate = true)
+@Service(value = TopicManager.class)
+public class JMSTopicManager implements TopicManager {
+
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(JMSTopicManager.class);
+
+
+    /**
+     * Holds all Subscriber registrations.
+     */
+    @Reference(referenceInterface = Subscriber.class,
+            cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE,
+            policy = ReferencePolicy.DYNAMIC,
+            bind="addSubscriber",
+            unbind="removeSubscriber")
+    private final Map<ServiceReference<Subscriber>, SubscriberHolder> registrations =
+            new ConcurrentHashMap<ServiceReference<Subscriber>, SubscriberHolder>();
+
+    @Reference
+    private ConnectionFactoryService connectionFactoryService;
+    // A single connection is maintained open per instance of this component.
+    private Connection connection;
+    // A single session is used for listening to messages. Separate sessions are opened for sending to avoid synchronisation on sending operations.
+    private Session session;
+    private final Object lock = new Object();
+
+    @Activate
+    public synchronized  void activate(Map<String, Object> properties) throws JMSException {
+            connection = connectionFactoryService.getConnectionFactory().createConnection();
+            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            connection.start();
+    }
+
+    @Deactivate
+    public synchronized  void deactivate(Map<String, Object> properties) throws JMSException {
+        for ( Map.Entry<ServiceReference<Subscriber>, SubscriberHolder> e : registrations.entrySet()) {
+            e.getValue().close();
+        }
+        registrations.clear();
+        // don't close the session, there is a bug in JMS which means an already closed session won't go quietly
+        // and the hook that shuts down an embedded connection still gets fired when OSGi shuts down even with
+        // a flag to prevent it. connection.stop and close are clean.
+        connection.stop();
+        connection.close();
+    }
+
+
+    @Override
+    public void publish(Types.TopicName name, Types.CommandName commandName, Map<String, Object> message) {
+        Session session = null;
+        try {
+            // use a fresh session per message.
+            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            TextMessage textMessage = session.createTextMessage(Json.toJson(message));
+            textMessage.setJMSType(JMSMessageTypes.JSON.toString());
+            session.createProducer(session.createTopic(name.toString())).send(textMessage);
+            session.commit();
+            session.close();
+        } catch (JMSException e) {
+            LOGGER.error("Unable to send message to topic "+name, e);
+            if(session != null) {
+                try {
+                    session.close();
+                } catch (JMSException e1) {
+                    LOGGER.warn("Unable to close session ",e1);
+                }
+            }
+
+        }
+    }
+
+
+    // Register Subscribers using OSGi Whiteboard pattern
+    public synchronized  void addSubscriber(ServiceReference<Subscriber> serviceRef) {
+        if (registrations.containsKey(serviceRef)) {
+            LOGGER.error("Registration for service reference is already present {}",serviceRef);
+            return;
+        }
+        SubscriberHolder subscriberHolder = new SubscriberHolder(session, serviceRef.getBundle().getBundleContext().getService(serviceRef), getServiceProperties(serviceRef));
+        registrations.put(serviceRef, subscriberHolder);
+    }
+
+    private Map<String, Object> getServiceProperties(ServiceReference<Subscriber> serviceRef) {
+        Map<String, Object> m = new HashMap<String, Object>();
+        for ( String k : serviceRef.getPropertyKeys()) {
+            m.put(k, serviceRef.getProperty(k));
+        }
+        return Collections.unmodifiableMap(m);
+    }
+
+    public synchronized void removeSubscriber(ServiceReference<Subscriber> serviceRef) {
+        SubscriberHolder subscriberHolder = registrations.remove(serviceRef);
+        if (subscriberHolder != null) {
+            subscriberHolder.close();
+        }
+    }
+
+    private static class SubscriberHolder implements Closeable {
+
+
+        private final FilteredTopicSubscriber filteredTopicSubscriber;
+
+        public SubscriberHolder(Session session, Subscriber subscriber, Map<String, Object> properties) {
+            try {
+                LOGGER.info("Creating Subscriber holder for {} ", subscriber.getClass());
+                String[] topicNames = (String[]) properties.get(Subscriber.TOPIC_NAMES_PROP);
+
+                if ( topicNames == null || topicNames.length == 0) {
+                    throw new IllegalArgumentException("At least one valid topic name in property " + Subscriber.TOPIC_NAMES_PROP + " is required for Subscriber registration");
+                }
+                if ( subscriber instanceof MessageFilter) {
+                    filteredTopicSubscriber = new FilteredTopicSubscriber(session, subscriber, topicNames, (MessageFilter)subscriber);
+                } else {
+                    filteredTopicSubscriber = new FilteredTopicSubscriber(session, subscriber, topicNames, new MessageFilter() {
+
+
+                        @Override
+                        public boolean accept(Types.Name name, Map<String, Object> mapMessage) {
+                            return true;
+                        }
+                    });
+
+                }
+            } catch (JMSException e) {
+                throw new IllegalArgumentException("Unable to register Subscriber with JMS ",e);
+            }
+
+        }
+
+        public void close() {
+            try {
+                filteredTopicSubscriber.close();
+            } catch (IOException e) {
+                LOGGER.warn("Unable to close topic subscriber ", e);
+            }
+        }
+    }
+
+    /**
+     * This listens to topic messages, and applies the message filter prior to sending to the subscriber.
+     * Although JMS has its own filtering language, this is JMS specific and since we don't want to expose implementation
+     * details in the JOBs API either explicitly or out of band, the JMS specific filters can't be used. As a replacement the
+     * API provides the MessageFilter API.
+     */
+    private static final class FilteredTopicSubscriber implements Closeable, MessageListener {
+        private final Subscriber subscriber;
+        private final MessageFilter filter;
+        private final List<MessageConsumer> consumers = new ArrayList<MessageConsumer>();
+
+        public FilteredTopicSubscriber(@Nonnull Session session,
+                                       @Nonnull Subscriber subscriber,
+                                       @Nonnull String[] topicNames,
+                                       @Nonnull MessageFilter filter) throws JMSException {
+            this.subscriber = subscriber;
+            this.filter = filter;
+            for (String t : topicNames) {
+                MessageConsumer c = session.createConsumer(session.createTopic(t));
+                c.setMessageListener(this);
+                consumers.add(c);
+            }
+        }
+
+
+        @Override
+        public void onMessage(Message message) {
+            try {
+                LOGGER.info("Got message {} ", message);
+                Destination destination = message.getJMSDestination();
+                if (destination instanceof Topic) {
+                    Topic topic = (Topic) destination;
+                    String type = message.getJMSType();
+                    if (JMSMessageTypes.JSON.equals(JMSMessageTypes.valueOf(type))) {
+                        TextMessage textMessage = (TextMessage) message;
+                        Map<String, Object> mapMessage = Json.toMap(textMessage.getText());
+                        Types.TopicName topicName = Types.topicName(topic.getTopicName());
+                        if ( filter.accept(topicName, mapMessage) ) {
+                            subscriber.onMessage(topicName, mapMessage);
+                        }
+                    }
+                }
+            } catch (JMSException e) {
+                LOGGER.warn("Failed to deliver message ",e);
+            }
+        }
+
+        @Override
+        public void close() throws IOException {
+            for (MessageConsumer c : consumers) {
+                try {
+                    LOGGER.info("Closing consumer on dispose {} ",c);
+                    c.close();
+                } catch (JMSException e) {
+                    LOGGER.warn(e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+
+
+}
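
Note on FilteredTopicSubscriber above: a minimal sketch of the filter-then-deliver step, under stated assumptions. The two interfaces below are local stand-ins that take a plain String topic; the MoM interfaces in the commit take Types.TopicName and Types.Name. The class names and the "level" key are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class FilterDispatchSketch {
    // Stand-ins for the MoM Subscriber and MessageFilter interfaces.
    interface Subscriber { void onMessage(String topic, Map<String, Object> message); }
    interface MessageFilter { boolean accept(String topic, Map<String, Object> message); }

    // Used when the subscriber does not supply its own filter.
    static final MessageFilter ACCEPT_ALL = new MessageFilter() {
        public boolean accept(String topic, Map<String, Object> message) {
            return true;
        }
    };

    // Applies the filter first and only then hands the message to the subscriber.
    static void dispatch(Subscriber subscriber, String topic, Map<String, Object> message) {
        MessageFilter filter = (subscriber instanceof MessageFilter)
                ? (MessageFilter) subscriber : ACCEPT_ALL;
        if (filter.accept(topic, message)) {
            subscriber.onMessage(topic, message);
        }
    }

    // A subscriber that acts as its own filter and keeps what it receives.
    static final class ErrorsOnly implements Subscriber, MessageFilter {
        final List<Map<String, Object>> received = new ArrayList<Map<String, Object>>();

        public boolean accept(String topic, Map<String, Object> message) {
            return "error".equals(message.get("level"));
        }

        public void onMessage(String topic, Map<String, Object> message) {
            received.add(message);
        }
    }

    public static void main(String[] args) {
        ErrorsOnly s = new ErrorsOnly();
        dispatch(s, "logs", Collections.<String, Object>singletonMap("level", "error"));
        dispatch(s, "logs", Collections.<String, Object>singletonMap("level", "info"));
        System.out.println(s.received.size());
    }
}
```

Filtering on the decoded map rather than with a JMS selector keeps the subscriber code independent of the transport, which is the reason the Javadoc gives for the MessageFilter API.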
diff --git a/src/main/java/org/apache/sling/jms/Json.java b/src/main/java/org/apache/sling/jms/Json.java
new file mode 100644
index 0000000..b4715d2
--- /dev/null
+++ b/src/main/java/org/apache/sling/jms/Json.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.jms;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonPrimitive;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by ieb on 31/03/2016.
+ */
+public class Json {
+
+    public static Map<String, Object> toMap(String text) {
+        JsonElement root = new JsonParser().parse(text);
+        return toMapValue(root);
+    }
+
+    private static <T> T toMapValue(JsonElement element) {
+        if (element.isJsonObject()) {
+            return (T) toMapValue(element.getAsJsonObject());
+        } else if (element.isJsonArray()) {
+            return (T) toMapValue(element.getAsJsonArray());
+        } else if (element.isJsonNull()) {
+            return null;
+        } else if (element.isJsonPrimitive()) {
+            return (T) toMapValue(element.getAsJsonPrimitive());
+        }
+        throw new IllegalArgumentException("Encountered JsonElement that is not an object, array, primitive or null: "+element);
+    }
+    private static <T> T toMapValue(JsonArray array) {
+        List<Object> list = new ArrayList<Object>();
+        for( JsonElement e : array) {
+            list.add(toMapValue(e));
+        }
+        return (T) list;
+    }
+
+    private static <T> T toMapValue(JsonObject obj) {
+        Map<String, Object> m = new HashMap<String, Object>();
+        for (Map.Entry<String, JsonElement> e : obj.entrySet()) {
+            m.put(e.getKey(), toMapValue(e.getValue()));
+        }
+        return (T) m;
+    }
+
+    private static <T> T toMapValue(JsonPrimitive p) {
+        if (p.isString()) {
+            return (T) p.getAsString();
+        } else if (p.isBoolean()) {
+            return (T) ((Boolean)p.getAsBoolean());
+        } else if (p.isNumber()) {
+            double d = p.getAsDouble();
+            if (Math.floor(d) == d) {
+                return (T)((Long)p.getAsLong());
+            }
+            return (T)((Double)d);
+        } else {
+            return null;
+        }
+    }
+
+    public static String toJson(Map<String, Object> message) {
+        return new Gson().toJson(message);
+    }
+
+
+}
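
Note on Json.toMap above: whole-number JSON values come back as Long and all other numbers as Double, so callers cannot cast a map value to Integer or int. The sketch below mirrors that rule with the JDK only; it uses a plain (long) cast where the commit calls Gson's getAsLong, and the class and method names are hypothetical.

```java
public class NumberNarrowingSketch {
    // Same rule as Json.toMapValue(JsonPrimitive): whole values become Long, the rest Double.
    static Object narrow(double d) {
        if (Math.floor(d) == d) {
            return Long.valueOf((long) d);
        }
        return Double.valueOf(d);
    }

    // Reads a numeric map value without depending on which boxed type was chosen.
    static long asLong(Object value) {
        return ((Number) value).longValue();
    }

    public static void main(String[] args) {
        Object three = narrow(3.0);
        Object half = narrow(2.5);
        System.out.println(three.getClass().getSimpleName());
        System.out.println(half.getClass().getSimpleName());
        System.out.println(asLong(three) + 1);
        try {
            // A Long cannot be cast to Integer, so this line throws.
            int broken = (Integer) three;
            System.out.println(broken);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException");
        }
    }
}
```

Going through Number.longValue() works for both boxed types, which makes it a safer way to read counters out of a decoded message map.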
diff --git a/src/main/resources/org/apache/sling/amq/activemq.xml b/src/main/resources/org/apache/sling/amq/activemq.xml
new file mode 100644
index 0000000..d6313d5
--- /dev/null
+++ b/src/main/resources/org/apache/sling/amq/activemq.xml
@@ -0,0 +1,170 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<!--  START SNIPPET: example  -->
+<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+    <!--
+     Allows us to use system properties as variables in this configuration file
+    -->
+    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+        <property name="locations">
+            <value>file:${activemq.conf}/credentials.properties</value>
+        </property>
+    </bean>
+    <!--
+
+            The <broker> element is used to configure the ActiveMQ broker.
+
+    -->
+    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
+        <!--
+
+                    For better performances use VM cursor and small memory limit.
+                    For more information, see:
+
+                    http://activemq.apache.org/message-cursors.html
+
+                    Also, if your producer is "hanging", it's probably due to producer flow control.
+                    For more information, see:
+                    http://activemq.apache.org/producer-flow-control.html
+
+        -->
+        <destinationPolicy>
+            <policyMap>
+                <policyEntries>
+                    <policyEntry topic=">" producerFlowControl="true">
+                        <!--
+                         The constantPendingMessageLimitStrategy is used to prevent
+                                                 slow topic consumers to block producers and affect other consumers
+                                                 by limiting the number of messages that are retained
+                                                 For more information, see:
+
+                                                 http://activemq.apache.org/slow-consumer-handling.html
+
+
+                        -->
+                        <pendingMessageLimitStrategy>
+                            <constantPendingMessageLimitStrategy limit="1000"/>
+                        </pendingMessageLimitStrategy>
+                    </policyEntry>
+                    <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
+                        <!--
+                         Use VM cursor for better latency
+                                               For more information, see:
+
+                                               http://activemq.apache.org/message-cursors.html
+
+                                          <pendingQueuePolicy>
+                                            <vmQueueCursor/>
+                                          </pendingQueuePolicy>
+
+                        -->
+                    </policyEntry>
+                </policyEntries>
+            </policyMap>
+        </destinationPolicy>
+        <!--
+
+                    The managementContext is used to configure how ActiveMQ is exposed in
+                    JMX. By default, ActiveMQ uses the MBean server that is started by
+                    the JVM. For more information, see:
+
+                    http://activemq.apache.org/jmx.html
+
+        -->
+        <managementContext>
+            <managementContext createConnector="false"/>
+        </managementContext>
+        <!--
+
+                    Configure message persistence for the broker. The default persistence
+                    mechanism is the KahaDB store (identified by the kahaDB tag).
+                    For more information, see:
+
+                    http://activemq.apache.org/persistence.html
+
+        -->
+        <persistenceAdapter>
+            <kahaDB directory="${activemq.data}/kahadb"/>
+        </persistenceAdapter>
+        <!--
+
+                    The systemUsage controls the maximum amount of space the broker will
+                    use before slowing down producers. For more information, see:
+                    http://activemq.apache.org/producer-flow-control.html
+                    If using ActiveMQ embedded - the following limits could safely be used:
+
+                <systemUsage>
+                    <systemUsage>
+                        <memoryUsage>
+                            <memoryUsage limit="20 mb"/>
+                        </memoryUsage>
+                        <storeUsage>
+                            <storeUsage limit="1 gb"/>
+                        </storeUsage>
+                        <tempUsage>
+                            <tempUsage limit="100 mb"/>
+                        </tempUsage>
+                    </systemUsage>
+                </systemUsage>
+
+        -->
+        <systemUsage>
+            <systemUsage>
+                <memoryUsage>
+                    <memoryUsage limit="64 mb"/>
+                </memoryUsage>
+                <storeUsage>
+                    <storeUsage limit="100 gb"/>
+                </storeUsage>
+                <tempUsage>
+                    <tempUsage limit="50 gb"/>
+                </tempUsage>
+            </systemUsage>
+        </systemUsage>
+        <!--
+
+                    The transport connectors expose ActiveMQ over a given protocol to
+                    clients and other brokers. For more information, see:
+
+                    http://activemq.apache.org/configuring-transports.html
+
+        -->
+        <transportConnectors>
+            <!--
+             DOS protection, limit concurrent connections to 1000 and frame size to 100MB
+            -->
+            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
+            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
+        </transportConnectors>
+        <!--
+         destroy the spring context on shutdown to stop jetty
+        -->
+        <shutdownHooks>
+            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook"/>
+        </shutdownHooks>
+    </broker>
+    <!--
+
+            Enable web consoles, REST and Ajax APIs and demos
+
+            Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
+
+    -->
+    <import resource="jetty.xml"/>
+</beans>
+        <!--  END SNIPPET: example  -->
\ No newline at end of file
diff --git a/src/main/resources/org/apache/sling/amq/credentials.properties b/src/main/resources/org/apache/sling/amq/credentials.properties
new file mode 100644
index 0000000..b8be66e
--- /dev/null
+++ b/src/main/resources/org/apache/sling/amq/credentials.properties
@@ -0,0 +1,22 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+# Defines credentials that will be used by components (like web console) to access the broker
+
+activemq.username=system
+activemq.password=manager
+guest.password=password
\ No newline at end of file
diff --git a/src/main/resources/org/apache/sling/amq/jetty.xml b/src/main/resources/org/apache/sling/amq/jetty.xml
new file mode 100644
index 0000000..bc03f77
--- /dev/null
+++ b/src/main/resources/org/apache/sling/amq/jetty.xml
@@ -0,0 +1,144 @@
+<!--
+
+        Licensed to the Apache Software Foundation (ASF) under one or more contributor
+        license agreements. See the NOTICE file distributed with this work for additional
+        information regarding copyright ownership. The ASF licenses this file to You under
+        the Apache License, Version 2.0 (the "License"); you may not use this file except in
+        compliance with the License. You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or
+        agreed to in writing, software distributed under the License is distributed on an
+        "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+        implied. See the License for the specific language governing permissions and
+        limitations under the License.
+
+-->
+<!--
+
+        An embedded servlet engine for serving up the Admin consoles, REST and Ajax APIs and
+        some demos. Include this file in your configuration to enable ActiveMQ web components
+        e.g. <import resource="jetty.xml"/>
+
+-->
+<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+    <bean id="securityLoginService" class="org.eclipse.jetty.security.HashLoginService">
+        <property name="name" value="ActiveMQRealm"/>
+        <property name="config" value="${activemq.conf}/jetty-realm.properties"/>
+    </bean>
+    <bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint">
+        <property name="name" value="BASIC"/>
+        <property name="roles" value="user,admin"/>
+        <property name="authenticate" value="true"/>
+    </bean>
+    <bean id="adminSecurityConstraint" class="org.eclipse.jetty.util.security.Constraint">
+        <property name="name" value="BASIC"/>
+        <property name="roles" value="admin"/>
+        <property name="authenticate" value="true"/>
+    </bean>
+    <bean id="securityConstraintMapping" class="org.eclipse.jetty.security.ConstraintMapping">
+        <property name="constraint" ref="securityConstraint"/>
+        <property name="pathSpec" value="/,*.jsp"/>
+    </bean>
+    <bean id="adminSecurityConstraintMapping" class="org.eclipse.jetty.security.ConstraintMapping">
+        <property name="constraint" ref="adminSecurityConstraint"/>
+        <property name="pathSpec" value="*.action"/>
+    </bean>
+    <bean id="securityHandler" class="org.eclipse.jetty.security.ConstraintSecurityHandler">
+        <property name="loginService" ref="securityLoginService"/>
+        <property name="authenticator">
+            <bean class="org.eclipse.jetty.security.authentication.BasicAuthenticator"/>
+        </property>
+        <property name="constraintMappings">
+            <list>
+                <ref bean="adminSecurityConstraintMapping"/>
+                <ref bean="securityConstraintMapping"/>
+            </list>
+        </property>
+        <property name="handler">
+            <bean id="sec" class="org.eclipse.jetty.server.handler.HandlerCollection">
+                <property name="handlers">
+                    <list>
+                        <bean class="org.eclipse.jetty.webapp.WebAppContext">
+                            <property name="contextPath" value="/hawtio"/>
+                            <property name="war" value="${activemq.home}/webapps/hawtio"/>
+                            <property name="logUrlOnStart" value="true"/>
+                        </bean>
+                        <bean class="org.eclipse.jetty.webapp.WebAppContext">
+                            <property name="contextPath" value="/admin"/>
+                            <property name="resourceBase" value="${activemq.home}/webapps/admin"/>
+                            <property name="logUrlOnStart" value="true"/>
+                        </bean>
+                        <bean class="org.eclipse.jetty.webapp.WebAppContext">
+                            <property name="contextPath" value="/fileserver"/>
+                            <property name="resourceBase" value="${activemq.home}/webapps/fileserver"/>
+                            <property name="logUrlOnStart" value="true"/>
+                            <property name="parentLoaderPriority" value="true"/>
+                        </bean>
+                        <bean class="org.eclipse.jetty.webapp.WebAppContext">
+                            <property name="contextPath" value="/api"/>
+                            <property name="resourceBase" value="${activemq.home}/webapps/api"/>
+                            <property name="logUrlOnStart" value="true"/>
+                        </bean>
+                        <bean class="org.eclipse.jetty.server.handler.ResourceHandler">
+                            <property name="directoriesListed" value="false"/>
+                            <property name="welcomeFiles">
+                                <list>
+                                    <value>index.html</value>
+                                </list>
+                            </property>
+                            <property name="resourceBase" value="${activemq.home}/webapps/"/>
+                        </bean>
+                        <bean id="defaultHandler" class="org.eclipse.jetty.server.handler.DefaultHandler">
+                            <property name="serveIcon" value="false"/>
+                        </bean>
+                    </list>
+                </property>
+            </bean>
+        </property>
+    </bean>
+    <bean id="rewrite" class="org.eclipse.jetty.rewrite.handler.RewriteHandler">
+        <property name="rules">
+            <set>
+                <bean class="org.eclipse.jetty.rewrite.handler.RedirectRegexRule">
+                    <property name="regex" value="/api/jolokia(.*)"/>
+                    <property name="replacement" value="/hawtio/jolokia$1"/>
+                </bean>
+            </set>
+        </property>
+    </bean>
+    <bean id="contexts" class="org.eclipse.jetty.server.handler.ContextHandlerCollection"></bean>
+    <bean id="Server" class="org.eclipse.jetty.server.Server" init-method="start" destroy-method="stop">
+        <property name="connectors">
+            <list>
+                <bean id="Connector" class="org.eclipse.jetty.server.nio.SelectChannelConnector">
+                    <property name="port" value="8161"/>
+                </bean>
+                <!--
+
+                                    Enable this connector if you wish to use https with web console
+
+                -->
+                <!--
+
+                                <bean id="SecureConnector" class="org.eclipse.jetty.server.ssl.SslSelectChannelConnector">
+                                    <property name="port" value="8162" />
+                                    <property name="keystore" value="file:${activemq.conf}/broker.ks" />
+                                    <property name="password" value="password" />
+                                </bean>
+
+                -->
+            </list>
+        </property>
+        <property name="handler">
+            <bean id="handlers" class="org.eclipse.jetty.server.handler.HandlerCollection">
+                <property name="handlers">
+                    <list>
+                        <ref bean="rewrite"/>
+                        <ref bean="contexts"/>
+                        <ref bean="securityHandler"/>
+                    </list>
+                </property>
+            </bean>
+        </property>
+    </bean>
+</beans>
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/amq/ActiveMQConnectionFactoryServiceTest.java b/src/test/java/org/apache/sling/amq/ActiveMQConnectionFactoryServiceTest.java
new file mode 100644
index 0000000..0a1da45
--- /dev/null
+++ b/src/test/java/org/apache/sling/amq/ActiveMQConnectionFactoryServiceTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.amq;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.jms.*;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+/**
+ * Created by ieb on 31/03/2016.
+ */
+public class ActiveMQConnectionFactoryServiceTest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQConnectionFactoryServiceTest.class);
+
+    @Test
+    public void testGetConnectionFactory() throws Exception {
+        LOGGER.info("Starting test");
+        ActiveMQConnectionFactoryService cfs = ActiveMQConnectionFactoryServiceTest.activate(null);
+        ConnectionFactory cf = cfs.getConnectionFactory();
+        Connection connection = cf.createConnection();
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        Topic t = session.createTopic("testTopic");
+        MessageConsumer consumer = session.createConsumer(t);
+        LOGGER.info("Starting connection");
+        connection.start();
+        LOGGER.info("Connection started.. sending message");
+        session.createProducer(t).send(session.createTextMessage("testing with a message"));
+        session.commit();
+        LOGGER.info("Message sent ... receiving message");
+        Message m = consumer.receive();
+        LOGGER.info("Message received");
+        assertTrue(m instanceof TextMessage);
+        assertEquals("testing with a message", ((TextMessage)m).getText());
+        session.close();
+        connection.stop();
+
+        deactivate(cfs);
+    }
+
+    public static void deactivate(@Nonnull ActiveMQConnectionFactoryService cfs) {
+        cfs.deactivate(new HashMap<String, Object>());
+    }
+
+    @Nonnull
+    public static ActiveMQConnectionFactoryService activate(@Nullable Map<String, Object> props) {
+        ActiveMQConnectionFactoryService amqConnectionFactoryService = new ActiveMQConnectionFactoryService();
+        if ( props == null ) {
+            props = new HashMap<String, Object>();
+            props.put(ActiveMQConnectionFactoryService.BROKER_URI, ActiveMQConnectionFactoryService.DEFAULT_BROKER_URI);
+        }
+        amqConnectionFactoryService.activate(props);
+        return amqConnectionFactoryService;
+    }
+
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java b/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java
new file mode 100644
index 0000000..ddf20a7
--- /dev/null
+++ b/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.jms;
+
+import org.apache.sling.amq.ActiveMQConnectionFactoryService;
+import org.apache.sling.amq.ActiveMQConnectionFactoryServiceTest;
+import org.apache.sling.mom.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.lang.reflect.Field;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+/**
+ * Created by ieb on 01/04/2016.
+ */
+public class JMSQueueManagerTest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(JMSQueueManagerTest.class);
+    private ActiveMQConnectionFactoryService amqConnectionFactoryService;
+    private JMSQueueManager jmsQueueManager;
+    private Map<String, Object> testMap;
+    private boolean passed;
+    private int ndeliveries;
+
+    @Mock
+    private ServiceReference<QueueReader> serviceReference;
+    @Mock
+    private Bundle bundle;
+    @Mock
+    private BundleContext bundleContext;
+    private Map<String, Object> serviceProperties = new HashMap<String, Object>();
+
+    public JMSQueueManagerTest() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        Mockito.when(serviceReference.getBundle()).thenReturn(bundle);
+        Mockito.when(bundle.getBundleContext()).thenReturn(bundleContext);
+        Mockito.when(serviceReference.getPropertyKeys()).thenAnswer(new Answer<String[]>() {
+            @Override
+            public String[] answer(InvocationOnMock invocationOnMock) throws Throwable {
+                return (String[]) serviceProperties.keySet().toArray(new String[serviceProperties.size()]);
+            }
+        });
+        Mockito.when(serviceReference.getProperty(Mockito.anyString())).thenAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                return serviceProperties.get(invocationOnMock.getArguments()[0]);
+            }
+        });
+        amqConnectionFactoryService = ActiveMQConnectionFactoryServiceTest.activate(null);
+        jmsQueueManager = JMSQueueManagerTest.activate(amqConnectionFactoryService);
+        testMap = JsonTest.createTestMap();
+        passed = false;
+
+    }
+
+
+    private static JMSQueueManager activate(ActiveMQConnectionFactoryService amqConnectionFactoryService) throws NoSuchFieldException, IllegalAccessException, JMSException {
+        JMSQueueManager jmsQueueManager = new JMSQueueManager();
+        setPrivate(jmsQueueManager, "connectionFactoryService", amqConnectionFactoryService);
+        jmsQueueManager.activate(new HashMap<String, Object>());
+        return jmsQueueManager;
+
+    }
+
+    private static void setPrivate(Object object, String name, Object value) throws NoSuchFieldException, IllegalAccessException {
+        Field field = object.getClass().getDeclaredField(name);
+        if ( !field.isAccessible()) {
+            field.setAccessible(true);
+        }
+        field.set(object, value);
+    }
+
+
+    @After
+    public void after() throws JMSException {
+        JMSQueueManagerTest.deactivate(jmsQueueManager);
+        ActiveMQConnectionFactoryServiceTest.deactivate(amqConnectionFactoryService);
+    }
+
+    public static void deactivate(JMSQueueManager jmsQueueManager) throws JMSException {
+        jmsQueueManager.deactivate(new HashMap<String, Object>());
+    }
+
+    @Test
+    public void testQueue() throws JMSException, InterruptedException {
+        // clean the queue out of messages from earlier tests, which may have failed.
+        final String queueName = "testQueueReject";
+
+        emptyQueue(queueName);
+        // make the test map unique.
+        testMap.put("testing", queueName + System.currentTimeMillis());
+        jmsQueueManager.add(Types.queueName(queueName), testMap);
+
+        checkMessagesInQueue(queueName, 1);
+
+        ndeliveries = 0;
+        QueueReader queueReader = new QueueReader() {
+            @Override
+            public void onMessage(Types.QueueName queueName, Map<String, Object> message) throws RequeueMessageException {
+                ndeliveries++;
+                JsonTest.checkEquals(testMap, message);
+                passed = true;
+            }
+        };
+
+        serviceProperties.clear();
+        serviceProperties.put(QueueReader.QUEUE_NAME_PROP, queueName);
+
+        Mockito.when(bundleContext.getService(Mockito.eq(serviceReference))).thenReturn(queueReader);
+        jmsQueueManager.addReader(serviceReference);
+
+
+
+
+        waitForPassed(1000);
+        checkMessagesInQueue(queueName, 0);
+        waitForErrors(1000);
+
+        jmsQueueManager.removeReader(serviceReference);
+        assertEquals(1, ndeliveries);
+
+
+    }
+
+
+    private void waitForErrors(long t) throws InterruptedException {
+        Thread.sleep(t);
+    }
+
+    private boolean waitForPassed(long t) {
+        long end = System.currentTimeMillis() + t;
+        while(System.currentTimeMillis() < end) {
+            if (passed) {
+                return true;
+            } else {
+                Thread.yield();
+            }
+        }
+        LOGGER.info("Message not received after " + t + " ms");
+        return false;
+    }
+
+    private void checkMessagesInQueue(String name, int expected) throws JMSException {
+        Connection connection = amqConnectionFactoryService.getConnectionFactory().createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name);
+        QueueBrowser browser = session.createBrowser(queue);
+        int n = 0;
+        for(Enumeration e = browser.getEnumeration(); e.hasMoreElements(); ) {
+            Message m = (Message) e.nextElement();
+            LOGGER.info("Message at {} is {} ", n,m);
+            n++;
+        }
+        browser.close();
+        session.close();
+        connection.stop();
+        assertEquals(expected, n);
+    }
+
+    @Test
+    public void testQueueReject() throws JMSException, InterruptedException {
+        // clean the queue out of messages from earlier tests, which may have failed.
+        final String queueName = "testQueueReject";
+        emptyQueue(queueName);
+        // Make the test map unique; if the dequeue fails, the message won't be the first.
+        testMap.put("testing", queueName + System.currentTimeMillis());
+        LOGGER.info("Sending message to queue");
+        jmsQueueManager.add(Types.queueName(queueName), testMap);
+        LOGGER.info("Sent message to queue ... receiving from queue");
+
+        checkMessagesInQueue(queueName, 1);
+
+        ndeliveries = 0;
+        QueueReader queueReader = new QueueReader() {
+            @Override
+            public void onMessage(Types.QueueName queueName, Map<String, Object> message) throws RequeueMessageException {
+                JsonTest.checkEquals(testMap, message);
+                ndeliveries++;
+                if ( ndeliveries == 1) {
+                    LOGGER.info("Requesting requeue of message");
+                    throw new RequeueMessageException("Requeuing");
+                } else if ( ndeliveries == 2) {
+                    LOGGER.info("Got message, accepting with no retry.");
+                    passed = true;
+                } else if ( ndeliveries > 2) {
+                    fail("Multiple delivered");
+                }
+            }
+        };
+
+        serviceProperties.clear();
+        serviceProperties.put(QueueReader.QUEUE_NAME_PROP, queueName);
+
+        Mockito.when(bundleContext.getService(Mockito.eq(serviceReference))).thenReturn(queueReader);
+        jmsQueueManager.addReader(serviceReference);
+
+
+
+        waitForPassed(30000);
+
+        jmsQueueManager.removeReader(serviceReference);
+        checkMessagesInQueue(queueName, 0);
+        assertEquals(2, ndeliveries);
+
+
+    }
+
+    private void dumpQueue(String name) throws JMSException {
+        Connection connection = amqConnectionFactoryService.getConnectionFactory().createConnection();
+        connection.start();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Queue queue = session.createQueue(name);
+        QueueBrowser browser = session.createBrowser(queue);
+        LOGGER.info("Starting to dump queue {} ", name);
+        int n = 0;
+        for ( Enumeration messages = browser.getEnumeration();  messages.hasMoreElements(); ) {
+            Message m = (Message) messages.nextElement();
+            LOGGER.info("Message at {}  is {} ", n, m);
+            n++;
+        }
+        LOGGER.info("Done dump queue {} ", name);
+        browser.close();
+        session.close();
+        connection.stop();
+
+    }
+
+    private void emptyQueue(String name) throws JMSException {
+        dumpQueue(name);
+        Connection connection = amqConnectionFactoryService.getConnectionFactory().createConnection();
+        connection.start();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Queue queue = session.createQueue(name);
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        for (;;) {
+            Message m = consumer.receive(100);
+            if ( m == null) {
+                LOGGER.info("No more messages in queue {} ", name);
+                break;
+            }
+            LOGGER.info("Got message  {}",m);
+            m.acknowledge();
+            session.commit();
+        }
+        boolean shouldFail = false;
+        QueueBrowser browser = session.createBrowser(queue);
+        for ( Enumeration messages = browser.getEnumeration(); messages.hasMoreElements(); ) {
+            Message m = (Message) messages.nextElement();
+            LOGGER.info("Queued message {} ", m);
+            shouldFail = true;
+        }
+        browser.close();
+        if ( shouldFail) {
+            fail("Queue was not emptied as expected");
+        }
+        consumer.close();
+        session.close();
+        connection.stop();
+    }
+
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/jms/JMSTopicManagerTest.java b/src/test/java/org/apache/sling/jms/JMSTopicManagerTest.java
new file mode 100644
index 0000000..71004e7
--- /dev/null
+++ b/src/test/java/org/apache/sling/jms/JMSTopicManagerTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.jms;
+
+import org.apache.sling.amq.ActiveMQConnectionFactoryService;
+import org.apache.sling.amq.ActiveMQConnectionFactoryServiceTest;
+import org.apache.sling.mom.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+
+/**
+ * Created by ieb on 31/03/2016.
+ */
+public class JMSTopicManagerTest {
+
+    private static final long MESSAGE_LATENCY = 1000;
+    private static final Logger LOGGER = LoggerFactory.getLogger(JMSTopicManagerTest.class);
+    private JMSTopicManager jsmTopicManager;
+    private ActiveMQConnectionFactoryService amqConnectionFactoryService;
+    private Map<String, Object> testMap;
+    private boolean passed;
+    private long lastSent;
+    @Mock
+    private ServiceReference<Subscriber> serviceReference;
+    @Mock
+    private Bundle bundle;
+    @Mock
+    private BundleContext bundleContext;
+    private Map<String, Object> serviceProperties = new HashMap<String, Object>();
+
+    public  JMSTopicManagerTest() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @Before
+    public void before() throws NoSuchFieldException, IllegalAccessException, JMSException {
+        Mockito.when(serviceReference.getBundle()).thenReturn(bundle);
+        Mockito.when(bundle.getBundleContext()).thenReturn(bundleContext);
+        Mockito.when(serviceReference.getPropertyKeys()).thenAnswer(new Answer<String[]>() {
+            @Override
+            public String[] answer(InvocationOnMock invocationOnMock) throws Throwable {
+                return (String[]) serviceProperties.keySet().toArray(new String[serviceProperties.size()]);
+            }
+        });
+        Mockito.when(serviceReference.getProperty(Mockito.anyString())).thenAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                return serviceProperties.get(invocationOnMock.getArguments()[0]);
+            }
+        });
+        amqConnectionFactoryService = ActiveMQConnectionFactoryServiceTest.activate(null);
+        jsmTopicManager = JMSTopicManagerTest.activate(amqConnectionFactoryService);
+        testMap = JsonTest.createTestMap();
+        passed = false;
+    }
+
+    public static JMSTopicManager activate(ActiveMQConnectionFactoryService amqConnectionFactoryService) throws NoSuchFieldException, IllegalAccessException, JMSException {
+        JMSTopicManager jsmTopicManager = new JMSTopicManager();
+        setPrivate(jsmTopicManager, "connectionFactoryService", amqConnectionFactoryService);
+        jsmTopicManager.activate(new HashMap<String, Object>());
+        return jsmTopicManager;
+
+    }
+
+    private static void setPrivate(Object object, String name, Object value) throws NoSuchFieldException, IllegalAccessException {
+        Field field = object.getClass().getDeclaredField(name);
+        if ( !field.isAccessible()) {
+            field.setAccessible(true);
+        }
+        field.set(object, value);
+    }
+
+    @After
+    public void after() throws JMSException {
+        JMSTopicManagerTest.deactivate(jsmTopicManager);
+        ActiveMQConnectionFactoryServiceTest.deactivate(amqConnectionFactoryService);
+    }
+
+    public static void deactivate(JMSTopicManager jsmTopicManager) throws JMSException {
+        jsmTopicManager.deactivate(new HashMap<String, Object>());
+    }
+
+
+    /**
+     * Tests a working publish operation: publishes a message, reads it back and checks its content. Waits up to 1s for the
+     * message; messages normally arrive within 15ms.
+     * @throws Exception
+     */
+    @Test
+    public void testPublish() throws Exception {
+        // make the test map unique.
+        testMap.put("testing", "testPublish" + System.currentTimeMillis());
+
+        addSubscriber(new String[]{"testtopic"}, true);
+
+        jsmTopicManager.publish(Types.topicName("testtopic"), Types.commandName("testcommand"), testMap);
+        lastSent = System.currentTimeMillis();
+        assertTrue(waitForPassed(MESSAGE_LATENCY));
+
+        removeSubscriber();
+    }
+
+
+    private void addSubscriber(String[] topics, boolean match) {
+
+        Subscriber subscriber = new TestingSubscriber(this, match, topics);
+
+        serviceProperties.clear();
+        serviceProperties.put(Subscriber.TOPIC_NAMES_PROP, topics);
+
+        Mockito.when(bundleContext.getService(Mockito.eq(serviceReference))).thenReturn(subscriber);
+        jsmTopicManager.addSubscriber(serviceReference);
+
+    }
+
+    private void removeSubscriber() {
+        jsmTopicManager.removeSubscriber(serviceReference);
+    }
+
+
+    /**
+     * Test that a message sent with the wrong topic doesn't arrive, filtered by the topic inside the jmsTopicManager.
+     * @throws Exception
+     */
+    @Test
+    public void testFilterdByTopic() throws Exception {
+        // make the test map unique.
+        testMap.put("testing", "testFilterdByTopic" + System.currentTimeMillis());
+        addSubscriber(new String[]{"testtopic"}, false);
+
+        lastSent = System.currentTimeMillis();
+        assertFalse(waitForPassed(MESSAGE_LATENCY)); // not expecting a message at all
+
+        removeSubscriber();
+    }
+
+    /**
+     * Check that a message sent to the correct topic is filtered by the MessageFilter.
+     * The test waits 1s for the message to arrive. If testPublish does not fail, message
+     * latency is < 1s.
+     * @throws Exception
+     */
+    @Test
+    public void testFilterdByFilter() throws Exception {
+        // make the test map unique.
+        testMap.put("testing", "testFilterdByFilter" + System.currentTimeMillis());
+        addSubscriber(new String[]{"testtopic"}, false);
+
+        jsmTopicManager.publish(Types.topicName("testtopic"), Types.commandName("testcommand"), testMap);
+        lastSent = System.currentTimeMillis();
+        assertFalse(waitForPassed(MESSAGE_LATENCY)); // not expecting a message at all
+
+        removeSubscriber();
+    }
+
+
+    private boolean waitForPassed(long t) {
+        long end = System.currentTimeMillis() + t;
+        while(System.currentTimeMillis() < end) {
+            if (passed) {
+                return true;
+            } else {
+                Thread.yield();
+            }
+        }
+        LOGGER.info("Message not received after "+t+" ms");
+        return false;
+    }
+
+
+    private static class TestingSubscriber implements Subscriber, MessageFilter {
+        private JMSTopicManagerTest test;
+        private final boolean accept;
+        private final Set<Types.Name> topicnames;
+
+        public TestingSubscriber(JMSTopicManagerTest test, boolean accept, String[] topicname) {
+            this.test = test;
+            this.accept = accept;
+            this.topicnames = new HashSet<Types.Name>();
+            for(String t : topicname) {
+                topicnames.add(Types.topicName(t));
+            }
+        }
+
+        @Override
+        public void onMessage(Types.TopicName topic, Map<String, Object> message) {
+            LOGGER.info("Got message in "+(System.currentTimeMillis()-test.lastSent)+" ms");
+            JsonTest.checkEquals(test.testMap, message);
+            test.passed = true;
+        }
+
+        @Override
+        public boolean accept(Types.Name name, Map<String, Object> mapMessage) {
+            return topicnames.contains(name) == accept;
+        }
+
+
+    }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/jms/JsonTest.java b/src/test/java/org/apache/sling/jms/JsonTest.java
new file mode 100644
index 0000000..6880c5b
--- /dev/null
+++ b/src/test/java/org/apache/sling/jms/JsonTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.jms;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.*;
+
+import  org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by ieb on 31/03/2016.
+ */
+public class JsonTest {
+
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(JsonTest.class);
+    private Map<String, Object> testMap;
+
+    @Before
+    public void setup() {
+        testMap = JsonTest.createTestMap();
+    }
+
+    public static Map<String,Object> createTestMap() {
+        Map<String, Object> testMap = new HashMap<String, Object>();
+        Map<String, Object> innerTestMap = new HashMap<String, Object>();
+        Map<String, Object> inner2TestMap = new HashMap<String, Object>();
+        Map<String, Object> listMap = new HashMap<String, Object>();
+
+        listMap.put("listMaplong",100L);
+        listMap.put("listMapboolean",true);
+        listMap.put("listMapstring","A String");
+        listMap.put("listMapdouble",1.001D);
+
+        testMap.put("long",100L);
+        testMap.put("boolean",true);
+        testMap.put("string","A String");
+        testMap.put("double",1.001D);
+        testMap.put("map",innerTestMap);
+        innerTestMap.put("innerlong",100L);
+        innerTestMap.put("innerboolean",true);
+        innerTestMap.put("innerstring","A String");
+        innerTestMap.put("innerdouble",1.001D);
+        innerTestMap.put("innermap",inner2TestMap);
+        inner2TestMap.put("inner2long",100L);
+        inner2TestMap.put("inner3boolean",true);
+        inner2TestMap.put("inner3string","A String");
+        inner2TestMap.put("inner3double",1.001D);
+        inner2TestMap.put("inner3list", Arrays.asList("string1","string2", "string2"));
+        inner2TestMap.put("inner3listofMaps", Arrays.asList(listMap, listMap, listMap));
+        return testMap;
+    }
+
+    @Test
+    public void testJson() throws Exception {
+        checkEquals(testMap, Json.toMap(Json.toJson(testMap)));
+
+    }
+
+    public static void checkEquals(Map<String, Object> expected, Map<String, Object> actual) {
+        LOGGER.info("Expected {}", expected);
+        LOGGER.info("Actual   {}", actual);
+        for(Map.Entry<String, Object> e : expected.entrySet()) {
+            if ( e.getValue() instanceof Map ) {
+                checkEquals((Map<String, Object>) e.getValue(), (Map<String, Object>) actual.get(e.getKey()));
+            } else if ( e.getValue() instanceof List ) {
+                checkEquals((List<Object>) e.getValue(), (List<Object>) actual.get(e.getKey()));
+            } else {
+                if ( e.getValue() == null && actual.get(e.getKey()) != null ) {
+                    LOGGER.info("Expected value for {}  is null but actual is {} ",  e.getKey(), actual.get(e.getKey()));
+                }
+                if ( e.getValue() != null && !e.getValue().equals(actual.get(e.getKey()))) {
+                    LOGGER.info("Expected value for {}  is {} but actual is {}",  new Object[]{e.getKey(), e.getValue(), actual.get(e.getKey())});
+                    LOGGER.info("Expected type for {}  is {} but actual is {}",  new Object[]{e.getKey(), e.getValue().getClass(), actual.get(e.getKey()) == null ? null : actual.get(e.getKey()).getClass()});
+                }
+                Assert.assertEquals(e.getValue(), actual.get(e.getKey()));
+            }
+        }
+        LOGGER.info("Maps equal ok");
+    }
+
+    private static void checkEquals(List<Object> expected, List<Object> actual) {
+        Assert.assertEquals(expected.size(), actual.size());
+        for (int i = 0; i < expected.size(); i++) {
+            Object e = expected.get(i);
+            Object a = actual.get(i);
+            if ( e instanceof Map ) {
+                checkEquals((Map<String, Object>) e, (Map<String, Object>) a);
+            } else if ( e instanceof List ) {
+                checkEquals((List<Object>) e, (List<Object>) a);
+            } else {
+                Assert.assertEquals(e,a);
+            }
+        }
+    }
+
+}
\ No newline at end of file
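
A note on the waitForPassed helpers in JMSQueueManagerTest and JMSTopicManagerTest above: both poll a plain (non-volatile) boolean field from the test thread while the flag is set from the message delivery callback. As an alternative, and not what this commit does, the same wait can be expressed with a java.util.concurrent.CountDownLatch, which blocks instead of spinning and makes the hand-off between threads explicit. The sketch below is a minimal illustration; the class name LatchWaitSketch and its method names are hypothetical and do not exist in the Sling code base.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not part of the commit: a latch-based replacement
// for the "passed" flag plus busy-wait loop used in the tests.
class LatchWaitSketch {

    // Released once, when the expected message has been delivered.
    private final CountDownLatch delivered = new CountDownLatch(1);

    // Would be called from the reader/subscriber callback.
    void onMessage() {
        delivered.countDown();
    }

    // Would be called from the test thread. Returns true if a message
    // was delivered within timeoutMs milliseconds, false otherwise.
    boolean waitForPassed(long timeoutMs) {
        try {
            return delivered.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt status and report "not delivered".
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final LatchWaitSketch sketch = new LatchWaitSketch();
        // Simulate the delivery callback arriving on another thread.
        Thread listener = new Thread(new Runnable() {
            @Override
            public void run() {
                sketch.onMessage();
            }
        });
        listener.start();
        System.out.println("delivered: " + sketch.waitForPassed(1000));
        listener.join();
    }
}
```

In a test, assertTrue(waitForPassed(MESSAGE_LATENCY)) would keep working unchanged with such a helper; tests that expect no delivery still have to wait for the full timeout.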


[sling-org-apache-sling-jms] 03/08: SLING-5647 - Provide ActiveMQ implementation of the MoM API in SLING-5646

Posted by ro...@apache.org.

rombert pushed a commit to annotated tag org.apache.sling.jms-1.0.0
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-jms.git

commit 47f7bfd5cf7b25920460d94039b695de3073f969
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Wed Sep 21 13:28:18 2016 +0000

    SLING-5647 - Provide ActiveMQ implementation of the MoM API in SLING-5646
    
    The map passed to add() may be immutable, so add the internal state to a copy and make sure such internal properties are not passed to the reader.
    
    Also fix the cast of the retry count (NRETRIES) from int to long.
    
    git-svn-id: https://svn.apache.org/repos/asf/sling/trunk/contrib/commons/mom/jms@1761729 13f79535-47bb-0310-9956-ffa450edef68
---
 .../java/org/apache/sling/jms/JMSQueueManager.java    | 19 +++++++++++++++----
 .../org/apache/sling/jms/JMSQueueManagerTest.java     |  3 ++-
 2 files changed, 17 insertions(+), 5 deletions(-)
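
The three changes described in the commit message can be illustrated in isolation. The following is a minimal, self-contained sketch and not the actual JMSQueueManager code: the class InternalPropsSketch and the helpers withRetryCount and retries are hypothetical names introduced here, while NRETRIES, INTERNAL_PROPS and filter mirror the patch below. It assumes the retry counter is held as a Long, which is what the patch's change from (int) to (long) suggests; casting an Object that holds a Long with (int) throws a ClassCastException.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the copy / filter / cast handling in this commit.
class InternalPropsSketch {

    // Same key and set as in the patch.
    static final String NRETRIES = "_nr";
    static final Set<String> INTERNAL_PROPS = Collections.singleton(NRETRIES);

    // Hypothetical helper: never touches the caller's map, which may be
    // immutable; the internal retry counter goes on a mutable copy.
    static Map<String, Object> withRetryCount(Map<String, Object> message) {
        Map<String, Object> copy = new HashMap<>(message);
        copy.put(NRETRIES, 0L);
        return copy;
    }

    // Mirrors filter() in the patch: strips internal keys before the map
    // is handed to a reader. Mutates and returns the given map.
    static Map<String, Object> filter(Map<String, Object> map) {
        for (String internalKey : INTERNAL_PROPS) {
            map.remove(internalKey);
        }
        return map;
    }

    // Hypothetical helper: reads the counter through Number, so it works
    // for a Long as well as an Integer (the patch itself casts to long).
    static long retries(Map<String, Object> map) {
        return ((Number) map.get(NRETRIES)).longValue();
    }

    public static void main(String[] args) {
        Map<String, Object> source = new HashMap<>();
        source.put("string", "A String");
        Map<String, Object> readOnly = Collections.unmodifiableMap(source);

        Map<String, Object> queued = withRetryCount(readOnly);
        System.out.println("retries: " + retries(queued));
        System.out.println("caller map touched: " + readOnly.containsKey(NRETRIES));
        System.out.println("reader sees counter: " + filter(queued).containsKey(NRETRIES));
    }
}
```

Calling put directly on readOnly would throw UnsupportedOperationException, which is the failure mode the copy avoids.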

diff --git a/src/main/java/org/apache/sling/jms/JMSQueueManager.java b/src/main/java/org/apache/sling/jms/JMSQueueManager.java
index 7c7b83b..4fcc904 100644
--- a/src/main/java/org/apache/sling/jms/JMSQueueManager.java
+++ b/src/main/java/org/apache/sling/jms/JMSQueueManager.java
@@ -30,6 +30,7 @@ import java.io.Closeable;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -46,6 +47,7 @@ public class JMSQueueManager implements QueueManager {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(JMSQueueManager.class);
     private static final String NRETRIES = "_nr";
+    private static final Set<String> INTERNAL_PROPS = Collections.singleton(NRETRIES);
 
     @Reference
     private ConnectionFactoryService connectionFactoryService;
@@ -92,8 +94,10 @@ public class JMSQueueManager implements QueueManager {
         Session session = null;
         try {
             session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-            message.put(NRETRIES, 0L); // set the number of retries to 0.
-            TextMessage textMessage = session.createTextMessage(Json.toJson(message));
+            //TODO Instead of copy do addition at JSON writer level
+            Map<String, Object> msgCopy = new HashMap<>(message);
+            msgCopy.put(NRETRIES, 0L); // set the number of retries to 0.
+            TextMessage textMessage = session.createTextMessage(Json.toJson(msgCopy));
             textMessage.setJMSType(JMSMessageTypes.JSON.toString());
             LOGGER.info("Sending to {} message {} ", name, textMessage);
             session.createProducer(session.createQueue(name.toString())).send(textMessage);
@@ -189,6 +193,13 @@ public class JMSQueueManager implements QueueManager {
         }
     }
 
+    private static Map<String,Object> filter(Map<String, Object> map) {
+        //Filter out internal properties
+        for (String internalKey : INTERNAL_PROPS){
+            map.remove(internalKey);
+        }
+        return map;
+    }
 
     public static class JMSQueueSession implements Closeable, MessageListener {
         private static final Logger LOGGER = LoggerFactory.getLogger(JMSQueueSession.class);
@@ -248,7 +259,7 @@ public class JMSQueueManager implements QueueManager {
                             final Map<String, Object> mapMessage = Json.toMap(textMessage.getText());
                             Types.QueueName queueName = Types.queueName(queue.getQueueName());
                             if (queueName.equals(name) && messageFilter.accept(queueName, mapMessage)) {
-                                queueReader.onMessage(queueName, mapMessage);
+                                queueReader.onMessage(queueName, filter(mapMessage));
                                 session.commit();
                                 // all ok.
                                 committed = true;
@@ -260,7 +271,7 @@ public class JMSQueueManager implements QueueManager {
                     LOGGER.info("QueueReader requested requeue of message ", e);
                     if (retryByRequeue && textMessage != null) {
                         Map<String, Object> mapMessage = Json.toMap(textMessage.getText());
-                        if ((int)mapMessage.get(NRETRIES) < maxRetries) {
+                        if ((long)mapMessage.get(NRETRIES) < maxRetries) {
                             mapMessage.put(NRETRIES, ((long) mapMessage.get(NRETRIES)) + 1);
                             TextMessage retryMessage = session.createTextMessage(Json.toJson(mapMessage));
                             retryMessage.setJMSType(JMSMessageTypes.JSON.toString());
diff --git a/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java b/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java
index ddf20a7..4c3aa36 100644
--- a/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java
+++ b/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.jms.*;
 import java.lang.reflect.Field;
+import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Map;
@@ -204,7 +205,7 @@ public class JMSQueueManagerTest {
         // make the test map unique, if the dequeue fails, then the message wont be the first.
         testMap.put("testing", queueName + System.currentTimeMillis());
         LOGGER.info("Sending message to queue");
-        jmsQueueManager.add(Types.queueName(queueName), testMap);
+        jmsQueueManager.add(Types.queueName(queueName), Collections.unmodifiableMap(testMap));
         LOGGER.info("Sent message to queue ... receiving from queue");
 
         checkMessagesInQueue(queueName, 1);
