You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/10/06 19:41:13 UTC

[5/6] git commit: start queues

start queues

moving files

added fig

queues!!!

validating credentials

moving dependencies

adding serialization

change formatting

dependency issues, removing old queueing mechanism

tests passing

yaml version for aws

organization

adding wait time

fixing tests

better logging

bad variable


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/53810396
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/53810396
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/53810396

Branch: refs/heads/sqs_queues
Commit: 5381039658274b9e584bb98eb86249d7fed157ce
Parents: 9c4b26e
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Oct 3 10:34:19 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 6 11:35:02 2014 -0600

----------------------------------------------------------------------
 stack/core/pom.xml                              |  15 ++
 .../usergrid/corepersistence/GuiceModule.java   |   4 +
 stack/corepersistence/map/pom.xml               |  16 ++
 stack/corepersistence/pom.xml                   |  17 ++
 stack/corepersistence/queue/pom.xml             |  92 ++++++++
 .../usergrid/persistence/queue/Queue.java       |  31 +++
 .../usergrid/persistence/queue/QueueFig.java    |  16 ++
 .../persistence/queue/QueueManager.java         |  40 ++++
 .../persistence/queue/QueueManagerFactory.java  |  23 ++
 .../persistence/queue/QueueMessage.java         |  42 ++++
 .../usergrid/persistence/queue/QueueScope.java  |  31 +++
 .../persistence/queue/guice/QueueModule.java    |  51 +++++
 .../persistence/queue/impl/QueueScopeImpl.java  |  87 ++++++++
 .../queue/impl/SQSQueueManagerImpl.java         | 222 +++++++++++++++++++
 .../persistence/queue/QueueManagerTest.java     |  81 +++++++
 .../queue/guice/TestQueueModule.java            |  33 +++
 stack/pom.xml                                   |   8 +-
 .../notifications/ApplicationQueueManager.java  |  96 ++++----
 .../notifications/ApplicationQueueMessage.java  |  67 ++----
 .../notifications/NotificationsService.java     |  15 +-
 .../services/notifications/QueueListener.java   |  69 +++---
 .../services/notifications/QueueManager.java    |  31 ---
 .../services/notifications/TaskManager.java     |  78 +++----
 .../apns/NotificationsServiceIT.java            |  47 ++--
 .../gcm/NotificationsServiceIT.java             |   2 +-
 25 files changed, 973 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/core/pom.xml
----------------------------------------------------------------------
diff --git a/stack/core/pom.xml b/stack/core/pom.xml
index 6cd5dac..30a14d1 100644
--- a/stack/core/pom.xml
+++ b/stack/core/pom.xml
@@ -526,6 +526,21 @@
 	    <type>jar</type>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.usergrid</groupId>
+      <artifactId>map</artifactId>
+      <version>2.0.0-SNAPSHOT</version>
+      <type>jar</type>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.usergrid</groupId>
+      <artifactId>queue</artifactId>
+      <version>2.0.0-SNAPSHOT</version>
+      <type>jar</type>
+    </dependency>
+
+
     <!--<dependency>-->
       <!--<artifactId>lucene-core</artifactId>-->
       <!--<groupId>org.apache.lucene</groupId>-->

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
index 1f3d615..42f81d5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
@@ -17,6 +17,8 @@
 package org.apache.usergrid.corepersistence;
 
 
+import org.apache.usergrid.persistence.map.guice.MapModule;
+import org.apache.usergrid.persistence.queue.guice.QueueModule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,6 +43,8 @@ public class GuiceModule  extends AbstractModule {
         install(new CollectionModule());
         install(new GraphModule());
         install(new IndexModule());
+        install(new MapModule());
+        install(new QueueModule());
 
         bind(CpEntityDeleteListener.class).asEagerSingleton();
         bind(CpEntityIndexDeleteListener.class).asEagerSingleton();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/map/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/pom.xml b/stack/corepersistence/map/pom.xml
index 93b1030..e9cb5ab 100644
--- a/stack/corepersistence/map/pom.xml
+++ b/stack/corepersistence/map/pom.xml
@@ -1,4 +1,20 @@
 <?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index 5e6bf01..7482271 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -1,4 +1,20 @@
 <?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
@@ -68,6 +84,7 @@
         <module>queryindex</module>
         <module>common</module>
         <module>map</module>
+        <module>queue</module>
     </modules>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/queue/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/pom.xml b/stack/corepersistence/queue/pom.xml
new file mode 100644
index 0000000..94f11a8
--- /dev/null
+++ b/stack/corepersistence/queue/pom.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one
+~ or more contributor license agreements.  See the NOTICE file
+~ distributed with this work for additional information
+~ regarding copyright ownership.  The ASF licenses this file
+~ to you under the Apache License, Version 2.0 (the
+~ "License"); you may not use this file except in compliance
+~ with the License.  You may obtain a copy of the License at
+~
+~    http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing,
+~ software distributed under the License is distributed on an
+~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+~ KIND, either express or implied.  See the License for the
+~ specific language governing permissions and limitations
+~ under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>persistence</artifactId>
+    <groupId>org.apache.usergrid</groupId>
+    <version>2.0.0-SNAPSHOT</version>
+  </parent>
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>queue</artifactId>
+
+  <name>Usergrid Queue</name>
+
+  <dependencies>
+
+
+    <dependency>
+      <groupId>org.apache.usergrid</groupId>
+      <artifactId>common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+
+
+    <!-- lang utils for setting uuids etc -->
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>${commons.lang.version}</version>
+    </dependency>
+
+    <!-- tests -->
+
+    <dependency>
+      <groupId>org.apache.usergrid</groupId>
+      <artifactId>common</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+
+    <dependency>
+      <groupId>org.jukito</groupId>
+      <artifactId>jukito</artifactId>
+      <version>${jukito.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+
+    <dependency>
+      <groupId>org.apache.usergrid</groupId>
+      <artifactId>collection</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk</artifactId>
+      <version>1.8.11</version>
+    </dependency>
+
+
+  </dependencies>
+
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/Queue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/Queue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/Queue.java
new file mode 100644
index 0000000..2cc49aa
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/Queue.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue;
+
+
+public class Queue {
+    private final String url;
+
+    public Queue(String url) {
+        this.url = url;
+    }
+
+    public String getUrl(){
+        return url;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
new file mode 100644
index 0000000..fd71f9e
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
@@ -0,0 +1,16 @@
+package org.apache.usergrid.persistence.queue;
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+@FigSingleton
+public interface QueueFig extends GuicyFig {
+
+    @Key( "queue.region" )
+    @Default("us-east-1")
+    public String getRegion();
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
new file mode 100644
index 0000000..3509e4e
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+
+public interface QueueManager {
+
+    Queue createQueue( );
+
+    Queue getQueue();
+
+    List<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime);
+
+    void commitMessage( QueueMessage queueMessage);
+
+    void commitMessages( List<QueueMessage> queueMessages);
+
+    void sendMessages(List<Serializable> bodies) throws IOException;
+
+    void sendMessage(Serializable body)throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java
new file mode 100644
index 0000000..4cdb5e2
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue;
+
+public interface QueueManagerFactory {
+    public QueueManager getQueueManager( final QueueScope scope );
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
new file mode 100644
index 0000000..1aef3a3
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue;
+
+public class QueueMessage {
+    private final Object body;
+    private final String messageId;
+    private final String handle;
+
+    public QueueMessage(String messageId, String handle, Object body) {
+        this.body = body;
+        this.messageId = messageId;
+        this.handle = handle;
+    }
+
+    public String getHandle() {
+        return handle;
+    }
+
+    public Object getBody(){
+        return body;
+    }
+
+    public String getMessageId() {
+        return messageId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
new file mode 100644
index 0000000..b2b2ec6
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.queue;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+
+public interface QueueScope extends ApplicationScope {
+
+    /**
+     * Get the name of the the map
+     * @return
+     */
+    public String getName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
new file mode 100644
index 0000000..e8fc7c8
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue.guice;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+import org.apache.usergrid.persistence.queue.QueueFig;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.impl.SQSQueueManagerImpl;
+import org.safehaus.guicyfig.GuicyFigModule;
+
+
+/**
+ * Simple module for wiring our collection api
+ *
+ * @author tnine
+ */
+public class QueueModule extends AbstractModule {
+
+
+    @Override
+    protected void configure() {
+
+        install( new GuicyFigModule( QueueFig.class) );
+        // create a guice factory for getting our collection manager
+        install( new FactoryModuleBuilder().implement( QueueManager.class, SQSQueueManagerImpl.class )
+                                           .build( QueueManagerFactory.class ) );
+
+    }
+
+
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
new file mode 100644
index 0000000..d78a66d
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue.impl;
+
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.queue.QueueScope;
+
+/**
+ * Created by ApigeeCorporation on 10/3/14.
+ */
+public class QueueScopeImpl implements QueueScope {
+    private final Id owner;
+    private final String name;
+
+
+    public QueueScopeImpl( final Id owner, final String name ) {
+        this.owner = owner;
+        this.name = name;
+    }
+
+
+
+    @Override
+    public Id getApplication() {
+        return owner;
+    }
+
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+
+    @Override
+    public boolean equals( final Object o ) {
+        if ( this == o ) {
+            return true;
+        }
+        if ( !( o instanceof QueueScopeImpl ) ) {
+            return false;
+        }
+
+        final QueueScopeImpl queueScope = ( QueueScopeImpl ) o;
+
+        if ( !name.equals( queueScope.name ) ) {
+            return false;
+        }
+        if ( !owner.equals( queueScope.owner ) ) {
+            return false;
+        }
+
+        return true;
+    }
+
+
+    @Override
+    public int hashCode() {
+        int result = owner.hashCode();
+        result = 31 * result + name.hashCode();
+        return result;
+    }
+
+
+    @Override
+    public String toString() {
+        return "QueueScopeImpl{" +
+                "owner=" + owner +
+                ", name='" + name + '\'' +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
new file mode 100644
index 0000000..cf6ff45
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue.impl;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.SDKGlobalConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
+import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.sqs.AmazonSQSClient;
+import com.amazonaws.services.sqs.model.*;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import org.apache.commons.lang.StringUtils;
+import org.apache.usergrid.persistence.queue.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.util.Base64Coder;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SQSQueueManagerImpl implements QueueManager {
+    private static final Logger LOG = LoggerFactory.getLogger(SQSQueueManagerImpl.class);
+
+    private final AmazonSQSClient sqs;
+    private final QueueScope scope;
+    private final QueueFig fig;
+    private Queue queue;
+
+    @Inject
+    public SQSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig){
+        this.fig = fig;
+        this.scope = scope;
+        UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
+        this.sqs = new AmazonSQSClient(ugProvider.getCredentials());
+        Regions regions = Regions.fromName(fig.getRegion());
+        Region region = Region.getRegion(regions);
+        sqs.setRegion(region);
+    }
+
+
+    public Queue createQueue(){
+        String name = getName();
+        CreateQueueRequest createQueueRequest = new CreateQueueRequest()
+                .withQueueName(name);
+        CreateQueueResult result = sqs.createQueue(createQueueRequest);
+        String url = result.getQueueUrl();
+        LOG.info("Created queue with url {}",url);
+        return new Queue(url);
+    }
+
+    private String getName() {
+        String name = scope.getApplication().getType() + scope.getApplication().getUuid().toString() + scope.getName();
+        return name;
+    }
+
+    public Queue getQueue(){
+        if(queue == null) {
+            ListQueuesResult result =  sqs.listQueues();
+            for (String queueUrl : result.getQueueUrls()) {
+                boolean found = queueUrl.contains(getName());
+                if (found) {
+                    queue = new Queue(queueUrl);
+                    break;
+                }
+            }
+        }
+        if(queue == null) {
+            queue = createQueue();
+        }
+        return queue;
+    }
+
+    public void sendMessage(Serializable body) throws IOException{
+        String url = getQueue().getUrl();
+        LOG.info("Sending Message...{} to {}",body.toString(),url);
+        SendMessageRequest request = new SendMessageRequest(url,toString(body));
+        sqs.sendMessage(request);
+    }
+
+
+    public void sendMessages(List<Serializable> bodies) throws IOException{
+        String url = getQueue().getUrl();
+        LOG.info("Sending Messages...{} to {}",bodies.size(),url);
+
+        SendMessageBatchRequest request = new SendMessageBatchRequest(url);
+        List<SendMessageBatchRequestEntry> entries = new ArrayList<>(bodies.size());
+        for(Serializable body : bodies){
+            SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
+            entry.setMessageBody(toString(body));
+            entries.add(entry);
+        }
+        request.setEntries(entries);
+        sqs.sendMessageBatch(request);
+    }
+
+    public  List<QueueMessage> getMessages( int limit,int timeout, int waitTime) {
+        waitTime = waitTime/1000;
+        String url = getQueue().getUrl();
+        LOG.info("Getting {} messages from {}",limit,url);
+        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url);
+        receiveMessageRequest.setMaxNumberOfMessages(limit);
+        receiveMessageRequest.setVisibilityTimeout(timeout);
+        receiveMessageRequest.setWaitTimeSeconds(waitTime);
+        ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
+        List<Message> messages = result.getMessages();
+        LOG.info("Received {} messages from {}",messages.size(),url);
+        List<QueueMessage> queueMessages = new ArrayList<>(messages.size());
+        for (Message message : messages) {
+            Object body ;
+            try{
+                body = fromString(message.getBody());
+            }catch (Exception e){
+                LOG.error("failed to deserialize message", e);
+                body  = message.getBody();
+            }
+            QueueMessage queueMessage = new QueueMessage(message.getMessageId(),message.getReceiptHandle(),body);
+            queueMessages.add(queueMessage);
+        }
+        return queueMessages;
+    }
+
+    public void commitMessage( QueueMessage queueMessage){
+        String url = getQueue().getUrl();
+        LOG.info("Commit message {} to queue {}",queueMessage.getMessageId(),url);
+
+        sqs.deleteMessage(new DeleteMessageRequest()
+                .withQueueUrl(url)
+                .withReceiptHandle(queueMessage.getHandle()));
+    }
+
+    public void commitMessages( List<QueueMessage> queueMessages){
+        String url = getQueue().getUrl();
+        LOG.info("Commit messages {} to queue {}",queueMessages.size(),url);
+        List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
+        for(QueueMessage message : queueMessages){
+            entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(),message.getHandle()));
+        }
+        DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(url,entries);
+        DeleteMessageBatchResult result = sqs.deleteMessageBatch(request);
+        boolean successful = result.getFailed().size() > 0;
+        if(!successful){
+            LOG.error("Commit failed {} messages",result.getFailed().size());
+        }
+    }
+
+    /** Read the object from Base64 string. */
+    private static Object fromString( String s ) throws IOException, ClassNotFoundException {
+        byte [] data = Base64Coder.decode(s.toCharArray());
+        ObjectInputStream ois = new ObjectInputStream(
+                new ByteArrayInputStream(  data ) );
+        Object o  = ois.readObject();
+        ois.close();
+        return o;
+    }
+
+    /** Write the object to a Base64 string. */
+    private static String toString( Serializable o ) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream( baos );
+        oos.writeObject( o );
+        oos.close();
+        return new String( Base64Coder.encode( baos.toByteArray() ) );
+    }
+
+    public class UsergridAwsCredentialsProvider implements AWSCredentialsProvider {
+
+        private AWSCredentials creds;
+
+        public  UsergridAwsCredentialsProvider(){
+            init();
+        }
+
+        private void init() {
+            creds = new AWSCredentials() {
+                @Override
+                public String getAWSAccessKeyId() {
+                    return StringUtils.trim(System.getProperty(SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR));
+                }
+
+                @Override
+                public String getAWSSecretKey() {
+                    return StringUtils.trim(System.getProperty(SDKGlobalConfiguration.SECRET_KEY_ENV_VAR));
+                }
+            };
+            if(StringUtils.isEmpty(creds.getAWSAccessKeyId()) || StringUtils.isEmpty(creds.getAWSSecretKey()) ){
+                throw new AmazonClientException("could not retrieve credentials from system properties");
+            }
+        }
+
+        @Override
+        public AWSCredentials getCredentials() {
+            return creds;
+        }
+
+
+        @Override
+        public void refresh() {
+            init();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
new file mode 100644
index 0000000..772b75e
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.queue;
+
+import org.apache.usergrid.persistence.collection.util.InvalidEntityGenerator;
+import org.apache.usergrid.persistence.queue.guice.TestQueueModule;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
+import org.jukito.UseModules;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.core.cassandra.ITRunner;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import com.google.inject.Inject;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+@RunWith( ITRunner.class )
+@UseModules( { TestQueueModule.class } )
+public class QueueManagerTest {
+
+
+    @Inject
+    protected QueueManagerFactory qmf;
+
+    protected QueueScope scope;
+    private QueueManager qm;
+
+
+    @Before
+    public void mockApp() {
+        this.scope = new QueueScopeImpl( new SimpleId( "application" ), "testQueue" );
+        qm = qmf.getQueueManager(scope);
+    }
+
+    @Ignore("need aws creds")
+    @Test
+    public void get() {
+        Queue queue = qm.getQueue();
+        assertNotNull(queue);
+    }
+    @Ignore("need aws creds")
+    @Test
+    public void send() throws IOException{
+        String value = "bodytest";
+        qm.sendMessage(value);
+        List<QueueMessage> messageList = qm.getMessages(1,5000,5000);
+        assertTrue(messageList.size() >= 1);
+        for(QueueMessage message : messageList){
+            assertTrue(message.getBody().equals(value));
+            qm.commitMessage(message);
+        }
+        messageList = qm.getMessages(1,5000,5000);
+        assertTrue(messageList.size() <= 0);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/guice/TestQueueModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/guice/TestQueueModule.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/guice/TestQueueModule.java
new file mode 100644
index 0000000..b65725f
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/guice/TestQueueModule.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue.guice;
+
+
+import org.apache.usergrid.persistence.collection.guice.TestModule;
+import org.apache.usergrid.persistence.core.guice.CommonModule;
+
+
+
+public class TestQueueModule extends TestModule {
+
+    @Override
+    protected void configure() {
+        install( new CommonModule());
+        install( new QueueModule() );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index e105c79..b1bfe3c 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -111,7 +111,7 @@
     <org.springframework.version>3.1.2.RELEASE</org.springframework.version>
     <shiro-version>1.2.0</shiro-version>
     <slf4j-version>1.6.1</slf4j-version>
-    <snakeyaml-version>1.8</snakeyaml-version>
+    <snakeyaml-version>1.6</snakeyaml-version>
     <tomcat-version>7.0.52</tomcat-version>
     <antlr.version>3.4</antlr.version>
     <tika.version>1.4</tika.version>
@@ -276,7 +276,7 @@
       <dependency>
         <groupId>org.apache.httpcomponents</groupId>
         <artifactId>httpclient</artifactId>
-        <version>4.1.3</version>
+        <version>4.2</version>
         <exclusions>
           <exclusion>
             <groupId>commons-codec</groupId>
@@ -457,6 +457,10 @@
         <version>${cassandra-version}</version>
         <exclusions>
           <exclusion>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+          </exclusion>
+          <exclusion>
             <groupId>commons-lang</groupId>
             <artifactId>commons-lang</artifactId>
           </exclusion>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index c88cead..60c1602 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -27,6 +27,8 @@ import org.apache.usergrid.persistence.entities.Notification;
 import org.apache.usergrid.persistence.entities.Notifier;
 import org.apache.usergrid.persistence.entities.Receipt;
 import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueMessage;
 import org.apache.usergrid.services.notifications.apns.APNsAdapter;
 import org.apache.usergrid.services.notifications.gcm.GCMAdapter;
 import org.slf4j.Logger;
@@ -42,13 +44,10 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
 
-/**
- * Created by ApigeeCorporation on 8/27/14.
- */
-public class ApplicationQueueManager implements QueueManager {
+public class ApplicationQueueManager  {
 
-    public static  String DEFAULT_QUEUE_NAME = "notifications/queuelistenerv1_40;notifications/queuelistenerv1_41;notifications/queuelistenerv1_42";
-    public static final String DEFAULT_QUEUE_PROPERTY = "usergrid.notifications.listener.queueName";
+    public static  String DEFAULT_QUEUE_NAME = "queuelistenerv1_60";
+    public static final String DEFAULT_QUEUE_PROPERTY = "usergrid.notifications.listener.queue";
     private static final Logger LOG = LoggerFactory.getLogger(ApplicationQueueManager.class);
 
     //this is for tests, will not mark initial post complete, set to false for tests
@@ -57,11 +56,10 @@ public class ApplicationQueueManager implements QueueManager {
     public static final String NOTIFIER_ID_POSTFIX = ".notifier.id";
 
     private final EntityManager em;
-    private final org.apache.usergrid.mq.QueueManager qm;
+    private final QueueManager qm;
     private final JobScheduler jobScheduler;
     private final MetricsFactory metricsFactory;
-    private final String[] queueNames;
-    private boolean sendNow = true;
+    private final String queueName;
 
     HashMap<Object, Notifier> notifierHashMap; // only retrieve notifiers once
 
@@ -77,13 +75,12 @@ public class ApplicationQueueManager implements QueueManager {
     public static ProviderAdapter TEST_ADAPTER = new TestAdapter();
 
 
-    public ApplicationQueueManager(JobScheduler jobScheduler, EntityManager entityManager, org.apache.usergrid.mq.QueueManager queueManager, MetricsFactory metricsFactory, Properties properties){
+    public ApplicationQueueManager(JobScheduler jobScheduler, EntityManager entityManager, QueueManager queueManager, MetricsFactory metricsFactory, Properties properties){
         this.em = entityManager;
         this.qm = queueManager;
         this.jobScheduler = jobScheduler;
         this.metricsFactory = metricsFactory;
-        this.queueNames = getQueueNames(properties);
-        this.sendNow = new Boolean(properties.getProperty("usergrid.notifications.sendNow",""+sendNow));
+        this.queueName = getQueueNames(properties);
     }
 
 
@@ -101,26 +98,24 @@ public class ApplicationQueueManager implements QueueManager {
         long startTime = System.currentTimeMillis();
 
         if (notification.getCanceled() == Boolean.TRUE) {
-            LOG.info("ApplicationQueueMessage: notification " + notification.getUuid() + " canceled");
+            LOG.info("notification " + notification.getUuid() + " canceled");
             if (jobExecution != null) {
                 jobExecution.killed();
             }
             return;
         }
 
-        LOG.info("ApplicationQueueMessage: notification {} start queuing", notification.getUuid());
+        LOG.info("notification {} start queuing", notification.getUuid());
 
         final PathQuery<Device> pathQuery = notification.getPathQuery() ; //devices query
         final AtomicInteger deviceCount = new AtomicInteger(); //count devices so you can make a judgement on batching
         final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<String>(); //build up list of issues
 
         final HashMap<Object,Notifier> notifierMap =  getNotifierMap();
-        final String queueName = getRandomQueue(queueNames);
-        final List<ApplicationQueueMessage> messages = new ArrayList<>();
 
         //get devices in querystring, and make sure you have access
         if (pathQuery != null) {
-            LOG.info("ApplicationQueueMessage: notification {} start query", notification.getUuid());
+            LOG.info("notification {} start query", notification.getUuid());
             final Iterator<Device> iterator = pathQuery.iterator(em);
             //if there are more pages (defined by PAGE_SIZE) you probably want this to be async, also if this is already a job then don't reschedule
             if (iterator instanceof ResultsIterator && ((ResultsIterator) iterator).hasPages() && jobExecution == null) {
@@ -132,8 +127,6 @@ public class ApplicationQueueManager implements QueueManager {
             final UUID appId = em.getApplication().getUuid();
             final Map<String,Object> payloads = notification.getPayloads();
 
-            final boolean sendNow = this.sendNow; //&& jobExecution == null;
-
             final Func1<Entity,Entity> entityListFunct = new Func1<Entity, Entity>() {
                 @Override
                 public Entity call(Entity entity) {
@@ -143,13 +136,13 @@ public class ApplicationQueueManager implements QueueManager {
                         long now = System.currentTimeMillis();
                         List<EntityRef> devicesRef = getDevices(entity); // resolve group
 
-                        LOG.info("ApplicationQueueMessage: notification {} queue  {} devices, duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), devicesRef.size());
+                        LOG.info("notification {} queue  {} devices, duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), devicesRef.size());
 
                         for (EntityRef deviceRef : devicesRef) {
-                            LOG.info("ApplicationQueueMessage: notification {} starting to queue device {} ", notification.getUuid(), deviceRef.getUuid());
+                            LOG.info("notification {} starting to queue device {} ", notification.getUuid(), deviceRef.getUuid());
                             long hash = MurmurHash.hash(deviceRef.getUuid());
                             if (sketch.estimateCount(hash) > 0) { //look for duplicates
-                                LOG.debug("ApplicationQueueMessage: Maybe Found duplicate device: {}", deviceRef.getUuid());
+                                LOG.warn("Maybe Found duplicate device: {}", deviceRef.getUuid());
                                 continue;
                             } else {
                                 sketch.add(hash, 1);
@@ -167,11 +160,11 @@ public class ApplicationQueueManager implements QueueManager {
                                     notifierKey = entry.getKey().toLowerCase();
                                     break;
                                 }
-                                LOG.info("ApplicationQueueMessage: Provider query for notification {} device {} took "+(System.currentTimeMillis()-now)+" ms",notification.getUuid(),deviceRef.getUuid());
+                                LOG.info("Provider query for notification {} device {} took "+(System.currentTimeMillis()-now)+" ms",notification.getUuid(),deviceRef.getUuid());
                             }
 
                             if (notifierId == null) {
-                                LOG.debug("ApplicationQueueMessage: Notifier did not match for device {} ", deviceRef);
+                                LOG.info("Notifier did not match for device {} ", deviceRef);
                                 continue;
                             }
 
@@ -180,15 +173,11 @@ public class ApplicationQueueManager implements QueueManager {
                                 // update queued time
                                 now = System.currentTimeMillis();
                                 notification.setQueued(System.currentTimeMillis());
-                                LOG.info("ApplicationQueueMessage: notification {} device {} queue time set. duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), deviceRef.getUuid());
+                                LOG.info("notification {} device {} queue time set. duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), deviceRef.getUuid());
                             }
                             now = System.currentTimeMillis();
-                            if(sendNow){ //if(jobExecution == null && sendNow) {
-                                messages.add(message);
-                            }else{
-                                qm.postToQueue(queueName, message);
-                            }
-                            LOG.info("ApplicationQueueMessage: notification {} post-queue to device {} duration " + (System.currentTimeMillis() - now) + " ms "+queueName+" queue", notification.getUuid(), deviceRef.getUuid());
+                            qm.sendMessage(message);
+                            LOG.info("notification {} post-queue to device {} duration " + (System.currentTimeMillis() - now) + " ms "+queueName+" queue", notification.getUuid(), deviceRef.getUuid());
                             deviceCount.incrementAndGet();
                             queueMeter.mark();
                         }
@@ -215,7 +204,7 @@ public class ApplicationQueueManager implements QueueManager {
                         }
                     });
             o.toBlocking().lastOrDefault(null);
-            LOG.info("ApplicationQueueMessage: notification {} done queueing duration {} ms", notification.getUuid(), System.currentTimeMillis() - now);
+            LOG.info("notification {} done queueing duration {} ms", notification.getUuid(), System.currentTimeMillis() - now);
 
 
         }
@@ -238,26 +227,19 @@ public class ApplicationQueueManager implements QueueManager {
 
         em.update(notification);
 
-        LOG.info("ApplicationQueueMessage: notification {} updated notification duration {} ms", notification.getUuid(),System.currentTimeMillis() - now);
+        LOG.info("notification {} updated notification duration {} ms", notification.getUuid(),System.currentTimeMillis() - now);
 
         //do i have devices, and have i already started batching.
         if (deviceCount.get() <= 0) {
-            TaskManager taskManager = new TaskManager(em, qm, this, notification,queueName);
+            TaskManager taskManager = new TaskManager(em, this, notification,this.qm);
             //if i'm in a test value will be false, do not mark finished for test orchestration, not ideal need real tests
             taskManager.finishedBatch();
         }
 
         if (LOG.isInfoEnabled()) {
             long elapsed = notification.getQueued() != null ? notification.getQueued() - startTime : 0;
-            LOG.info("ApplicationQueueMessage: notification {} done queuing to {} devices in "+elapsed+" ms",notification.getUuid().toString(),deviceCount.get());
+            LOG.info("notification {} done queuing to {} devices in "+elapsed+" ms",notification.getUuid().toString(),deviceCount.get());
         }
-
-        if(messages.size()>0){
-            now = System.currentTimeMillis();
-            sendBatchToProviders(messages,null).toBlocking().lastOrDefault(null);
-            LOG.info("ApplicationQueueMessage: notification {} done sending to "+messages.size()+" devicess in {} ms", notification.getUuid(), System.currentTimeMillis() - now);
-        }
-
     }
 
     /**
@@ -300,20 +282,22 @@ public class ApplicationQueueManager implements QueueManager {
      * @throws Exception
      */
 
-    public Observable sendBatchToProviders( final List<ApplicationQueueMessage> messages, final String queuePath) {
+    public Observable sendBatchToProviders( final List<QueueMessage> messages, final String queuePath) {
         LOG.info("sending batch of {} notifications.", messages.size());
         final Meter sendMeter = metricsFactory.getMeter(NotificationsService.class, "send");
 
         final Map<Object, Notifier> notifierMap = getNotifierMap();
-        final QueueManager proxy = this;
+        final ApplicationQueueManager proxy = this;
         final ConcurrentHashMap<UUID,TaskManager> taskMap = new ConcurrentHashMap<UUID, TaskManager>(messages.size());
         final ConcurrentHashMap<UUID,Notification> notificationMap = new ConcurrentHashMap<UUID, Notification>(messages.size());
 
-        final Func1<ApplicationQueueMessage, ApplicationQueueMessage> func = new Func1<ApplicationQueueMessage, ApplicationQueueMessage>() {
+        final Func1<QueueMessage, ApplicationQueueMessage> func = new Func1<QueueMessage, ApplicationQueueMessage>() {
             @Override
-            public ApplicationQueueMessage call(ApplicationQueueMessage message) {
+            public ApplicationQueueMessage call(QueueMessage queueMessage) {
                 boolean messageCommitted = false;
+                ApplicationQueueMessage message = null;
                 try {
+                    message = (ApplicationQueueMessage) queueMessage.getBody();
                     LOG.info("start sending notification for device {} for Notification: {} on thread "+Thread.currentThread().getId(), message.getDeviceId(), message.getNotificationId());
 
                     UUID deviceUUID = message.getDeviceId();
@@ -325,7 +309,7 @@ public class ApplicationQueueManager implements QueueManager {
                     }
                     TaskManager taskManager = taskMap.get(message.getNotificationId());
                     if (taskManager == null) {
-                        taskManager = new TaskManager(em, qm, proxy, notification,queuePath);
+                        taskManager = new TaskManager(em, proxy, notification, qm);
                         taskMap.putIfAbsent(message.getNotificationId(), taskManager);
                         taskManager = taskMap.get(message.getNotificationId());
                     }
@@ -334,7 +318,7 @@ public class ApplicationQueueManager implements QueueManager {
                     final Map<String, Object> translatedPayloads = translatePayloads(payloads, notifierMap);
                     LOG.info("sending notification for device {} for Notification: {}", deviceUUID, notification.getUuid());
 
-                    taskManager.addMessage(deviceUUID,message);
+                    taskManager.addMessage(deviceUUID,queueMessage);
                     try {
                         String notifierName = message.getNotifierKey().toLowerCase();
                         Notifier notifier = notifierMap.get(notifierName.toLowerCase());
@@ -368,7 +352,7 @@ public class ApplicationQueueManager implements QueueManager {
                     LOG.error("Failure while sending",e);
                     try {
                         if(!messageCommitted && queuePath != null) {
-                            qm.commitTransaction(queuePath, message.getTransaction(), null);
+                            qm.commitMessage(queueMessage);
                         }
                     }catch (Exception queueException){
                         LOG.error("Failed to commit message.",queueException);
@@ -378,9 +362,9 @@ public class ApplicationQueueManager implements QueueManager {
             }
         };
         Observable o = rx.Observable.from(messages)
-                .parallel(new Func1<rx.Observable<ApplicationQueueMessage>, rx.Observable<ApplicationQueueMessage>>() {
+                .parallel(new Func1<rx.Observable<QueueMessage>, rx.Observable<ApplicationQueueMessage>>() {
                     @Override
-                    public rx.Observable<ApplicationQueueMessage> call(rx.Observable<ApplicationQueueMessage> messageObservable) {
+                    public rx.Observable<ApplicationQueueMessage> call(rx.Observable<QueueMessage> messageObservable) {
                         return messageObservable.map(func);
                     }
                 }, Schedulers.io())
@@ -446,14 +430,8 @@ public class ApplicationQueueManager implements QueueManager {
         return translatedPayloads;
     }
 
-    public static String[] getQueueNames(Properties properties) {
-        String[] names = properties.getProperty(ApplicationQueueManager.DEFAULT_QUEUE_PROPERTY,ApplicationQueueManager.DEFAULT_QUEUE_NAME).split(";");
-        return names;
-    }
-    public static String getRandomQueue(String[] queueNames) {
-        int size = queueNames.length;
-        Random random = new Random();
-        String name = queueNames[random.nextInt(size)];
+    public static String getQueueNames(Properties properties) {
+        String name = properties.getProperty(ApplicationQueueManager.DEFAULT_QUEUE_PROPERTY,ApplicationQueueManager.DEFAULT_QUEUE_NAME);
         return name;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueMessage.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueMessage.java
index 91f1312..fa75531 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueMessage.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueMessage.java
@@ -16,6 +16,7 @@
  */
 package org.apache.usergrid.services.notifications;
 
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import org.apache.usergrid.mq.Message;
@@ -28,23 +29,25 @@ import org.slf4j.LoggerFactory;
 /**
  * Created by ApigeeCorporation on 9/4/14.
  */
-public class ApplicationQueueMessage extends Message {
+public class ApplicationQueueMessage implements Serializable {
 
     private static final Logger log = LoggerFactory.getLogger(ApplicationQueueMessage.class);
-
-    static final String MESSAGE_PROPERTY_DEVICE_UUID = "deviceUUID";
-    static final String MESSAGE_PROPERTY_APPLICATION_UUID = "applicationUUID";
-    static final String MESSAGE_PROPERTY_NOTIFIER_ID = "notifierId";
-    static final String MESSAGE_PROPERTY_NOTIFICATION_ID = "notificationId";
-    static final String MESSAGE_PROPERTY_NOTIFIER_NAME = "notifierName";
+    private UUID applicationId;
+    private UUID notificationId;
+    private UUID deviceId;
+    private String notifierKey;
+    private String notifierId;
 
 
     public ApplicationQueueMessage() {
     }
 
     public ApplicationQueueMessage(UUID applicationId, UUID notificationId, UUID deviceId, String notifierKey, String notifierId) {
-        setApplicationId(applicationId);
-        setDeviceId(deviceId);
+        this.applicationId = applicationId;
+        this.notificationId = notificationId;
+        this.deviceId = deviceId;
+        this.notifierKey = notifierKey;
+        this.notifierId = notifierId;
         setNotificationId(notificationId);
         setNotifierKey(notifierKey);
         setNotifierId(notifierId);
@@ -58,71 +61,45 @@ public class ApplicationQueueMessage extends Message {
         return new UUID( msb, lsb );
     }
 
-    public static ApplicationQueueMessage generate(Message message) {
-
-        // this crazyness may indicate that Core Persistence is not storing UUIDs correctly
-
-        byte[] mpaBytes = (byte[])message.getObjectProperty(MESSAGE_PROPERTY_APPLICATION_UUID);
-        UUID mpaUuid = bytesToUuid(mpaBytes);
-
-        byte[] mpnBytes = (byte[])message.getObjectProperty(MESSAGE_PROPERTY_NOTIFICATION_ID);
-        UUID mpnUuid = bytesToUuid(mpnBytes);
-
-        final UUID mpdUuid;
-        Object o = message.getObjectProperty(MESSAGE_PROPERTY_DEVICE_UUID);
-        if ( o instanceof UUID ) {
-            mpdUuid = (UUID)message.getObjectProperty(MESSAGE_PROPERTY_DEVICE_UUID);
-        } else {
-            byte[] mpdBytes = (byte[])o;
-            mpdUuid =  bytesToUuid(mpdBytes);
-        }
-
-        // end of crazyness
-
-        return new ApplicationQueueMessage(
-                mpaUuid, mpnUuid, mpdUuid,
-                message.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_NAME), 
-                message.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_ID));
-    }
 
     public UUID getApplicationId() {
-        return (UUID) this.getObjectProperty(MESSAGE_PROPERTY_APPLICATION_UUID);
+        return applicationId;
     }
 
     public void setApplicationId(UUID applicationId) {
-        this.setProperty(MESSAGE_PROPERTY_APPLICATION_UUID, applicationId);
+       this.applicationId = applicationId;
     }
 
     public UUID getDeviceId() {
-        return (UUID) this.getObjectProperty(MESSAGE_PROPERTY_DEVICE_UUID);
+        return deviceId;
     }
 
     public void setDeviceId(UUID deviceId) {
-        this.setProperty(MESSAGE_PROPERTY_DEVICE_UUID, deviceId);
+        this.deviceId = deviceId;
     }
 
     public UUID getNotificationId() {
-        return (UUID) this.getObjectProperty(MESSAGE_PROPERTY_NOTIFICATION_ID);
+        return notificationId;
     }
 
     public void setNotificationId(UUID notificationId) {
-        this.setProperty(MESSAGE_PROPERTY_NOTIFICATION_ID, notificationId);
+       this.notificationId = notificationId;
     }
 
     public String getNotifierId() {
-        return this.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_ID);
+        return notifierId;
     }
 
     public void setNotifierId(String notifierId) {
-        this.setProperty(MESSAGE_PROPERTY_NOTIFIER_ID, notifierId);
+         this.notifierId = notifierId;
     }
 
     public String getNotifierKey() {
-        return this.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_NAME);
+        return notifierKey;
     }
 
     public void setNotifierKey(String name) {
-        this.setProperty(MESSAGE_PROPERTY_NOTIFIER_NAME, name);
+        notifierKey = name;
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index 4e5692e..c5cd3c4 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -20,6 +20,7 @@ import java.util.*;
 
 import com.codahale.metrics.*;
 import com.codahale.metrics.Timer;
+import org.apache.usergrid.corepersistence.CpSetup;
 import org.apache.usergrid.metrics.MetricsFactory;
 import org.apache.usergrid.mq.Message;
 import org.apache.usergrid.persistence.*;
@@ -28,6 +29,11 @@ import org.apache.usergrid.persistence.entities.Notifier;
 import org.apache.usergrid.persistence.entities.Receipt;
 import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
 import org.apache.usergrid.services.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,10 +86,9 @@ public class NotificationsService extends AbstractCollectionService {
 
     private ApplicationQueueManager notificationQueueManager;
     private long gracePeriod;
-    @Autowired
     private ServiceManagerFactory smf;
-    @Autowired
     private EntityManagerFactory emf;
+    private QueueManagerFactory queueManagerFactory;
 
     public NotificationsService() {
         LOG.info("/notifications");
@@ -99,7 +104,11 @@ public class NotificationsService extends AbstractCollectionService {
         postMeter = metricsService.getMeter(NotificationsService.class, "requests");
         postTimer = metricsService.getTimer(this.getClass(), "execution_rest");
         JobScheduler jobScheduler = new JobScheduler(sm,em);
-        notificationQueueManager = new ApplicationQueueManager(jobScheduler,em,smf.getServiceManager(smf.getManagementAppId()).getQueueManager(),metricsService,props);
+        String name = ApplicationQueueManager.getQueueNames(props);
+        QueueScope queueScope = new QueueScopeImpl(new SimpleId(smf.getManagementAppId(),"notifications"),name);
+        queueManagerFactory = CpSetup.getInjector().getInstance(QueueManagerFactory.class);
+        QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope);
+        notificationQueueManager = new ApplicationQueueManager(jobScheduler,em,queueManager,metricsService,props);
         gracePeriod = jobScheduler.SCHEDULER_GRACE_PERIOD;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 7d76ee9..a381c70 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -16,11 +16,17 @@
  */
 package org.apache.usergrid.services.notifications;
 
+import org.apache.usergrid.corepersistence.CpSetup;
 import org.apache.usergrid.metrics.MetricsFactory;
-import org.apache.usergrid.mq.*;
+
+import org.apache.usergrid.mq.QueueResults;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.EntityManagerFactory;
 
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.queue.*;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
 import org.apache.usergrid.services.ServiceManager;
 import org.apache.usergrid.services.ServiceManagerFactory;
 import org.slf4j.Logger;
@@ -35,7 +41,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class QueueListener  {
-    public  final long MESSAGE_TRANSACTION_TIMEOUT =  1 * 60 * 1000;
+    public  final int MESSAGE_TRANSACTION_TIMEOUT =  25 * 1000;
+    private final QueueManagerFactory queueManagerFactory;
 
     public   long DEFAULT_SLEEP = 5000;
 
@@ -50,7 +57,6 @@ public class QueueListener  {
 
     private Properties properties;
 
-    private org.apache.usergrid.mq.QueueManager queueManager;
 
     private ServiceManager svcMgr;
 
@@ -61,13 +67,12 @@ public class QueueListener  {
     private ExecutorService pool;
     private List<Future> futures;
 
-    public  final String MAX_THREADS = "2";
-    private Integer batchSize = 100;
-    private String[] queueNames;
-
-
+    public  final int MAX_THREADS = 2;
+    private Integer batchSize = 10;
+    private String queueName;
 
     public QueueListener(ServiceManagerFactory smf, EntityManagerFactory emf, MetricsFactory metricsService, Properties props){
+        this.queueManagerFactory = CpSetup.getInjector().getInstance(QueueManagerFactory.class);
         this.smf = smf;
         this.emf = emf;
         this.metricsService = metricsService;
@@ -76,7 +81,7 @@ public class QueueListener  {
 
     @PostConstruct
     public void start(){
-        boolean shouldRun = new Boolean(properties.getProperty("usergrid.notifications.listener.run", "false"));
+        boolean shouldRun = new Boolean(properties.getProperty("usergrid.notifications.listener.run", "true"));
 
         if(shouldRun) {
             LOG.info("QueueListener: starting.");
@@ -86,9 +91,9 @@ public class QueueListener  {
                 sleepBetweenRuns = new Long(properties.getProperty("usergrid.notifications.listener.sleep.between", "0")).longValue();
                 sleepWhenNoneFound = new Long(properties.getProperty("usergrid.notifications.listener.sleep.after", ""+DEFAULT_SLEEP)).longValue();
                 batchSize = new Integer(properties.getProperty("usergrid.notifications.listener.batchSize", (""+batchSize)));
-                queueNames = ApplicationQueueManager.getQueueNames(properties);
+                queueName = ApplicationQueueManager.getQueueNames(properties);
 
-                int maxThreads = new Integer(properties.getProperty("usergrid.notifications.listener.maxThreads", MAX_THREADS));
+                int maxThreads = new Integer(properties.getProperty("usergrid.notifications.listener.maxThreads", ""+MAX_THREADS));
                 futures = new ArrayList<Future>(maxThreads);
 
                 //create our thread pool based on our threadcount.
@@ -120,7 +125,7 @@ public class QueueListener  {
     }
 
     private void execute(){
-        Thread.currentThread().setDaemon(true);
+//        Thread.currentThread().setDaemon(true);
         Thread.currentThread().setName("Notifications_Processor"+UUID.randomUUID());
 
         final AtomicInteger consecutiveExceptions = new AtomicInteger();
@@ -130,31 +135,30 @@ public class QueueListener  {
         while ( true ) {
             try {
                 svcMgr = smf.getServiceManager(smf.getManagementAppId());
-                queueManager = svcMgr.getQueueManager();
-                String queueName = ApplicationQueueManager.getRandomQueue(queueNames);
                 LOG.info("getting from queue {} ", queueName);
-                QueueResults results = getDeliveryBatch(queueManager,queueName);
-                LOG.info("QueueListener: retrieved batch of {} messages from queue {} ", results.size(),queueName);
+                QueueScope queueScope = new QueueScopeImpl(new SimpleId(smf.getManagementAppId(),"notifications"),queueName);
+                QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope);
+                List<QueueMessage> messages = queueManager.getMessages(getBatchSize(),MESSAGE_TRANSACTION_TIMEOUT,5000);
+                LOG.info("retrieved batch of {} messages from queue {} ", messages.size(),queueName);
 
-                List<Message> messages = results.getMessages();
                 if (messages.size() > 0) {
-                    HashMap<UUID, List<ApplicationQueueMessage>> messageMap = new HashMap<>(messages.size());
+                    HashMap<UUID, List<QueueMessage>> messageMap = new HashMap<>(messages.size());
                     //group messages into hash map by app id
-                    for (Message message : messages) {
-                        ApplicationQueueMessage queueMessage = ApplicationQueueMessage.generate(message);
+                    for (QueueMessage message : messages) {
+                        ApplicationQueueMessage queueMessage = (ApplicationQueueMessage) message.getBody();
                         UUID applicationId = queueMessage.getApplicationId();
                         if (!messageMap.containsKey(applicationId)) {
-                            List<ApplicationQueueMessage> applicationQueueMessages = new ArrayList<ApplicationQueueMessage>();
-                            applicationQueueMessages.add(queueMessage);
+                            List<QueueMessage> applicationQueueMessages = new ArrayList<QueueMessage>();
+                            applicationQueueMessages.add(message);
                             messageMap.put(applicationId, applicationQueueMessages);
                         } else {
-                            messageMap.get(applicationId).add(queueMessage);
+                            messageMap.get(applicationId).add(message);
                         }
                     }
                     long now = System.currentTimeMillis();
                     Observable merge = null;
                     //send each set of app ids together
-                    for (Map.Entry<UUID, List<ApplicationQueueMessage>> entry : messageMap.entrySet()) {
+                    for (Map.Entry<UUID, List<QueueMessage>> entry : messageMap.entrySet()) {
                         UUID applicationId = entry.getKey();
                         EntityManager entityManager = emf.getEntityManager(applicationId);
                         ServiceManager serviceManager = smf.getServiceManager(applicationId);
@@ -166,7 +170,7 @@ public class QueueListener  {
                                 properties
                         );
 
-                        LOG.info("QueueListener: send batch for app {} of {} messages", entry.getKey(), entry.getValue().size());
+                        LOG.info("send batch for app {} of {} messages", entry.getKey(), entry.getValue().size());
                         Observable current = manager.sendBatchToProviders(entry.getValue(),queueName);
                         if(merge == null)
                             merge = current;
@@ -176,16 +180,16 @@ public class QueueListener  {
                     }
                     if(merge!=null) {
                         merge.toBlocking().lastOrDefault(null);
+                        LOG.info("sent batch {} messages duration {} ms", messages.size(),System.currentTimeMillis() - now);
                     }
-                    LOG.info("QueueListener: sent batch {} messages duration {} ms", messages.size(),System.currentTimeMillis() - now);
 
                     if(sleepBetweenRuns > 0) {
-                        LOG.info("QueueListener: sleep between rounds...sleep...{}", sleepBetweenRuns);
+                        LOG.info("sleep between rounds...sleep...{}", sleepBetweenRuns);
                         Thread.sleep(sleepBetweenRuns);
                     }
                 }
                 else{
-                    LOG.info("QueueListener: no messages...sleep...{}", sleepWhenNoneFound);
+                    LOG.info("no messages...sleep...{}", sleepWhenNoneFound);
                     Thread.sleep(sleepWhenNoneFound);
                 }
                 //send to the providers
@@ -206,7 +210,7 @@ public class QueueListener  {
     }
 
     public void stop(){
-        LOG.info("QueueListener: stop processes");
+        LOG.info("stop processes");
 
         if(futures == null){
             return;
@@ -218,13 +222,6 @@ public class QueueListener  {
         pool.shutdownNow();
     }
 
-    private  QueueResults getDeliveryBatch(org.apache.usergrid.mq.QueueManager queueManager,String queuePath) throws Exception {
-        QueueQuery qq = new QueueQuery();
-        qq.setLimit(this.getBatchSize());
-        qq.setTimeout(MESSAGE_TRANSACTION_TIMEOUT);
-        QueueResults results = queueManager.getFromQueue(queuePath, qq);
-        return results;
-    }
 
     public void setBatchSize(int batchSize){
         this.batchSize = batchSize;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
deleted file mode 100644
index 0024417..0000000
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.services.notifications;
-
-import org.apache.usergrid.persistence.entities.Notifier;
-
-import java.util.HashMap;
-import java.util.Set;
-
-/**
- * Created by ApigeeCorporation on 9/4/14.
- */
-public interface QueueManager {
-
-    public void asyncCheckForInactiveDevices(Set<Notifier> notifiers) throws Exception ;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
index 08f067d..6b5441e 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
@@ -23,6 +23,8 @@ import org.apache.usergrid.persistence.entities.Device;
 import org.apache.usergrid.persistence.entities.Notification;
 import org.apache.usergrid.persistence.entities.Notifier;
 import org.apache.usergrid.persistence.entities.Receipt;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,28 +35,26 @@ import java.util.concurrent.atomic.AtomicLong;
 public class TaskManager {
 
     private static final Logger LOG = LoggerFactory.getLogger(TaskManager.class);
-    private final QueueManager proxy;
-    private final String queuePath;
+    private final ApplicationQueueManager proxy;
+    private final QueueManager queueManager;
 
     private Notification notification;
     private AtomicLong successes = new AtomicLong();
     private AtomicLong failures = new AtomicLong();
-    private org.apache.usergrid.mq.QueueManager qm;
     private EntityManager em;
-    private ConcurrentHashMap<UUID, ApplicationQueueMessage> messageMap;
+    private ConcurrentHashMap<UUID, QueueMessage> messageMap;
     private boolean hasFinished;
 
-    public TaskManager(EntityManager em, org.apache.usergrid.mq.QueueManager qm, QueueManager proxy, Notification notification, String queuePath) {
+    public TaskManager(EntityManager em,ApplicationQueueManager proxy, Notification notification, QueueManager queueManager) {
         this.em = em;
-        this.qm = qm;
         this.notification = notification;
         this.proxy = proxy;
-        this.messageMap = new ConcurrentHashMap<UUID, ApplicationQueueMessage>();
+        this.messageMap = new ConcurrentHashMap<UUID, QueueMessage>();
         hasFinished = false;
-        this.queuePath = queuePath;
+        this.queueManager = queueManager;
     }
 
-    public void addMessage(UUID deviceId, ApplicationQueueMessage message) {
+    public void addMessage(UUID deviceId, QueueMessage message) {
         messageMap.put(deviceId, message);
     }
 
@@ -62,8 +62,8 @@ public class TaskManager {
         LOG.debug("REMOVED {}", deviceUUID);
         try {
             LOG.debug("notification {} removing device {} from remaining", notification.getUuid(), deviceUUID);
-            if(queuePath!=null){
-                qm.commitTransaction(queuePath, messageMap.get(deviceUUID).getTransaction(), null);
+            if(queueManager!=null){
+                queueManager.commitMessage(messageMap.get(deviceUUID));
             }
 
             EntityRef deviceRef = new SimpleEntityRef(Device.ENTITY_TYPE, deviceUUID);
@@ -147,34 +147,36 @@ public class TaskManager {
         }
     }
 
-    public void finishedBatch() throws Exception {
-        long successes = this.successes.getAndSet(0); //reset counters
-        long failures = this.failures.getAndSet(0); //reset counters
-        this.hasFinished = true;
-
-        // refresh notification
-        Notification notification = em.get(this.notification.getUuid(), Notification.class);
-        notification.setModified(System.currentTimeMillis());
-
-        long sent = successes, errors = failures;
-        //and write them out again, this will produce the most accurate count
-        Map<String, Long> stats = new HashMap<>(2);
-        stats.put("sent", sent);
-        stats.put("errors", errors);
-        notification.updateStatistics(successes, errors);
-
-        //none of this is known and should you ever do this
-        if (notification.getExpectedCount() <= (notification.getStatistics().get("sent") + notification.getStatistics().get("errors"))) {
-            Map<String, Object> properties = new HashMap<>();
-            notification.setFinished(notification.getModified());
-            properties.put("finished", notification.getModified());
-            properties.put("state", notification.getState());
-            LOG.info("done sending to devices in {} ms", notification.getFinished() - notification.getStarted());
-            notification.addProperties(properties);
-        }
-        LOG.info("notification finished batch: {} of {} devices", notification.getUuid(),sent+errors);
-        em.update(notification);
+    public  void finishedBatch() throws Exception {
+        synchronized (this) {
+            long successes = this.successes.getAndSet(0); //reset counters
+            long failures = this.failures.getAndSet(0); //reset counters
+            this.hasFinished = true;
+
+            // refresh notification
+            Notification notification = em.get(this.notification.getUuid(), Notification.class);
+            notification.setModified(System.currentTimeMillis());
+
+            //and write them out again, this will produce the most accurate count
+            Map<String, Long> stats = new HashMap<>(2);
+            stats.put("sent", successes);
+            stats.put("errors", failures);
+            notification.updateStatistics(successes, failures);
+
+            //none of this is known and should you ever do this
+            if (notification.getExpectedCount() <= (notification.getStatistics().get("sent") + notification.getStatistics().get("errors"))) {
+                Map<String, Object> properties = new HashMap<>();
+                notification.setFinished(notification.getModified());
+                properties.put("finished", notification.getModified());
+                properties.put("state", notification.getState());
+                LOG.info("done sending to devices in {} ms", notification.getFinished() - notification.getStarted());
+                notification.addProperties(properties);
+            }
+
+            LOG.info("notification finished batch: {} of {} devices", notification.getUuid(), successes + failures);
+            em.update(notification);
 //        Set<Notifier> notifiers = new HashSet<>(proxy.getNotifierMap().values()); // remove dups
 //        proxy.asyncCheckForInactiveDevices(notifiers);
+        }
     }
 }
\ No newline at end of file