You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/10/07 21:33:13 UTC

[1/9] start queues

Repository: incubator-usergrid
Updated Branches:
  refs/heads/collection_multiget a6e6e3513 -> fd841758a


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
index 27e6d27..faa5091 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
@@ -62,10 +62,11 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
     private User user1;
     private NotificationsService ns;
     QueueListener listener;
+    private String  notifierName = "apNs";
 
     @BeforeClass
     public static void setup(){
-        ApplicationQueueManager.DEFAULT_QUEUE_NAME = "notifications/test/" + UUID.randomUUID().toString()+";"+"notifications/test/" + UUID.randomUUID().toString();
+        ApplicationQueueManager.DEFAULT_QUEUE_NAME = "test";
     }
 
     @Override
@@ -75,7 +76,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         super.before();
         // create apns notifier //
         app.clear();
-        app.put("name", "apNs");
+        app.put("name", notifierName);
         app.put("provider",PROVIDER);
         app.put("environment", USE_REAL_CONNECTIONS ? "development" : "mock");
         // app.put("certificatePassword","pushy-test");
@@ -93,10 +94,8 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         app.clear();
         app.put(notifierKey, PUSH_TOKEN);
         app.put("name", "device1");
-
         e = app.testRequest(ServiceAction.POST, 1, "devices").getEntity();
         app.testRequest(ServiceAction.GET, 1, "devices", e.getUuid());
-
         device1 = app.getEm().get(e.getUuid(), Device.class);
         assertEquals(device1.getProperty(notifierKey), PUSH_TOKEN);
 
@@ -105,6 +104,9 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         app.put("name", "device2");
         e = app.testRequest(ServiceAction.POST, 1, "devices").getEntity();
         device2 = app.getEm().get(e.getUuid(), Device.class);
+        Map<String, Object> props = app.getEm().getProperties(e);
+        assertEquals(device2.getProperty(notifierKey), PUSH_TOKEN);
+        app.getEm().refreshIndex();
 
         // create User
         user1 = new User();
@@ -130,6 +132,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         PathQuery pathQuery = new PathQuery( new SimpleEntityRef(app.getEm().getApplicationRef()), query);
 
         ns.TEST_PATH_QUERY = pathQuery;
+        app.getEm().refreshIndex();
 
         listener = new QueueListener(ns.getServiceManagerFactory(),ns.getEntityManagerFactory(),ns.getMetricsFactory(), new Properties());
         listener.DEFAULT_SLEEP = 200;
@@ -181,7 +184,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
 
         // verify Query for CREATED state
         Query query = new Query();
-        query.addEqualityFilter("state", Notification.State.FINISHED.toString());
+        query.addEqualityFilter("state", Notification.State.STARTED.toString());
         Results results = app.getEm().searchCollection(
                 app.getEm().getApplicationRef(), "notifications", query);
         Entity entity = results.getEntitiesMap().get(notification.getUuid());
@@ -465,24 +468,18 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
     public void twoDevicesOneNotifier() throws Exception {
 
         // create push notification //
-
-
         app.clear();
         String payload = getPayload();
         Map<String, String> payloads = new HashMap<String, String>(1);
-        payloads.put(notifier.getUuid().toString(), payload);
+        payloads.put(notifierName, payload);
         app.put("payloads", payloads);
         app.put("queued", System.currentTimeMillis());
 
-        Entity e = app.testRequest(ServiceAction.POST, 1, "notifications")
-                .getEntity();
+        Entity e = app.testRequest(ServiceAction.POST, 1, "notifications").getEntity();
         app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid());
 
-        Notification notification = app.getEm().get(e.getUuid(),
-                Notification.class);
-        assertEquals(
-                notification.getPayloads().get(notifier.getUuid().toString()),
-                payload);
+        Notification notification = app.getEm().get(e.getUuid(),Notification.class);
+        //assertEquals(notification.getPayloads().get(notifier.getUuid().toString()),payload);
 
 
         // perform push //
@@ -494,9 +491,10 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
     @Test
     public void twoDevicesTwoNotifiers() throws Exception {
 
+        String notifier2Name = "apNs2";
         // create a 2nd notifier //
         app.clear();
-        app.put("name", "apNs2");
+        app.put("name", notifier2Name);
         app.put("provider", PROVIDER);
         app.put("environment", "development");
         InputStream fis = getClass().getClassLoader().getResourceAsStream(
@@ -516,10 +514,10 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         assertEquals(notifier2.getProvider(), PROVIDER);
         assertEquals(notifier2.getEnvironment(), "development");
 
-        String key = notifier.getName() + NOTIFIER_ID_POSTFIX;
-        String key2 = notifier2.getName() + NOTIFIER_ID_POSTFIX;
+        String key = notifierName + NOTIFIER_ID_POSTFIX;
+        String key2 = notifier2Name + NOTIFIER_ID_POSTFIX;
         device2.setProperty(key, null);
-        device2.setProperty(key2, null);
+        device2.setProperty(key2, PUSH_TOKEN);
         app.getEm().update(device2);
 
         app.getEm().refreshIndex();
@@ -529,8 +527,8 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         app.clear();
         String payload = getPayload();
         Map<String, String> payloads = new HashMap<String, String>(1);
-        payloads.put(notifier.getUuid().toString(), payload);
-        payloads.put(notifier2.getUuid().toString(), payload);
+        payloads.put(notifierName, payload);
+        payloads.put(notifier2Name, payload);
         app.put("payloads", payloads);
         app.put("queued", System.currentTimeMillis());
 
@@ -539,11 +537,8 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
 
         app.getEm().refreshIndex();
 
-        Notification notification = app.getEm().get(e.getUuid(),
-                Notification.class);
-        assertEquals(
-                notification.getPayloads().get(notifier.getUuid().toString()),
-                payload);
+        Notification notification = app.getEm().get(e.getUuid(),  Notification.class);
+        assertEquals(notification.getPayloads().get(notifierName), payload);
 
         // perform push //
         notification = scheduleNotificationAndWait(notification);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
index 8907226..034723e 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
@@ -61,7 +61,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
 
     @BeforeClass
     public static void setup(){
-        ApplicationQueueManager.DEFAULT_QUEUE_NAME = "notifications/test/" + UUID.randomUUID().toString()+";"+"notifications/test/" + UUID.randomUUID().toString();
+        ApplicationQueueManager.DEFAULT_QUEUE_NAME = "test";
     }
     @Override
     @Before


[7/9] git commit: Added TODO's

Posted by to...@apache.org.
Added TODO's


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/17cc01dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/17cc01dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/17cc01dc

Branch: refs/heads/collection_multiget
Commit: 17cc01dc92c39878c7f917d4f361f129dee162ad
Parents: c0c30a9
Author: Todd Nine <to...@apache.org>
Authored: Mon Oct 6 15:39:13 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Mon Oct 6 15:39:13 2014 -0600

----------------------------------------------------------------------
 .../persistence/collection/EntityCollectionManager.java         | 5 +++++
 .../org/apache/usergrid/persistence/index/EntityIndexBatch.java | 3 +++
 2 files changed, 8 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/17cc01dc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index ee3a5d1..b49989f 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@ -51,6 +51,11 @@ public interface EntityCollectionManager {
      */
     public Observable<Entity> load( Id entityId );
 
+    //TODO TN Change load to use multiget and return multiple entities.  Only supports loading 1k per load operation.
+
+
+    //TODO Dave add a load versions using a multiget that will return a latest version structure for a collection of entity Ids
+
 
     /**
      * Takes the change and reloads an entity with all changes applied in this entity applied.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/17cc01dc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
index 643174c..f98025b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
@@ -59,6 +59,9 @@ public interface EntityIndexBatch {
      */
     public EntityIndexBatch deindex(final IndexScope scope, final Id id, final UUID version);
 
+
+    //TODO: Create a delete method that delete's  by Id.  This will delete all documents from ES with the same entity Id
+
     /**
      * Execute the batch
      */


[5/9] git commit: comments

Posted by to...@apache.org.
comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/4cb7a925
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/4cb7a925
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/4cb7a925

Branch: refs/heads/collection_multiget
Commit: 4cb7a9259c30d8905ec7f0feb6b81f3cf099c323
Parents: 6f971cf
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Oct 6 15:04:27 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 6 15:04:27 2014 -0600

----------------------------------------------------------------------
 .../persistence/queue/QueueManager.java         | 40 ++++++++++++++++----
 .../queue/impl/SQSQueueManagerImpl.java         | 20 ++++------
 .../persistence/queue/QueueManagerTest.java     |  6 ---
 3 files changed, 39 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4cb7a925/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 1f5a9e2..89abe40 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -21,20 +21,44 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 
-
+/**
+ * Manages queues for usergrid.  Current implementation is sqs based.
+ */
 public interface QueueManager {
 
-    Queue createQueue( );
-
-    Queue getQueue();
-
-    List<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime, Class klass) throws ClassNotFoundException, IOException;
+    /**
+     * Read messages from queue
+     * @param limit
+     * @param transactionTimeout timeout in ms
+     * @param waitTime wait time for next message in ms
+     * @param klass class to cast the return from
+     * @return List of Queue Messages
+     */
+    List<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime, Class klass);
 
+    /**
+     * Commit the transaction
+     * @param queueMessage
+     */
     void commitMessage( QueueMessage queueMessage);
 
+    /**
+     * commit multiple messages
+     * @param queueMessages
+     */
     void commitMessages( List<QueueMessage> queueMessages);
 
-    void sendMessages(List bodies) throws IOException;
+    /**
+     * send messages to queue
+     * @param bodies body objects must be serializable
+     * @throws IOException
+     */
+    void sendMessages(List<Serializable> bodies) throws IOException;
 
-    void sendMessage(Object body)throws IOException;
+    /**
+     * send a message to queue
+     * @param body
+     * @throws IOException
+     */
+    void sendMessage(Serializable body)throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4cb7a925/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index 4614089..85b7f1e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -21,8 +21,6 @@ import com.amazonaws.AmazonClientException;
 import com.amazonaws.SDKGlobalConfiguration;
 import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
-import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
 import com.amazonaws.regions.Region;
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.sqs.AmazonSQSClient;
@@ -36,13 +34,11 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.usergrid.persistence.queue.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.util.Base64Coder;
-
 import java.io.*;
 import java.util.ArrayList;
 import java.util.List;
 
-public class SQSQueueManagerImpl implements QueueManager {
+public class SQSQueueManagerImpl<T> implements QueueManager {
     private static final Logger LOG = LoggerFactory.getLogger(SQSQueueManagerImpl.class);
 
     private final AmazonSQSClient sqs;
@@ -72,7 +68,6 @@ public class SQSQueueManagerImpl implements QueueManager {
         }
     }
 
-    @Override
     public Queue createQueue(){
         String name = getName();
         CreateQueueRequest createQueueRequest = new CreateQueueRequest()
@@ -87,7 +82,6 @@ public class SQSQueueManagerImpl implements QueueManager {
         String name = scope.getApplication().getType() + scope.getApplication().getUuid().toString() + scope.getName();
         return name;
     }
-    @Override
     public Queue getQueue(){
         if(queue == null) {
             ListQueuesResult result =  sqs.listQueues();
@@ -106,7 +100,7 @@ public class SQSQueueManagerImpl implements QueueManager {
     }
 
     @Override
-    public List<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) throws ClassNotFoundException, IOException {
+    public List<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) {
         waitTime = waitTime/1000;
         String url = getQueue().getUrl();
         LOG.info("Getting {} messages from {}",limit,url);
@@ -124,7 +118,7 @@ public class SQSQueueManagerImpl implements QueueManager {
                 body = fromString(message.getBody(),klass);
             }catch (Exception e){
                 LOG.error("failed to deserialize message", e);
-                throw e;
+                throw new RuntimeException(e);
             }
             QueueMessage queueMessage = new QueueMessage(message.getMessageId(),message.getReceiptHandle(),body);
             queueMessages.add(queueMessage);
@@ -143,15 +137,15 @@ public class SQSQueueManagerImpl implements QueueManager {
     }
 
     @Override
-    public void sendMessages(List bodies) throws IOException {
+    public void sendMessages(List<Serializable> bodies) throws IOException {
         String url = getQueue().getUrl();
         LOG.info("Sending Messages...{} to {}",bodies.size(),url);
 
         SendMessageBatchRequest request = new SendMessageBatchRequest(url);
         List<SendMessageBatchRequestEntry> entries = new ArrayList<>(bodies.size());
-        for(Object body : bodies){
+        for(Serializable body : bodies){
             SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
-            entry.setMessageBody(toString((Serializable)body));
+            entry.setMessageBody(toString(body));
             entries.add(entry);
         }
         request.setEntries(entries);
@@ -160,7 +154,7 @@ public class SQSQueueManagerImpl implements QueueManager {
     }
 
     @Override
-    public void sendMessage(Object body) throws IOException {
+    public void sendMessage(Serializable body) throws IOException {
         String url = getQueue().getUrl();
         LOG.info("Sending Message...{} to {}",body.toString(),url);
         SendMessageRequest request = new SendMessageRequest(url,toString((Serializable)body));

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4cb7a925/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index 8e38f8c..b8b7967 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -58,12 +58,6 @@ public class QueueManagerTest {
 
     @Ignore("need aws creds")
     @Test
-    public void get() {
-        Queue queue = qm.getQueue();
-        assertNotNull(queue);
-    }
-    @Ignore("need aws creds")
-    @Test
     public void send() throws IOException,ClassNotFoundException{
         String value = "bodytest";
         qm.sendMessage(value);


[6/9] git commit: remove finished batch check

Posted by to...@apache.org.
remove finished batch check


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c0c30a91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c0c30a91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c0c30a91

Branch: refs/heads/collection_multiget
Commit: c0c30a9163c01491809b125107167a618a8bb8a6
Parents: 4cb7a92
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Oct 6 15:26:25 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 6 15:26:25 2014 -0600

----------------------------------------------------------------------
 .../services/notifications/TaskManager.java     | 58 ++++++++++----------
 1 file changed, 29 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0c30a91/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
index 6b5441e..755cb56 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
@@ -147,36 +147,36 @@ public class TaskManager {
         }
     }
 
-    public  void finishedBatch() throws Exception {
-        synchronized (this) {
-            long successes = this.successes.getAndSet(0); //reset counters
-            long failures = this.failures.getAndSet(0); //reset counters
-            this.hasFinished = true;
-
-            // refresh notification
-            Notification notification = em.get(this.notification.getUuid(), Notification.class);
-            notification.setModified(System.currentTimeMillis());
-
-            //and write them out again, this will produce the most accurate count
-            Map<String, Long> stats = new HashMap<>(2);
-            stats.put("sent", successes);
-            stats.put("errors", failures);
-            notification.updateStatistics(successes, failures);
-
-            //none of this is known and should you ever do this
-            if (notification.getExpectedCount() <= (notification.getStatistics().get("sent") + notification.getStatistics().get("errors"))) {
-                Map<String, Object> properties = new HashMap<>();
-                notification.setFinished(notification.getModified());
-                properties.put("finished", notification.getModified());
-                properties.put("state", notification.getState());
-                LOG.info("done sending to devices in {} ms", notification.getFinished() - notification.getStarted());
-                notification.addProperties(properties);
-            }
-
-            LOG.info("notification finished batch: {} of {} devices", notification.getUuid(), successes + failures);
-            em.update(notification);
+    public void finishedBatch() throws Exception {
+        long successes = this.successes.get(); //reset counters
+        long failures = this.failures.get(); //reset counters
+        for(int i = 0;i<successes;i++){this.successes.decrementAndGet();}
+        for(int i = 0;i<failures;i++){this.failures.decrementAndGet();}
+
+        this.hasFinished = true;
+
+        // refresh notification
+        Notification notification = em.get(this.notification.getUuid(), Notification.class);
+        notification.setModified(System.currentTimeMillis());
+
+        //and write them out again, this will produce the most accurate count
+        Map<String, Long> stats = new HashMap<>(2);
+        stats.put("sent", successes);
+        stats.put("errors", failures);
+        notification.updateStatistics(successes, failures);
+
+        long totals = (notification.getStatistics().get("sent") + notification.getStatistics().get("errors"));
+        //none of this is known and should you ever do this
+        Map<String, Object> properties = new HashMap<>();
+        notification.setFinished(notification.getModified());
+        properties.put("finished", notification.getModified());
+        properties.put("state", notification.getState());
+        LOG.info("done sending to devices in {} ms", notification.getFinished() - notification.getStarted());
+        notification.addProperties(properties);
+
+        LOG.info("notification finished batch: {} of {} devices", notification.getUuid(), totals);
+        em.update(notification);
 //        Set<Notifier> notifiers = new HashSet<>(proxy.getNotifierMap().values()); // remove dups
 //        proxy.asyncCheckForInactiveDevices(notifiers);
-        }
     }
 }
\ No newline at end of file


[3/9] git commit: Merge branch 'two-dot-o' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into sqs_queues

Posted by to...@apache.org.
Merge branch 'two-dot-o' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into sqs_queues


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/028d3d6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/028d3d6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/028d3d6c

Branch: refs/heads/collection_multiget
Commit: 028d3d6cc5779bb2ced1006b33b120b22f7403f9
Parents: 5381039 a6e6e35
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Oct 6 11:40:29 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 6 11:40:29 2014 -0600

----------------------------------------------------------------------
 .../apache/usergrid/rest/SystemResource.java    | 67 +++++++++++++++-----
 1 file changed, 52 insertions(+), 15 deletions(-)
----------------------------------------------------------------------



[4/9] git commit: change serializer

Posted by to...@apache.org.
change serializer


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6f971cf6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6f971cf6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6f971cf6

Branch: refs/heads/collection_multiget
Commit: 6f971cf6ea67deac3b4ec71f50ebe66aea1de69e
Parents: 028d3d6
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Oct 6 14:33:26 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 6 14:33:26 2014 -0600

----------------------------------------------------------------------
 .../persistence/queue/QueueManager.java         |   6 +-
 .../queue/impl/SQSQueueManagerImpl.java         | 101 +++++++++++--------
 .../persistence/queue/QueueManagerTest.java     |   6 +-
 .../services/notifications/QueueListener.java   |   2 +-
 4 files changed, 64 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6f971cf6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 3509e4e..1f5a9e2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -28,13 +28,13 @@ public interface QueueManager {
 
     Queue getQueue();
 
-    List<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime);
+    List<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime, Class klass) throws ClassNotFoundException, IOException;
 
     void commitMessage( QueueMessage queueMessage);
 
     void commitMessages( List<QueueMessage> queueMessages);
 
-    void sendMessages(List<Serializable> bodies) throws IOException;
+    void sendMessages(List bodies) throws IOException;
 
-    void sendMessage(Serializable body)throws IOException;
+    void sendMessage(Object body)throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6f971cf6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index cf6ff45..4614089 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -27,7 +27,10 @@ import com.amazonaws.regions.Region;
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.sqs.AmazonSQSClient;
 import com.amazonaws.services.sqs.model.*;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
 import com.google.inject.Inject;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
 import com.google.inject.assistedinject.Assisted;
 import org.apache.commons.lang.StringUtils;
 import org.apache.usergrid.persistence.queue.*;
@@ -45,7 +48,10 @@ public class SQSQueueManagerImpl implements QueueManager {
     private final AmazonSQSClient sqs;
     private final QueueScope scope;
     private final QueueFig fig;
+    private final ObjectMapper mapper;
     private Queue queue;
+    private static SmileFactory smileFactory = new SmileFactory();
+
 
     @Inject
     public SQSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig){
@@ -56,9 +62,17 @@ public class SQSQueueManagerImpl implements QueueManager {
         Regions regions = Regions.fromName(fig.getRegion());
         Region region = Region.getRegion(regions);
         sqs.setRegion(region);
+        try {
+            smileFactory.delegateToTextual(true);
+            mapper = new ObjectMapper( smileFactory );
+            mapper.enable(SerializationFeature.INDENT_OUTPUT);
+            mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class");
+        } catch ( Exception e ) {
+            throw new RuntimeException("Error setting up mapper", e);
+        }
     }
 
-
+    @Override
     public Queue createQueue(){
         String name = getName();
         CreateQueueRequest createQueueRequest = new CreateQueueRequest()
@@ -73,7 +87,7 @@ public class SQSQueueManagerImpl implements QueueManager {
         String name = scope.getApplication().getType() + scope.getApplication().getUuid().toString() + scope.getName();
         return name;
     }
-
+    @Override
     public Queue getQueue(){
         if(queue == null) {
             ListQueuesResult result =  sqs.listQueues();
@@ -91,36 +105,14 @@ public class SQSQueueManagerImpl implements QueueManager {
         return queue;
     }
 
-    public void sendMessage(Serializable body) throws IOException{
-        String url = getQueue().getUrl();
-        LOG.info("Sending Message...{} to {}",body.toString(),url);
-        SendMessageRequest request = new SendMessageRequest(url,toString(body));
-        sqs.sendMessage(request);
-    }
-
-
-    public void sendMessages(List<Serializable> bodies) throws IOException{
-        String url = getQueue().getUrl();
-        LOG.info("Sending Messages...{} to {}",bodies.size(),url);
-
-        SendMessageBatchRequest request = new SendMessageBatchRequest(url);
-        List<SendMessageBatchRequestEntry> entries = new ArrayList<>(bodies.size());
-        for(Serializable body : bodies){
-            SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
-            entry.setMessageBody(toString(body));
-            entries.add(entry);
-        }
-        request.setEntries(entries);
-        sqs.sendMessageBatch(request);
-    }
-
-    public  List<QueueMessage> getMessages( int limit,int timeout, int waitTime) {
+    @Override
+    public List<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) throws ClassNotFoundException, IOException {
         waitTime = waitTime/1000;
         String url = getQueue().getUrl();
         LOG.info("Getting {} messages from {}",limit,url);
         ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url);
         receiveMessageRequest.setMaxNumberOfMessages(limit);
-        receiveMessageRequest.setVisibilityTimeout(timeout);
+        receiveMessageRequest.setVisibilityTimeout(transactionTimeout);
         receiveMessageRequest.setWaitTimeSeconds(waitTime);
         ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
         List<Message> messages = result.getMessages();
@@ -129,10 +121,10 @@ public class SQSQueueManagerImpl implements QueueManager {
         for (Message message : messages) {
             Object body ;
             try{
-                body = fromString(message.getBody());
+                body = fromString(message.getBody(),klass);
             }catch (Exception e){
                 LOG.error("failed to deserialize message", e);
-                body  = message.getBody();
+                throw e;
             }
             QueueMessage queueMessage = new QueueMessage(message.getMessageId(),message.getReceiptHandle(),body);
             queueMessages.add(queueMessage);
@@ -140,7 +132,8 @@ public class SQSQueueManagerImpl implements QueueManager {
         return queueMessages;
     }
 
-    public void commitMessage( QueueMessage queueMessage){
+    @Override
+    public void commitMessage(QueueMessage queueMessage) {
         String url = getQueue().getUrl();
         LOG.info("Commit message {} to queue {}",queueMessage.getMessageId(),url);
 
@@ -149,7 +142,33 @@ public class SQSQueueManagerImpl implements QueueManager {
                 .withReceiptHandle(queueMessage.getHandle()));
     }
 
-    public void commitMessages( List<QueueMessage> queueMessages){
+    @Override
+    public void sendMessages(List bodies) throws IOException {
+        String url = getQueue().getUrl();
+        LOG.info("Sending Messages...{} to {}",bodies.size(),url);
+
+        SendMessageBatchRequest request = new SendMessageBatchRequest(url);
+        List<SendMessageBatchRequestEntry> entries = new ArrayList<>(bodies.size());
+        for(Object body : bodies){
+            SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
+            entry.setMessageBody(toString((Serializable)body));
+            entries.add(entry);
+        }
+        request.setEntries(entries);
+        sqs.sendMessageBatch(request);
+
+    }
+
+    @Override
+    public void sendMessage(Object body) throws IOException {
+        String url = getQueue().getUrl();
+        LOG.info("Sending Message...{} to {}",body.toString(),url);
+        SendMessageRequest request = new SendMessageRequest(url,toString((Serializable)body));
+        sqs.sendMessage(request);
+    }
+
+    @Override
+    public void commitMessages(List<QueueMessage> queueMessages) {
         String url = getQueue().getUrl();
         LOG.info("Commit messages {} to queue {}",queueMessages.size(),url);
         List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
@@ -160,27 +179,21 @@ public class SQSQueueManagerImpl implements QueueManager {
         DeleteMessageBatchResult result = sqs.deleteMessageBatch(request);
         boolean successful = result.getFailed().size() > 0;
         if(!successful){
-            LOG.error("Commit failed {} messages",result.getFailed().size());
+            LOG.error("Commit failed {} messages", result.getFailed().size());
         }
     }
 
+
+
     /** Read the object from Base64 string. */
-    private static Object fromString( String s ) throws IOException, ClassNotFoundException {
-        byte [] data = Base64Coder.decode(s.toCharArray());
-        ObjectInputStream ois = new ObjectInputStream(
-                new ByteArrayInputStream(  data ) );
-        Object o  = ois.readObject();
-        ois.close();
+    private Object fromString( String s, Class klass ) throws IOException, ClassNotFoundException {
+        Object o =  mapper.readValue(s,klass);
         return o;
     }
 
     /** Write the object to a Base64 string. */
-    private static String toString( Serializable o ) throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        ObjectOutputStream oos = new ObjectOutputStream( baos );
-        oos.writeObject( o );
-        oos.close();
-        return new String( Base64Coder.encode( baos.toByteArray() ) );
+    private  String toString( Serializable o ) throws IOException {
+        return mapper.writeValueAsString(o);
     }
 
     public class UsergridAwsCredentialsProvider implements AWSCredentialsProvider {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6f971cf6/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index 772b75e..8e38f8c 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -64,16 +64,16 @@ public class QueueManagerTest {
     }
     @Ignore("need aws creds")
     @Test
-    public void send() throws IOException{
+    public void send() throws IOException,ClassNotFoundException{
         String value = "bodytest";
         qm.sendMessage(value);
-        List<QueueMessage> messageList = qm.getMessages(1,5000,5000);
+        List<QueueMessage> messageList = qm.getMessages(1,5000,5000,String.class);
         assertTrue(messageList.size() >= 1);
         for(QueueMessage message : messageList){
             assertTrue(message.getBody().equals(value));
             qm.commitMessage(message);
         }
-        messageList = qm.getMessages(1,5000,5000);
+        messageList = qm.getMessages(1,5000,5000,String.class);
         assertTrue(messageList.size() <= 0);
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6f971cf6/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index a381c70..accfa94 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -138,7 +138,7 @@ public class QueueListener  {
                 LOG.info("getting from queue {} ", queueName);
                 QueueScope queueScope = new QueueScopeImpl(new SimpleId(smf.getManagementAppId(),"notifications"),queueName);
                 QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope);
-                List<QueueMessage> messages = queueManager.getMessages(getBatchSize(),MESSAGE_TRANSACTION_TIMEOUT,5000);
+                List<QueueMessage> messages = queueManager.getMessages(getBatchSize(),MESSAGE_TRANSACTION_TIMEOUT,5000,ApplicationQueueMessage.class);
                 LOG.info("retrieved batch of {} messages from queue {} ", messages.size(),queueName);
 
                 if (messages.size() > 0) {


[2/9] git commit: start queues

Posted by to...@apache.org.
start queues

moving files

added fig

queues!!!

validating credentials

moving dependencies

adding serialization

change formatting

dependency issues, removing old queueing mechanism

tests passing

yaml version for aws

organization

adding wait time

fixing tests

better logging

bad variable


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/53810396
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/53810396
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/53810396

Branch: refs/heads/collection_multiget
Commit: 5381039658274b9e584bb98eb86249d7fed157ce
Parents: 9c4b26e
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Oct 3 10:34:19 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 6 11:35:02 2014 -0600

----------------------------------------------------------------------
 stack/core/pom.xml                              |  15 ++
 .../usergrid/corepersistence/GuiceModule.java   |   4 +
 stack/corepersistence/map/pom.xml               |  16 ++
 stack/corepersistence/pom.xml                   |  17 ++
 stack/corepersistence/queue/pom.xml             |  92 ++++++++
 .../usergrid/persistence/queue/Queue.java       |  31 +++
 .../usergrid/persistence/queue/QueueFig.java    |  16 ++
 .../persistence/queue/QueueManager.java         |  40 ++++
 .../persistence/queue/QueueManagerFactory.java  |  23 ++
 .../persistence/queue/QueueMessage.java         |  42 ++++
 .../usergrid/persistence/queue/QueueScope.java  |  31 +++
 .../persistence/queue/guice/QueueModule.java    |  51 +++++
 .../persistence/queue/impl/QueueScopeImpl.java  |  87 ++++++++
 .../queue/impl/SQSQueueManagerImpl.java         | 222 +++++++++++++++++++
 .../persistence/queue/QueueManagerTest.java     |  81 +++++++
 .../queue/guice/TestQueueModule.java            |  33 +++
 stack/pom.xml                                   |   8 +-
 .../notifications/ApplicationQueueManager.java  |  96 ++++----
 .../notifications/ApplicationQueueMessage.java  |  67 ++----
 .../notifications/NotificationsService.java     |  15 +-
 .../services/notifications/QueueListener.java   |  69 +++---
 .../services/notifications/QueueManager.java    |  31 ---
 .../services/notifications/TaskManager.java     |  78 +++----
 .../apns/NotificationsServiceIT.java            |  47 ++--
 .../gcm/NotificationsServiceIT.java             |   2 +-
 25 files changed, 973 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/core/pom.xml
----------------------------------------------------------------------
diff --git a/stack/core/pom.xml b/stack/core/pom.xml
index 6cd5dac..30a14d1 100644
--- a/stack/core/pom.xml
+++ b/stack/core/pom.xml
@@ -526,6 +526,21 @@
 	    <type>jar</type>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.usergrid</groupId>
+      <artifactId>map</artifactId>
+      <version>2.0.0-SNAPSHOT</version>
+      <type>jar</type>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.usergrid</groupId>
+      <artifactId>queue</artifactId>
+      <version>2.0.0-SNAPSHOT</version>
+      <type>jar</type>
+    </dependency>
+
+
     <!--<dependency>-->
       <!--<artifactId>lucene-core</artifactId>-->
       <!--<groupId>org.apache.lucene</groupId>-->

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
index 1f3d615..42f81d5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
@@ -17,6 +17,8 @@
 package org.apache.usergrid.corepersistence;
 
 
+import org.apache.usergrid.persistence.map.guice.MapModule;
+import org.apache.usergrid.persistence.queue.guice.QueueModule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,6 +43,8 @@ public class GuiceModule  extends AbstractModule {
         install(new CollectionModule());
         install(new GraphModule());
         install(new IndexModule());
+        install(new MapModule());
+        install(new QueueModule());
 
         bind(CpEntityDeleteListener.class).asEagerSingleton();
         bind(CpEntityIndexDeleteListener.class).asEagerSingleton();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/map/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/pom.xml b/stack/corepersistence/map/pom.xml
index 93b1030..e9cb5ab 100644
--- a/stack/corepersistence/map/pom.xml
+++ b/stack/corepersistence/map/pom.xml
@@ -1,4 +1,20 @@
 <?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index 5e6bf01..7482271 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -1,4 +1,20 @@
 <?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
@@ -68,6 +84,7 @@
         <module>queryindex</module>
         <module>common</module>
         <module>map</module>
+        <module>queue</module>
     </modules>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/queue/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/pom.xml b/stack/corepersistence/queue/pom.xml
new file mode 100644
index 0000000..94f11a8
--- /dev/null
+++ b/stack/corepersistence/queue/pom.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one
+~ or more contributor license agreements.  See the NOTICE file
+~ distributed with this work for additional information
+~ regarding copyright ownership.  The ASF licenses this file
+~ to you under the Apache License, Version 2.0 (the
+~ "License"); you may not use this file except in compliance
+~ with the License.  You may obtain a copy of the License at
+~
+~    http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing,
+~ software distributed under the License is distributed on an
+~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+~ KIND, either express or implied.  See the License for the
+~ specific language governing permissions and limitations
+~ under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>persistence</artifactId>
+    <groupId>org.apache.usergrid</groupId>
+    <version>2.0.0-SNAPSHOT</version>
+  </parent>
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>queue</artifactId>
+
+  <name>Usergrid Queue</name>
+
+  <dependencies>
+
+
+    <dependency>
+      <groupId>org.apache.usergrid</groupId>
+      <artifactId>common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+
+
+    <!-- lang utils for setting uuids etc -->
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>${commons.lang.version}</version>
+    </dependency>
+
+    <!-- tests -->
+
+    <dependency>
+      <groupId>org.apache.usergrid</groupId>
+      <artifactId>common</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+
+    <dependency>
+      <groupId>org.jukito</groupId>
+      <artifactId>jukito</artifactId>
+      <version>${jukito.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+
+    <dependency>
+      <groupId>org.apache.usergrid</groupId>
+      <artifactId>collection</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk</artifactId>
+      <version>1.8.11</version>
+    </dependency>
+
+
+  </dependencies>
+
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/Queue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/Queue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/Queue.java
new file mode 100644
index 0000000..2cc49aa
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/Queue.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue;
+
+
+public class Queue {
+    private final String url;
+
+    public Queue(String url) {
+        this.url = url;
+    }
+
+    public String getUrl(){
+        return url;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
new file mode 100644
index 0000000..fd71f9e
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
@@ -0,0 +1,16 @@
+package org.apache.usergrid.persistence.queue;
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+@FigSingleton
+public interface QueueFig extends GuicyFig {
+
+    @Key( "queue.region" )
+    @Default("us-east-1")
+    public String getRegion();
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
new file mode 100644
index 0000000..3509e4e
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+
+public interface QueueManager {
+
+    Queue createQueue( );
+
+    Queue getQueue();
+
+    List<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime);
+
+    void commitMessage( QueueMessage queueMessage);
+
+    void commitMessages( List<QueueMessage> queueMessages);
+
+    void sendMessages(List<Serializable> bodies) throws IOException;
+
+    void sendMessage(Serializable body)throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java
new file mode 100644
index 0000000..4cdb5e2
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue;
+
+public interface QueueManagerFactory {
+    public QueueManager getQueueManager( final QueueScope scope );
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
new file mode 100644
index 0000000..1aef3a3
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue;
+
+public class QueueMessage {
+    private final Object body;
+    private final String messageId;
+    private final String handle;
+
+    public QueueMessage(String messageId, String handle, Object body) {
+        this.body = body;
+        this.messageId = messageId;
+        this.handle = handle;
+    }
+
+    public String getHandle() {
+        return handle;
+    }
+
+    public Object getBody(){
+        return body;
+    }
+
+    public String getMessageId() {
+        return messageId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
new file mode 100644
index 0000000..b2b2ec6
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.queue;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+
+public interface QueueScope extends ApplicationScope {
+
+    /**
+     * Get the name of the the map
+     * @return
+     */
+    public String getName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
new file mode 100644
index 0000000..e8fc7c8
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue.guice;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+import org.apache.usergrid.persistence.queue.QueueFig;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.impl.SQSQueueManagerImpl;
+import org.safehaus.guicyfig.GuicyFigModule;
+
+
+/**
+ * Simple module for wiring our collection api
+ *
+ * @author tnine
+ */
+public class QueueModule extends AbstractModule {
+
+
+    @Override
+    protected void configure() {
+
+        install( new GuicyFigModule( QueueFig.class) );
+        // create a guice factory for getting our collection manager
+        install( new FactoryModuleBuilder().implement( QueueManager.class, SQSQueueManagerImpl.class )
+                                           .build( QueueManagerFactory.class ) );
+
+    }
+
+
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
new file mode 100644
index 0000000..d78a66d
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue.impl;
+
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.queue.QueueScope;
+
+/**
+ * Created by ApigeeCorporation on 10/3/14.
+ */
+public class QueueScopeImpl implements QueueScope {
+    private final Id owner;
+    private final String name;
+
+
+    public QueueScopeImpl( final Id owner, final String name ) {
+        this.owner = owner;
+        this.name = name;
+    }
+
+
+
+    @Override
+    public Id getApplication() {
+        return owner;
+    }
+
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+
+    @Override
+    public boolean equals( final Object o ) {
+        if ( this == o ) {
+            return true;
+        }
+        if ( !( o instanceof QueueScopeImpl ) ) {
+            return false;
+        }
+
+        final QueueScopeImpl queueScope = ( QueueScopeImpl ) o;
+
+        if ( !name.equals( queueScope.name ) ) {
+            return false;
+        }
+        if ( !owner.equals( queueScope.owner ) ) {
+            return false;
+        }
+
+        return true;
+    }
+
+
+    @Override
+    public int hashCode() {
+        int result = owner.hashCode();
+        result = 31 * result + name.hashCode();
+        return result;
+    }
+
+
+    @Override
+    public String toString() {
+        return "QueueScopeImpl{" +
+                "owner=" + owner +
+                ", name='" + name + '\'' +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
new file mode 100644
index 0000000..cf6ff45
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue.impl;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.SDKGlobalConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
+import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.sqs.AmazonSQSClient;
+import com.amazonaws.services.sqs.model.*;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import org.apache.commons.lang.StringUtils;
+import org.apache.usergrid.persistence.queue.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.util.Base64Coder;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SQSQueueManagerImpl implements QueueManager {
+    private static final Logger LOG = LoggerFactory.getLogger(SQSQueueManagerImpl.class);
+
+    private final AmazonSQSClient sqs;
+    private final QueueScope scope;
+    private final QueueFig fig;
+    private Queue queue;
+
+    @Inject
+    public SQSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig){
+        this.fig = fig;
+        this.scope = scope;
+        UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
+        this.sqs = new AmazonSQSClient(ugProvider.getCredentials());
+        Regions regions = Regions.fromName(fig.getRegion());
+        Region region = Region.getRegion(regions);
+        sqs.setRegion(region);
+    }
+
+
+    public Queue createQueue(){
+        String name = getName();
+        CreateQueueRequest createQueueRequest = new CreateQueueRequest()
+                .withQueueName(name);
+        CreateQueueResult result = sqs.createQueue(createQueueRequest);
+        String url = result.getQueueUrl();
+        LOG.info("Created queue with url {}",url);
+        return new Queue(url);
+    }
+
+    private String getName() {
+        String name = scope.getApplication().getType() + scope.getApplication().getUuid().toString() + scope.getName();
+        return name;
+    }
+
+    public Queue getQueue(){
+        if(queue == null) {
+            ListQueuesResult result =  sqs.listQueues();
+            for (String queueUrl : result.getQueueUrls()) {
+                boolean found = queueUrl.contains(getName());
+                if (found) {
+                    queue = new Queue(queueUrl);
+                    break;
+                }
+            }
+        }
+        if(queue == null) {
+            queue = createQueue();
+        }
+        return queue;
+    }
+
+    public void sendMessage(Serializable body) throws IOException{
+        String url = getQueue().getUrl();
+        LOG.info("Sending Message...{} to {}",body.toString(),url);
+        SendMessageRequest request = new SendMessageRequest(url,toString(body));
+        sqs.sendMessage(request);
+    }
+
+
+    public void sendMessages(List<Serializable> bodies) throws IOException{
+        String url = getQueue().getUrl();
+        LOG.info("Sending Messages...{} to {}",bodies.size(),url);
+
+        SendMessageBatchRequest request = new SendMessageBatchRequest(url);
+        List<SendMessageBatchRequestEntry> entries = new ArrayList<>(bodies.size());
+        for(Serializable body : bodies){
+            SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
+            entry.setMessageBody(toString(body));
+            entries.add(entry);
+        }
+        request.setEntries(entries);
+        sqs.sendMessageBatch(request);
+    }
+
+    public  List<QueueMessage> getMessages( int limit,int timeout, int waitTime) {
+        waitTime = waitTime/1000;
+        String url = getQueue().getUrl();
+        LOG.info("Getting {} messages from {}",limit,url);
+        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url);
+        receiveMessageRequest.setMaxNumberOfMessages(limit);
+        receiveMessageRequest.setVisibilityTimeout(timeout);
+        receiveMessageRequest.setWaitTimeSeconds(waitTime);
+        ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
+        List<Message> messages = result.getMessages();
+        LOG.info("Received {} messages from {}",messages.size(),url);
+        List<QueueMessage> queueMessages = new ArrayList<>(messages.size());
+        for (Message message : messages) {
+            Object body ;
+            try{
+                body = fromString(message.getBody());
+            }catch (Exception e){
+                LOG.error("failed to deserialize message", e);
+                body  = message.getBody();
+            }
+            QueueMessage queueMessage = new QueueMessage(message.getMessageId(),message.getReceiptHandle(),body);
+            queueMessages.add(queueMessage);
+        }
+        return queueMessages;
+    }
+
+    public void commitMessage( QueueMessage queueMessage){
+        String url = getQueue().getUrl();
+        LOG.info("Commit message {} to queue {}",queueMessage.getMessageId(),url);
+
+        sqs.deleteMessage(new DeleteMessageRequest()
+                .withQueueUrl(url)
+                .withReceiptHandle(queueMessage.getHandle()));
+    }
+
+    public void commitMessages( List<QueueMessage> queueMessages){
+        String url = getQueue().getUrl();
+        LOG.info("Commit messages {} to queue {}",queueMessages.size(),url);
+        List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
+        for(QueueMessage message : queueMessages){
+            entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(),message.getHandle()));
+        }
+        DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(url,entries);
+        DeleteMessageBatchResult result = sqs.deleteMessageBatch(request);
+        boolean successful = result.getFailed().size() > 0;
+        if(!successful){
+            LOG.error("Commit failed {} messages",result.getFailed().size());
+        }
+    }
+
+    /** Read the object from Base64 string. */
+    private static Object fromString( String s ) throws IOException, ClassNotFoundException {
+        byte [] data = Base64Coder.decode(s.toCharArray());
+        ObjectInputStream ois = new ObjectInputStream(
+                new ByteArrayInputStream(  data ) );
+        Object o  = ois.readObject();
+        ois.close();
+        return o;
+    }
+
+    /** Write the object to a Base64 string. */
+    private static String toString( Serializable o ) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream( baos );
+        oos.writeObject( o );
+        oos.close();
+        return new String( Base64Coder.encode( baos.toByteArray() ) );
+    }
+
+    public class UsergridAwsCredentialsProvider implements AWSCredentialsProvider {
+
+        private AWSCredentials creds;
+
+        public  UsergridAwsCredentialsProvider(){
+            init();
+        }
+
+        private void init() {
+            creds = new AWSCredentials() {
+                @Override
+                public String getAWSAccessKeyId() {
+                    return StringUtils.trim(System.getProperty(SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR));
+                }
+
+                @Override
+                public String getAWSSecretKey() {
+                    return StringUtils.trim(System.getProperty(SDKGlobalConfiguration.SECRET_KEY_ENV_VAR));
+                }
+            };
+            if(StringUtils.isEmpty(creds.getAWSAccessKeyId()) || StringUtils.isEmpty(creds.getAWSSecretKey()) ){
+                throw new AmazonClientException("could not retrieve credentials from system properties");
+            }
+        }
+
+        @Override
+        public AWSCredentials getCredentials() {
+            return creds;
+        }
+
+
+        @Override
+        public void refresh() {
+            init();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
new file mode 100644
index 0000000..772b75e
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.queue;
+
+import org.apache.usergrid.persistence.collection.util.InvalidEntityGenerator;
+import org.apache.usergrid.persistence.queue.guice.TestQueueModule;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
+import org.jukito.UseModules;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.core.cassandra.ITRunner;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import com.google.inject.Inject;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+@RunWith( ITRunner.class )
+@UseModules( { TestQueueModule.class } )
+public class QueueManagerTest {
+
+
+    @Inject
+    protected QueueManagerFactory qmf;
+
+    protected QueueScope scope;
+    private QueueManager qm;
+
+
+    @Before
+    public void mockApp() {
+        this.scope = new QueueScopeImpl( new SimpleId( "application" ), "testQueue" );
+        qm = qmf.getQueueManager(scope);
+    }
+
+    @Ignore("need aws creds")
+    @Test
+    public void get() {
+        Queue queue = qm.getQueue();
+        assertNotNull(queue);
+    }
+    @Ignore("need aws creds")
+    @Test
+    public void send() throws IOException{
+        String value = "bodytest";
+        qm.sendMessage(value);
+        List<QueueMessage> messageList = qm.getMessages(1,5000,5000);
+        assertTrue(messageList.size() >= 1);
+        for(QueueMessage message : messageList){
+            assertTrue(message.getBody().equals(value));
+            qm.commitMessage(message);
+        }
+        messageList = qm.getMessages(1,5000,5000);
+        assertTrue(messageList.size() <= 0);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/guice/TestQueueModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/guice/TestQueueModule.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/guice/TestQueueModule.java
new file mode 100644
index 0000000..b65725f
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/guice/TestQueueModule.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue.guice;
+
+
+import org.apache.usergrid.persistence.collection.guice.TestModule;
+import org.apache.usergrid.persistence.core.guice.CommonModule;
+
+
+
+public class TestQueueModule extends TestModule {
+
+    @Override
+    protected void configure() {
+        install( new CommonModule());
+        install( new QueueModule() );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index e105c79..b1bfe3c 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -111,7 +111,7 @@
     <org.springframework.version>3.1.2.RELEASE</org.springframework.version>
     <shiro-version>1.2.0</shiro-version>
     <slf4j-version>1.6.1</slf4j-version>
-    <snakeyaml-version>1.8</snakeyaml-version>
+    <snakeyaml-version>1.6</snakeyaml-version>
     <tomcat-version>7.0.52</tomcat-version>
     <antlr.version>3.4</antlr.version>
     <tika.version>1.4</tika.version>
@@ -276,7 +276,7 @@
       <dependency>
         <groupId>org.apache.httpcomponents</groupId>
         <artifactId>httpclient</artifactId>
-        <version>4.1.3</version>
+        <version>4.2</version>
         <exclusions>
           <exclusion>
             <groupId>commons-codec</groupId>
@@ -457,6 +457,10 @@
         <version>${cassandra-version}</version>
         <exclusions>
           <exclusion>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+          </exclusion>
+          <exclusion>
             <groupId>commons-lang</groupId>
             <artifactId>commons-lang</artifactId>
           </exclusion>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index c88cead..60c1602 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -27,6 +27,8 @@ import org.apache.usergrid.persistence.entities.Notification;
 import org.apache.usergrid.persistence.entities.Notifier;
 import org.apache.usergrid.persistence.entities.Receipt;
 import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueMessage;
 import org.apache.usergrid.services.notifications.apns.APNsAdapter;
 import org.apache.usergrid.services.notifications.gcm.GCMAdapter;
 import org.slf4j.Logger;
@@ -42,13 +44,10 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
 
-/**
- * Created by ApigeeCorporation on 8/27/14.
- */
-public class ApplicationQueueManager implements QueueManager {
+public class ApplicationQueueManager  {
 
-    public static  String DEFAULT_QUEUE_NAME = "notifications/queuelistenerv1_40;notifications/queuelistenerv1_41;notifications/queuelistenerv1_42";
-    public static final String DEFAULT_QUEUE_PROPERTY = "usergrid.notifications.listener.queueName";
+    public static  String DEFAULT_QUEUE_NAME = "queuelistenerv1_60";
+    public static final String DEFAULT_QUEUE_PROPERTY = "usergrid.notifications.listener.queue";
     private static final Logger LOG = LoggerFactory.getLogger(ApplicationQueueManager.class);
 
     //this is for tests, will not mark initial post complete, set to false for tests
@@ -57,11 +56,10 @@ public class ApplicationQueueManager implements QueueManager {
     public static final String NOTIFIER_ID_POSTFIX = ".notifier.id";
 
     private final EntityManager em;
-    private final org.apache.usergrid.mq.QueueManager qm;
+    private final QueueManager qm;
     private final JobScheduler jobScheduler;
     private final MetricsFactory metricsFactory;
-    private final String[] queueNames;
-    private boolean sendNow = true;
+    private final String queueName;
 
     HashMap<Object, Notifier> notifierHashMap; // only retrieve notifiers once
 
@@ -77,13 +75,12 @@ public class ApplicationQueueManager implements QueueManager {
     public static ProviderAdapter TEST_ADAPTER = new TestAdapter();
 
 
-    public ApplicationQueueManager(JobScheduler jobScheduler, EntityManager entityManager, org.apache.usergrid.mq.QueueManager queueManager, MetricsFactory metricsFactory, Properties properties){
+    public ApplicationQueueManager(JobScheduler jobScheduler, EntityManager entityManager, QueueManager queueManager, MetricsFactory metricsFactory, Properties properties){
         this.em = entityManager;
         this.qm = queueManager;
         this.jobScheduler = jobScheduler;
         this.metricsFactory = metricsFactory;
-        this.queueNames = getQueueNames(properties);
-        this.sendNow = new Boolean(properties.getProperty("usergrid.notifications.sendNow",""+sendNow));
+        this.queueName = getQueueNames(properties);
     }
 
 
@@ -101,26 +98,24 @@ public class ApplicationQueueManager implements QueueManager {
         long startTime = System.currentTimeMillis();
 
         if (notification.getCanceled() == Boolean.TRUE) {
-            LOG.info("ApplicationQueueMessage: notification " + notification.getUuid() + " canceled");
+            LOG.info("notification " + notification.getUuid() + " canceled");
             if (jobExecution != null) {
                 jobExecution.killed();
             }
             return;
         }
 
-        LOG.info("ApplicationQueueMessage: notification {} start queuing", notification.getUuid());
+        LOG.info("notification {} start queuing", notification.getUuid());
 
         final PathQuery<Device> pathQuery = notification.getPathQuery() ; //devices query
         final AtomicInteger deviceCount = new AtomicInteger(); //count devices so you can make a judgement on batching
         final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<String>(); //build up list of issues
 
         final HashMap<Object,Notifier> notifierMap =  getNotifierMap();
-        final String queueName = getRandomQueue(queueNames);
-        final List<ApplicationQueueMessage> messages = new ArrayList<>();
 
         //get devices in querystring, and make sure you have access
         if (pathQuery != null) {
-            LOG.info("ApplicationQueueMessage: notification {} start query", notification.getUuid());
+            LOG.info("notification {} start query", notification.getUuid());
             final Iterator<Device> iterator = pathQuery.iterator(em);
             //if there are more pages (defined by PAGE_SIZE) you probably want this to be async, also if this is already a job then don't reschedule
             if (iterator instanceof ResultsIterator && ((ResultsIterator) iterator).hasPages() && jobExecution == null) {
@@ -132,8 +127,6 @@ public class ApplicationQueueManager implements QueueManager {
             final UUID appId = em.getApplication().getUuid();
             final Map<String,Object> payloads = notification.getPayloads();
 
-            final boolean sendNow = this.sendNow; //&& jobExecution == null;
-
             final Func1<Entity,Entity> entityListFunct = new Func1<Entity, Entity>() {
                 @Override
                 public Entity call(Entity entity) {
@@ -143,13 +136,13 @@ public class ApplicationQueueManager implements QueueManager {
                         long now = System.currentTimeMillis();
                         List<EntityRef> devicesRef = getDevices(entity); // resolve group
 
-                        LOG.info("ApplicationQueueMessage: notification {} queue  {} devices, duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), devicesRef.size());
+                        LOG.info("notification {} queue  {} devices, duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), devicesRef.size());
 
                         for (EntityRef deviceRef : devicesRef) {
-                            LOG.info("ApplicationQueueMessage: notification {} starting to queue device {} ", notification.getUuid(), deviceRef.getUuid());
+                            LOG.info("notification {} starting to queue device {} ", notification.getUuid(), deviceRef.getUuid());
                             long hash = MurmurHash.hash(deviceRef.getUuid());
                             if (sketch.estimateCount(hash) > 0) { //look for duplicates
-                                LOG.debug("ApplicationQueueMessage: Maybe Found duplicate device: {}", deviceRef.getUuid());
+                                LOG.warn("Maybe Found duplicate device: {}", deviceRef.getUuid());
                                 continue;
                             } else {
                                 sketch.add(hash, 1);
@@ -167,11 +160,11 @@ public class ApplicationQueueManager implements QueueManager {
                                     notifierKey = entry.getKey().toLowerCase();
                                     break;
                                 }
-                                LOG.info("ApplicationQueueMessage: Provider query for notification {} device {} took "+(System.currentTimeMillis()-now)+" ms",notification.getUuid(),deviceRef.getUuid());
+                                LOG.info("Provider query for notification {} device {} took "+(System.currentTimeMillis()-now)+" ms",notification.getUuid(),deviceRef.getUuid());
                             }
 
                             if (notifierId == null) {
-                                LOG.debug("ApplicationQueueMessage: Notifier did not match for device {} ", deviceRef);
+                                LOG.info("Notifier did not match for device {} ", deviceRef);
                                 continue;
                             }
 
@@ -180,15 +173,11 @@ public class ApplicationQueueManager implements QueueManager {
                                 // update queued time
                                 now = System.currentTimeMillis();
                                 notification.setQueued(System.currentTimeMillis());
-                                LOG.info("ApplicationQueueMessage: notification {} device {} queue time set. duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), deviceRef.getUuid());
+                                LOG.info("notification {} device {} queue time set. duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), deviceRef.getUuid());
                             }
                             now = System.currentTimeMillis();
-                            if(sendNow){ //if(jobExecution == null && sendNow) {
-                                messages.add(message);
-                            }else{
-                                qm.postToQueue(queueName, message);
-                            }
-                            LOG.info("ApplicationQueueMessage: notification {} post-queue to device {} duration " + (System.currentTimeMillis() - now) + " ms "+queueName+" queue", notification.getUuid(), deviceRef.getUuid());
+                            qm.sendMessage(message);
+                            LOG.info("notification {} post-queue to device {} duration " + (System.currentTimeMillis() - now) + " ms "+queueName+" queue", notification.getUuid(), deviceRef.getUuid());
                             deviceCount.incrementAndGet();
                             queueMeter.mark();
                         }
@@ -215,7 +204,7 @@ public class ApplicationQueueManager implements QueueManager {
                         }
                     });
             o.toBlocking().lastOrDefault(null);
-            LOG.info("ApplicationQueueMessage: notification {} done queueing duration {} ms", notification.getUuid(), System.currentTimeMillis() - now);
+            LOG.info("notification {} done queueing duration {} ms", notification.getUuid(), System.currentTimeMillis() - now);
 
 
         }
@@ -238,26 +227,19 @@ public class ApplicationQueueManager implements QueueManager {
 
         em.update(notification);
 
-        LOG.info("ApplicationQueueMessage: notification {} updated notification duration {} ms", notification.getUuid(),System.currentTimeMillis() - now);
+        LOG.info("notification {} updated notification duration {} ms", notification.getUuid(),System.currentTimeMillis() - now);
 
         //do i have devices, and have i already started batching.
         if (deviceCount.get() <= 0) {
-            TaskManager taskManager = new TaskManager(em, qm, this, notification,queueName);
+            TaskManager taskManager = new TaskManager(em, this, notification,this.qm);
             //if i'm in a test value will be false, do not mark finished for test orchestration, not ideal need real tests
             taskManager.finishedBatch();
         }
 
         if (LOG.isInfoEnabled()) {
             long elapsed = notification.getQueued() != null ? notification.getQueued() - startTime : 0;
-            LOG.info("ApplicationQueueMessage: notification {} done queuing to {} devices in "+elapsed+" ms",notification.getUuid().toString(),deviceCount.get());
+            LOG.info("notification {} done queuing to {} devices in "+elapsed+" ms",notification.getUuid().toString(),deviceCount.get());
         }
-
-        if(messages.size()>0){
-            now = System.currentTimeMillis();
-            sendBatchToProviders(messages,null).toBlocking().lastOrDefault(null);
-            LOG.info("ApplicationQueueMessage: notification {} done sending to "+messages.size()+" devicess in {} ms", notification.getUuid(), System.currentTimeMillis() - now);
-        }
-
     }
 
     /**
@@ -300,20 +282,22 @@ public class ApplicationQueueManager implements QueueManager {
      * @throws Exception
      */
 
-    public Observable sendBatchToProviders( final List<ApplicationQueueMessage> messages, final String queuePath) {
+    public Observable sendBatchToProviders( final List<QueueMessage> messages, final String queuePath) {
         LOG.info("sending batch of {} notifications.", messages.size());
         final Meter sendMeter = metricsFactory.getMeter(NotificationsService.class, "send");
 
         final Map<Object, Notifier> notifierMap = getNotifierMap();
-        final QueueManager proxy = this;
+        final ApplicationQueueManager proxy = this;
         final ConcurrentHashMap<UUID,TaskManager> taskMap = new ConcurrentHashMap<UUID, TaskManager>(messages.size());
         final ConcurrentHashMap<UUID,Notification> notificationMap = new ConcurrentHashMap<UUID, Notification>(messages.size());
 
-        final Func1<ApplicationQueueMessage, ApplicationQueueMessage> func = new Func1<ApplicationQueueMessage, ApplicationQueueMessage>() {
+        final Func1<QueueMessage, ApplicationQueueMessage> func = new Func1<QueueMessage, ApplicationQueueMessage>() {
             @Override
-            public ApplicationQueueMessage call(ApplicationQueueMessage message) {
+            public ApplicationQueueMessage call(QueueMessage queueMessage) {
                 boolean messageCommitted = false;
+                ApplicationQueueMessage message = null;
                 try {
+                    message = (ApplicationQueueMessage) queueMessage.getBody();
                     LOG.info("start sending notification for device {} for Notification: {} on thread "+Thread.currentThread().getId(), message.getDeviceId(), message.getNotificationId());
 
                     UUID deviceUUID = message.getDeviceId();
@@ -325,7 +309,7 @@ public class ApplicationQueueManager implements QueueManager {
                     }
                     TaskManager taskManager = taskMap.get(message.getNotificationId());
                     if (taskManager == null) {
-                        taskManager = new TaskManager(em, qm, proxy, notification,queuePath);
+                        taskManager = new TaskManager(em, proxy, notification, qm);
                         taskMap.putIfAbsent(message.getNotificationId(), taskManager);
                         taskManager = taskMap.get(message.getNotificationId());
                     }
@@ -334,7 +318,7 @@ public class ApplicationQueueManager implements QueueManager {
                     final Map<String, Object> translatedPayloads = translatePayloads(payloads, notifierMap);
                     LOG.info("sending notification for device {} for Notification: {}", deviceUUID, notification.getUuid());
 
-                    taskManager.addMessage(deviceUUID,message);
+                    taskManager.addMessage(deviceUUID,queueMessage);
                     try {
                         String notifierName = message.getNotifierKey().toLowerCase();
                         Notifier notifier = notifierMap.get(notifierName.toLowerCase());
@@ -368,7 +352,7 @@ public class ApplicationQueueManager implements QueueManager {
                     LOG.error("Failure while sending",e);
                     try {
                         if(!messageCommitted && queuePath != null) {
-                            qm.commitTransaction(queuePath, message.getTransaction(), null);
+                            qm.commitMessage(queueMessage);
                         }
                     }catch (Exception queueException){
                         LOG.error("Failed to commit message.",queueException);
@@ -378,9 +362,9 @@ public class ApplicationQueueManager implements QueueManager {
             }
         };
         Observable o = rx.Observable.from(messages)
-                .parallel(new Func1<rx.Observable<ApplicationQueueMessage>, rx.Observable<ApplicationQueueMessage>>() {
+                .parallel(new Func1<rx.Observable<QueueMessage>, rx.Observable<ApplicationQueueMessage>>() {
                     @Override
-                    public rx.Observable<ApplicationQueueMessage> call(rx.Observable<ApplicationQueueMessage> messageObservable) {
+                    public rx.Observable<ApplicationQueueMessage> call(rx.Observable<QueueMessage> messageObservable) {
                         return messageObservable.map(func);
                     }
                 }, Schedulers.io())
@@ -446,14 +430,8 @@ public class ApplicationQueueManager implements QueueManager {
         return translatedPayloads;
     }
 
-    public static String[] getQueueNames(Properties properties) {
-        String[] names = properties.getProperty(ApplicationQueueManager.DEFAULT_QUEUE_PROPERTY,ApplicationQueueManager.DEFAULT_QUEUE_NAME).split(";");
-        return names;
-    }
-    public static String getRandomQueue(String[] queueNames) {
-        int size = queueNames.length;
-        Random random = new Random();
-        String name = queueNames[random.nextInt(size)];
+    public static String getQueueNames(Properties properties) {
+        String name = properties.getProperty(ApplicationQueueManager.DEFAULT_QUEUE_PROPERTY,ApplicationQueueManager.DEFAULT_QUEUE_NAME);
         return name;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueMessage.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueMessage.java
index 91f1312..fa75531 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueMessage.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueMessage.java
@@ -16,6 +16,7 @@
  */
 package org.apache.usergrid.services.notifications;
 
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import org.apache.usergrid.mq.Message;
@@ -28,23 +29,25 @@ import org.slf4j.LoggerFactory;
 /**
  * Created by ApigeeCorporation on 9/4/14.
  */
-public class ApplicationQueueMessage extends Message {
+public class ApplicationQueueMessage implements Serializable {
 
     private static final Logger log = LoggerFactory.getLogger(ApplicationQueueMessage.class);
-
-    static final String MESSAGE_PROPERTY_DEVICE_UUID = "deviceUUID";
-    static final String MESSAGE_PROPERTY_APPLICATION_UUID = "applicationUUID";
-    static final String MESSAGE_PROPERTY_NOTIFIER_ID = "notifierId";
-    static final String MESSAGE_PROPERTY_NOTIFICATION_ID = "notificationId";
-    static final String MESSAGE_PROPERTY_NOTIFIER_NAME = "notifierName";
+    private UUID applicationId;
+    private UUID notificationId;
+    private UUID deviceId;
+    private String notifierKey;
+    private String notifierId;
 
 
     public ApplicationQueueMessage() {
     }
 
     public ApplicationQueueMessage(UUID applicationId, UUID notificationId, UUID deviceId, String notifierKey, String notifierId) {
-        setApplicationId(applicationId);
-        setDeviceId(deviceId);
+        this.applicationId = applicationId;
+        this.notificationId = notificationId;
+        this.deviceId = deviceId;
+        this.notifierKey = notifierKey;
+        this.notifierId = notifierId;
         setNotificationId(notificationId);
         setNotifierKey(notifierKey);
         setNotifierId(notifierId);
@@ -58,71 +61,45 @@ public class ApplicationQueueMessage extends Message {
         return new UUID( msb, lsb );
     }
 
-    public static ApplicationQueueMessage generate(Message message) {
-
-        // this crazyness may indicate that Core Persistence is not storing UUIDs correctly
-
-        byte[] mpaBytes = (byte[])message.getObjectProperty(MESSAGE_PROPERTY_APPLICATION_UUID);
-        UUID mpaUuid = bytesToUuid(mpaBytes);
-
-        byte[] mpnBytes = (byte[])message.getObjectProperty(MESSAGE_PROPERTY_NOTIFICATION_ID);
-        UUID mpnUuid = bytesToUuid(mpnBytes);
-
-        final UUID mpdUuid;
-        Object o = message.getObjectProperty(MESSAGE_PROPERTY_DEVICE_UUID);
-        if ( o instanceof UUID ) {
-            mpdUuid = (UUID)message.getObjectProperty(MESSAGE_PROPERTY_DEVICE_UUID);
-        } else {
-            byte[] mpdBytes = (byte[])o;
-            mpdUuid =  bytesToUuid(mpdBytes);
-        }
-
-        // end of crazyness
-
-        return new ApplicationQueueMessage(
-                mpaUuid, mpnUuid, mpdUuid,
-                message.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_NAME), 
-                message.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_ID));
-    }
 
     public UUID getApplicationId() {
-        return (UUID) this.getObjectProperty(MESSAGE_PROPERTY_APPLICATION_UUID);
+        return applicationId;
     }
 
     public void setApplicationId(UUID applicationId) {
-        this.setProperty(MESSAGE_PROPERTY_APPLICATION_UUID, applicationId);
+       this.applicationId = applicationId;
     }
 
     public UUID getDeviceId() {
-        return (UUID) this.getObjectProperty(MESSAGE_PROPERTY_DEVICE_UUID);
+        return deviceId;
     }
 
     public void setDeviceId(UUID deviceId) {
-        this.setProperty(MESSAGE_PROPERTY_DEVICE_UUID, deviceId);
+        this.deviceId = deviceId;
     }
 
     public UUID getNotificationId() {
-        return (UUID) this.getObjectProperty(MESSAGE_PROPERTY_NOTIFICATION_ID);
+        return notificationId;
     }
 
     public void setNotificationId(UUID notificationId) {
-        this.setProperty(MESSAGE_PROPERTY_NOTIFICATION_ID, notificationId);
+       this.notificationId = notificationId;
     }
 
     public String getNotifierId() {
-        return this.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_ID);
+        return notifierId;
     }
 
     public void setNotifierId(String notifierId) {
-        this.setProperty(MESSAGE_PROPERTY_NOTIFIER_ID, notifierId);
+         this.notifierId = notifierId;
     }
 
     public String getNotifierKey() {
-        return this.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_NAME);
+        return notifierKey;
     }
 
     public void setNotifierKey(String name) {
-        this.setProperty(MESSAGE_PROPERTY_NOTIFIER_NAME, name);
+        notifierKey = name;
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index 4e5692e..c5cd3c4 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -20,6 +20,7 @@ import java.util.*;
 
 import com.codahale.metrics.*;
 import com.codahale.metrics.Timer;
+import org.apache.usergrid.corepersistence.CpSetup;
 import org.apache.usergrid.metrics.MetricsFactory;
 import org.apache.usergrid.mq.Message;
 import org.apache.usergrid.persistence.*;
@@ -28,6 +29,11 @@ import org.apache.usergrid.persistence.entities.Notifier;
 import org.apache.usergrid.persistence.entities.Receipt;
 import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
 import org.apache.usergrid.services.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,10 +86,9 @@ public class NotificationsService extends AbstractCollectionService {
 
     private ApplicationQueueManager notificationQueueManager;
     private long gracePeriod;
-    @Autowired
     private ServiceManagerFactory smf;
-    @Autowired
     private EntityManagerFactory emf;
+    private QueueManagerFactory queueManagerFactory;
 
     public NotificationsService() {
         LOG.info("/notifications");
@@ -99,7 +104,11 @@ public class NotificationsService extends AbstractCollectionService {
         postMeter = metricsService.getMeter(NotificationsService.class, "requests");
         postTimer = metricsService.getTimer(this.getClass(), "execution_rest");
         JobScheduler jobScheduler = new JobScheduler(sm,em);
-        notificationQueueManager = new ApplicationQueueManager(jobScheduler,em,smf.getServiceManager(smf.getManagementAppId()).getQueueManager(),metricsService,props);
+        String name = ApplicationQueueManager.getQueueNames(props);
+        QueueScope queueScope = new QueueScopeImpl(new SimpleId(smf.getManagementAppId(),"notifications"),name);
+        queueManagerFactory = CpSetup.getInjector().getInstance(QueueManagerFactory.class);
+        QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope);
+        notificationQueueManager = new ApplicationQueueManager(jobScheduler,em,queueManager,metricsService,props);
         gracePeriod = jobScheduler.SCHEDULER_GRACE_PERIOD;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 7d76ee9..a381c70 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -16,11 +16,17 @@
  */
 package org.apache.usergrid.services.notifications;
 
+import org.apache.usergrid.corepersistence.CpSetup;
 import org.apache.usergrid.metrics.MetricsFactory;
-import org.apache.usergrid.mq.*;
+
+import org.apache.usergrid.mq.QueueResults;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.EntityManagerFactory;
 
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.queue.*;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
 import org.apache.usergrid.services.ServiceManager;
 import org.apache.usergrid.services.ServiceManagerFactory;
 import org.slf4j.Logger;
@@ -35,7 +41,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class QueueListener  {
-    public  final long MESSAGE_TRANSACTION_TIMEOUT =  1 * 60 * 1000;
+    public  final int MESSAGE_TRANSACTION_TIMEOUT =  25 * 1000;
+    private final QueueManagerFactory queueManagerFactory;
 
     public   long DEFAULT_SLEEP = 5000;
 
@@ -50,7 +57,6 @@ public class QueueListener  {
 
     private Properties properties;
 
-    private org.apache.usergrid.mq.QueueManager queueManager;
 
     private ServiceManager svcMgr;
 
@@ -61,13 +67,12 @@ public class QueueListener  {
     private ExecutorService pool;
     private List<Future> futures;
 
-    public  final String MAX_THREADS = "2";
-    private Integer batchSize = 100;
-    private String[] queueNames;
-
-
+    public  final int MAX_THREADS = 2;
+    private Integer batchSize = 10;
+    private String queueName;
 
     public QueueListener(ServiceManagerFactory smf, EntityManagerFactory emf, MetricsFactory metricsService, Properties props){
+        this.queueManagerFactory = CpSetup.getInjector().getInstance(QueueManagerFactory.class);
         this.smf = smf;
         this.emf = emf;
         this.metricsService = metricsService;
@@ -76,7 +81,7 @@ public class QueueListener  {
 
     @PostConstruct
     public void start(){
-        boolean shouldRun = new Boolean(properties.getProperty("usergrid.notifications.listener.run", "false"));
+        boolean shouldRun = new Boolean(properties.getProperty("usergrid.notifications.listener.run", "true"));
 
         if(shouldRun) {
             LOG.info("QueueListener: starting.");
@@ -86,9 +91,9 @@ public class QueueListener  {
                 sleepBetweenRuns = new Long(properties.getProperty("usergrid.notifications.listener.sleep.between", "0")).longValue();
                 sleepWhenNoneFound = new Long(properties.getProperty("usergrid.notifications.listener.sleep.after", ""+DEFAULT_SLEEP)).longValue();
                 batchSize = new Integer(properties.getProperty("usergrid.notifications.listener.batchSize", (""+batchSize)));
-                queueNames = ApplicationQueueManager.getQueueNames(properties);
+                queueName = ApplicationQueueManager.getQueueNames(properties);
 
-                int maxThreads = new Integer(properties.getProperty("usergrid.notifications.listener.maxThreads", MAX_THREADS));
+                int maxThreads = new Integer(properties.getProperty("usergrid.notifications.listener.maxThreads", ""+MAX_THREADS));
                 futures = new ArrayList<Future>(maxThreads);
 
                 //create our thread pool based on our threadcount.
@@ -120,7 +125,7 @@ public class QueueListener  {
     }
 
     private void execute(){
-        Thread.currentThread().setDaemon(true);
+//        Thread.currentThread().setDaemon(true);
         Thread.currentThread().setName("Notifications_Processor"+UUID.randomUUID());
 
         final AtomicInteger consecutiveExceptions = new AtomicInteger();
@@ -130,31 +135,30 @@ public class QueueListener  {
         while ( true ) {
             try {
                 svcMgr = smf.getServiceManager(smf.getManagementAppId());
-                queueManager = svcMgr.getQueueManager();
-                String queueName = ApplicationQueueManager.getRandomQueue(queueNames);
                 LOG.info("getting from queue {} ", queueName);
-                QueueResults results = getDeliveryBatch(queueManager,queueName);
-                LOG.info("QueueListener: retrieved batch of {} messages from queue {} ", results.size(),queueName);
+                QueueScope queueScope = new QueueScopeImpl(new SimpleId(smf.getManagementAppId(),"notifications"),queueName);
+                QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope);
+                List<QueueMessage> messages = queueManager.getMessages(getBatchSize(),MESSAGE_TRANSACTION_TIMEOUT,5000);
+                LOG.info("retrieved batch of {} messages from queue {} ", messages.size(),queueName);
 
-                List<Message> messages = results.getMessages();
                 if (messages.size() > 0) {
-                    HashMap<UUID, List<ApplicationQueueMessage>> messageMap = new HashMap<>(messages.size());
+                    HashMap<UUID, List<QueueMessage>> messageMap = new HashMap<>(messages.size());
                     //group messages into hash map by app id
-                    for (Message message : messages) {
-                        ApplicationQueueMessage queueMessage = ApplicationQueueMessage.generate(message);
+                    for (QueueMessage message : messages) {
+                        ApplicationQueueMessage queueMessage = (ApplicationQueueMessage) message.getBody();
                         UUID applicationId = queueMessage.getApplicationId();
                         if (!messageMap.containsKey(applicationId)) {
-                            List<ApplicationQueueMessage> applicationQueueMessages = new ArrayList<ApplicationQueueMessage>();
-                            applicationQueueMessages.add(queueMessage);
+                            List<QueueMessage> applicationQueueMessages = new ArrayList<QueueMessage>();
+                            applicationQueueMessages.add(message);
                             messageMap.put(applicationId, applicationQueueMessages);
                         } else {
-                            messageMap.get(applicationId).add(queueMessage);
+                            messageMap.get(applicationId).add(message);
                         }
                     }
                     long now = System.currentTimeMillis();
                     Observable merge = null;
                     //send each set of app ids together
-                    for (Map.Entry<UUID, List<ApplicationQueueMessage>> entry : messageMap.entrySet()) {
+                    for (Map.Entry<UUID, List<QueueMessage>> entry : messageMap.entrySet()) {
                         UUID applicationId = entry.getKey();
                         EntityManager entityManager = emf.getEntityManager(applicationId);
                         ServiceManager serviceManager = smf.getServiceManager(applicationId);
@@ -166,7 +170,7 @@ public class QueueListener  {
                                 properties
                         );
 
-                        LOG.info("QueueListener: send batch for app {} of {} messages", entry.getKey(), entry.getValue().size());
+                        LOG.info("send batch for app {} of {} messages", entry.getKey(), entry.getValue().size());
                         Observable current = manager.sendBatchToProviders(entry.getValue(),queueName);
                         if(merge == null)
                             merge = current;
@@ -176,16 +180,16 @@ public class QueueListener  {
                     }
                     if(merge!=null) {
                         merge.toBlocking().lastOrDefault(null);
+                        LOG.info("sent batch {} messages duration {} ms", messages.size(),System.currentTimeMillis() - now);
                     }
-                    LOG.info("QueueListener: sent batch {} messages duration {} ms", messages.size(),System.currentTimeMillis() - now);
 
                     if(sleepBetweenRuns > 0) {
-                        LOG.info("QueueListener: sleep between rounds...sleep...{}", sleepBetweenRuns);
+                        LOG.info("sleep between rounds...sleep...{}", sleepBetweenRuns);
                         Thread.sleep(sleepBetweenRuns);
                     }
                 }
                 else{
-                    LOG.info("QueueListener: no messages...sleep...{}", sleepWhenNoneFound);
+                    LOG.info("no messages...sleep...{}", sleepWhenNoneFound);
                     Thread.sleep(sleepWhenNoneFound);
                 }
                 //send to the providers
@@ -206,7 +210,7 @@ public class QueueListener  {
     }
 
     public void stop(){
-        LOG.info("QueueListener: stop processes");
+        LOG.info("stop processes");
 
         if(futures == null){
             return;
@@ -218,13 +222,6 @@ public class QueueListener  {
         pool.shutdownNow();
     }
 
-    private  QueueResults getDeliveryBatch(org.apache.usergrid.mq.QueueManager queueManager,String queuePath) throws Exception {
-        QueueQuery qq = new QueueQuery();
-        qq.setLimit(this.getBatchSize());
-        qq.setTimeout(MESSAGE_TRANSACTION_TIMEOUT);
-        QueueResults results = queueManager.getFromQueue(queuePath, qq);
-        return results;
-    }
 
     public void setBatchSize(int batchSize){
         this.batchSize = batchSize;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
deleted file mode 100644
index 0024417..0000000
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.services.notifications;
-
-import org.apache.usergrid.persistence.entities.Notifier;
-
-import java.util.HashMap;
-import java.util.Set;
-
-/**
- * Created by ApigeeCorporation on 9/4/14.
- */
-public interface QueueManager {
-
-    public void asyncCheckForInactiveDevices(Set<Notifier> notifiers) throws Exception ;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53810396/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
index 08f067d..6b5441e 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
@@ -23,6 +23,8 @@ import org.apache.usergrid.persistence.entities.Device;
 import org.apache.usergrid.persistence.entities.Notification;
 import org.apache.usergrid.persistence.entities.Notifier;
 import org.apache.usergrid.persistence.entities.Receipt;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,28 +35,26 @@ import java.util.concurrent.atomic.AtomicLong;
 public class TaskManager {
 
     private static final Logger LOG = LoggerFactory.getLogger(TaskManager.class);
-    private final QueueManager proxy;
-    private final String queuePath;
+    private final ApplicationQueueManager proxy;
+    private final QueueManager queueManager;
 
     private Notification notification;
     private AtomicLong successes = new AtomicLong();
     private AtomicLong failures = new AtomicLong();
-    private org.apache.usergrid.mq.QueueManager qm;
     private EntityManager em;
-    private ConcurrentHashMap<UUID, ApplicationQueueMessage> messageMap;
+    private ConcurrentHashMap<UUID, QueueMessage> messageMap;
     private boolean hasFinished;
 
-    public TaskManager(EntityManager em, org.apache.usergrid.mq.QueueManager qm, QueueManager proxy, Notification notification, String queuePath) {
+    public TaskManager(EntityManager em,ApplicationQueueManager proxy, Notification notification, QueueManager queueManager) {
         this.em = em;
-        this.qm = qm;
         this.notification = notification;
         this.proxy = proxy;
-        this.messageMap = new ConcurrentHashMap<UUID, ApplicationQueueMessage>();
+        this.messageMap = new ConcurrentHashMap<UUID, QueueMessage>();
         hasFinished = false;
-        this.queuePath = queuePath;
+        this.queueManager = queueManager;
     }
 
-    public void addMessage(UUID deviceId, ApplicationQueueMessage message) {
+    public void addMessage(UUID deviceId, QueueMessage message) {
         messageMap.put(deviceId, message);
     }
 
@@ -62,8 +62,8 @@ public class TaskManager {
         LOG.debug("REMOVED {}", deviceUUID);
         try {
             LOG.debug("notification {} removing device {} from remaining", notification.getUuid(), deviceUUID);
-            if(queuePath!=null){
-                qm.commitTransaction(queuePath, messageMap.get(deviceUUID).getTransaction(), null);
+            if(queueManager!=null){
+                queueManager.commitMessage(messageMap.get(deviceUUID));
             }
 
             EntityRef deviceRef = new SimpleEntityRef(Device.ENTITY_TYPE, deviceUUID);
@@ -147,34 +147,36 @@ public class TaskManager {
         }
     }
 
-    public void finishedBatch() throws Exception {
-        long successes = this.successes.getAndSet(0); //reset counters
-        long failures = this.failures.getAndSet(0); //reset counters
-        this.hasFinished = true;
-
-        // refresh notification
-        Notification notification = em.get(this.notification.getUuid(), Notification.class);
-        notification.setModified(System.currentTimeMillis());
-
-        long sent = successes, errors = failures;
-        //and write them out again, this will produce the most accurate count
-        Map<String, Long> stats = new HashMap<>(2);
-        stats.put("sent", sent);
-        stats.put("errors", errors);
-        notification.updateStatistics(successes, errors);
-
-        //none of this is known and should you ever do this
-        if (notification.getExpectedCount() <= (notification.getStatistics().get("sent") + notification.getStatistics().get("errors"))) {
-            Map<String, Object> properties = new HashMap<>();
-            notification.setFinished(notification.getModified());
-            properties.put("finished", notification.getModified());
-            properties.put("state", notification.getState());
-            LOG.info("done sending to devices in {} ms", notification.getFinished() - notification.getStarted());
-            notification.addProperties(properties);
-        }
-        LOG.info("notification finished batch: {} of {} devices", notification.getUuid(),sent+errors);
-        em.update(notification);
+    public  void finishedBatch() throws Exception {
+        synchronized (this) {
+            long successes = this.successes.getAndSet(0); //reset counters
+            long failures = this.failures.getAndSet(0); //reset counters
+            this.hasFinished = true;
+
+            // refresh notification
+            Notification notification = em.get(this.notification.getUuid(), Notification.class);
+            notification.setModified(System.currentTimeMillis());
+
+            //and write them out again, this will produce the most accurate count
+            Map<String, Long> stats = new HashMap<>(2);
+            stats.put("sent", successes);
+            stats.put("errors", failures);
+            notification.updateStatistics(successes, failures);
+
+            //none of this is known and should you ever do this
+            if (notification.getExpectedCount() <= (notification.getStatistics().get("sent") + notification.getStatistics().get("errors"))) {
+                Map<String, Object> properties = new HashMap<>();
+                notification.setFinished(notification.getModified());
+                properties.put("finished", notification.getModified());
+                properties.put("state", notification.getState());
+                LOG.info("done sending to devices in {} ms", notification.getFinished() - notification.getStarted());
+                notification.addProperties(properties);
+            }
+
+            LOG.info("notification finished batch: {} of {} devices", notification.getUuid(), successes + failures);
+            em.update(notification);
 //        Set<Notifier> notifiers = new HashSet<>(proxy.getNotifierMap().values()); // remove dups
 //        proxy.asyncCheckForInactiveDevices(notifiers);
+        }
     }
 }
\ No newline at end of file


[8/9] First pass at updating interfaces

Posted by to...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/FieldSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/FieldSerializer.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/FieldSerializer.java
new file mode 100644
index 0000000..292f550
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/FieldSerializer.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
+import org.apache.usergrid.persistence.model.field.DoubleField;
+import org.apache.usergrid.persistence.model.field.Field;
+import org.apache.usergrid.persistence.model.field.IntegerField;
+import org.apache.usergrid.persistence.model.field.LongField;
+import org.apache.usergrid.persistence.model.field.StringField;
+import org.apache.usergrid.persistence.model.field.UUIDField;
+
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+
+// TODO: replace with "real" serializer
+
+/**
+ * Serialize Field for use as part of row-key in Unique Values Column Family.
+ */
+public class FieldSerializer implements CompositeFieldSerializer<Field> {
+
+    private static final Logger LOG = LoggerFactory.getLogger( FieldSerializer.class );
+
+    public enum FieldType {
+        BOOLEAN_FIELD,
+        DOUBLE_FIELD,
+        INTEGER_FIELD,
+        LONG_FIELD,
+        STRING_FIELD,
+        UUID_FIELD
+    };
+
+    private static final FieldSerializer INSTANCE = new FieldSerializer();
+
+    @Override
+    public void toComposite( final CompositeBuilder builder, final Field field ) {
+
+        builder.addString( field.getName() );
+
+        // TODO: use the real field value serializer(s) here? Store hash instead?
+        builder.addString( field.getValue().toString() );
+         
+        String simpleName = field.getClass().getSimpleName();
+        int nameIndex = simpleName.lastIndexOf(".");
+        String fieldType = simpleName.substring(nameIndex + 1, simpleName.length() - "Field".length());
+        fieldType = StringUtils.upperCase( fieldType + "_FIELD" );
+
+        builder.addString( fieldType );
+    }
+
+    @Override
+    public Field fromComposite( final CompositeParser composite ) {
+
+        final String name = composite.readString();
+        final String value = composite.readString();
+        final String typeString = composite.readString();
+
+        final FieldType fieldType = FieldType.valueOf( typeString );
+
+        switch (fieldType) {
+            case DOUBLE_FIELD: 
+                return new DoubleField(name, Double.parseDouble(value));
+            case INTEGER_FIELD: 
+                return new IntegerField(name, Integer.parseInt(value));
+            case LONG_FIELD: 
+                return new LongField(name, Long.parseLong(value));
+            case STRING_FIELD: 
+                return new StringField(name, value);
+            case UUID_FIELD: 
+                return new UUIDField(name, UUID.fromString(value));
+            default:
+                throw new RuntimeException("Unknown unique field type");
+        }
+    }
+
+
+    /**
+     * Get the singleton serializer
+     */
+    public static FieldSerializer get() {
+        return INSTANCE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
index 4d930f2..89012aa 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
@@ -20,8 +20,7 @@ package org.apache.usergrid.persistence.collection.serialization.impl;
 
 import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategy;
-import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategyImpl;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.core.migration.Migration;
 
 import com.google.inject.AbstractModule;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueImpl.java
new file mode 100644
index 0000000..2fdae1a
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueImpl.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.field.Field;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Represents a Unique Value of a field within a collection.
+ */
+public class UniqueValueImpl implements UniqueValue {
+    private final CollectionScope collectionScope;
+    private final Field field;
+    private final Id entityId;
+    private final UUID entityVersion;
+
+    public UniqueValueImpl(
+            final CollectionScope scope, final Field field, Id entityId, final UUID version ) {
+
+        Preconditions.checkNotNull( scope, "scope is required" );
+        Preconditions.checkNotNull( field, "field is required" );
+//        Preconditions.checkNotNull( version, "version is required" );
+        Preconditions.checkNotNull( entityId, "entityId is required" );
+
+        this.collectionScope = scope;
+        this.field = field;
+        this.entityVersion = version;
+        this.entityId = entityId;
+    }
+
+    @Override
+    public CollectionScope getCollectionScope() {
+        return collectionScope;
+    }
+
+    @Override
+    public Field getField() {
+        return field;
+    }
+
+    @Override
+    public UUID getEntityVersion() {
+        return entityVersion;
+    }
+
+    @Override
+    public Id getEntityId() {
+        return entityId;
+    }
+
+    
+    @Override
+    public boolean equals( final Object o ) {
+        if ( this == o ) {
+            return true;
+        }
+        if ( o == null || getClass() != o.getClass() ) {
+            return false;
+        }
+
+        final UniqueValueImpl that = ( UniqueValueImpl ) o;
+
+        if ( !getCollectionScope().equals( that.getCollectionScope() ) ) {
+            return false;
+        }
+
+        if ( !getField().equals( that.getField()) ) {
+            return false;
+        }
+
+        if ( !getEntityVersion().equals( that.getEntityVersion() ) ) {
+            return false;
+        }
+
+        if ( !getEntityId().equals( that.getEntityId() ) ) {
+            return false;
+        }
+
+        return true;
+    }
+
+
+    @Override
+    public int hashCode() {
+        int result = 31 * getCollectionScope().hashCode();
+        result = 31 * result + getField().hashCode();
+        result = 31 * result + getEntityVersion().hashCode();
+        result = 31 * result + getEntityId().hashCode();
+        return result;
+    }
+
+
+    @Override
+    public String toString() {
+        return "UniqueValueImpl{" +
+                ", collectionScope =" + collectionScope.getName() +
+                ", field =" + field +
+                ", entityVersion=" + entityVersion +
+                ", entityId =" + entityId +
+                '}';
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
new file mode 100644
index 0000000..d480691
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.marshal.BytesType;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.migration.Migration;
+import org.apache.usergrid.persistence.model.field.Field;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.netflix.astyanax.ColumnListMutation;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import com.netflix.astyanax.model.ColumnList;
+import com.netflix.astyanax.model.Row;
+import com.netflix.astyanax.util.RangeBuilder;
+
+
+/**
+ * Reads and writes to UniqueValues column family.
+ */
+public class UniqueValueSerializationStrategyImpl implements UniqueValueSerializationStrategy, Migration {
+
+    private static final Logger log = LoggerFactory.getLogger( UniqueValueSerializationStrategyImpl.class );
+
+    // TODO: use "real" field serializer here instead once it is ready
+    private static final CollectionScopedRowKeySerializer<Field> ROW_KEY_SER =
+            new CollectionScopedRowKeySerializer<Field>( FieldSerializer.get() );
+
+    private static final EntityVersionSerializer ENTITY_VERSION_SER = new EntityVersionSerializer();
+
+    private static final MultiTennantColumnFamily<CollectionScope, Field, EntityVersion> CF_UNIQUE_VALUES =
+            new MultiTennantColumnFamily<CollectionScope, Field, EntityVersion>( "Unique_Values", ROW_KEY_SER,
+                    ENTITY_VERSION_SER );
+
+    protected final Keyspace keyspace;
+
+
+    /**
+     * Construct serialization strategy for keyspace.
+     *
+     * @param keyspace Keyspace in which to store Unique Values.
+     */
+    @Inject
+    public UniqueValueSerializationStrategyImpl( final Keyspace keyspace ) {
+        this.keyspace = keyspace;
+    }
+
+
+    @Override
+    public java.util.Collection getColumnFamilies() {
+
+        MultiTennantColumnFamilyDefinition cf =
+                new MultiTennantColumnFamilyDefinition( CF_UNIQUE_VALUES, BytesType.class.getSimpleName(),
+                        ColumnTypes.DYNAMIC_COMPOSITE_TYPE, BytesType.class.getSimpleName(),
+                        MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
+
+        return Collections.singleton( cf );
+    }
+
+
+    public MutationBatch write( UniqueValue uniqueValue ) {
+        return write( uniqueValue, Integer.MAX_VALUE );
+    }
+
+
+    @Override
+    public MutationBatch write( UniqueValue value, Integer timeToLive ) {
+
+        Preconditions.checkNotNull( value, "value is required" );
+        Preconditions.checkNotNull( timeToLive, "timeToLive is required" );
+
+        log.debug( "Writing unique value scope={} id={} version={} name={} value={} ttl={} ", new Object[] {
+                value.getCollectionScope().getName(), value.getEntityId(), value.getEntityVersion(),
+                value.getField().getName(), value.getField().getValue(), timeToLive
+        } );
+
+        final EntityVersion ev = new EntityVersion( value.getEntityId(), value.getEntityVersion() );
+
+        final Integer ttl;
+        if ( timeToLive.equals( Integer.MAX_VALUE ) ) {
+            ttl = null;
+        }
+        else {
+            ttl = timeToLive;
+        }
+
+        return doWrite( value.getCollectionScope(), value.getField(), new UniqueValueSerializationStrategyImpl.RowOp() {
+
+            @Override
+            public void doOp( final ColumnListMutation<EntityVersion> colMutation ) {
+                colMutation.putColumn( ev, 0x0, ttl );
+            }
+        } );
+    }
+
+
+    @Override
+    public MutationBatch delete( UniqueValue value ) {
+
+        Preconditions.checkNotNull( value, "value is required" );
+
+        final EntityVersion ev = new EntityVersion( value.getEntityId(), value.getEntityVersion() );
+
+        return doWrite( value.getCollectionScope(), value.getField(), new UniqueValueSerializationStrategyImpl.RowOp() {
+
+            @Override
+            public void doOp( final ColumnListMutation<EntityVersion> colMutation ) {
+                colMutation.deleteColumn( ev );
+            }
+        } );
+    }
+
+
+    /**
+     * Do the column update or delete for the given column and row key
+     *
+     * @param context We need to use this when getting the keyspace
+     */
+    private MutationBatch doWrite( CollectionScope context, Field field, RowOp op ) {
+        final MutationBatch batch = keyspace.prepareMutationBatch();
+        op.doOp( batch.withRow( CF_UNIQUE_VALUES, ScopedRowKey.fromKey( context, field ) ) );
+        return batch;
+    }
+
+
+    @Override
+    public UniqueValueSet load( final CollectionScope colScope, final Collection<Field> fields )
+            throws ConnectionException {
+
+        Preconditions.checkNotNull( fields, "fields are required" );
+        Preconditions.checkArgument( fields.size() > 0, "More than 1 field msut be specified" );
+
+        final List<ScopedRowKey<CollectionScope, Field>> keys = new ArrayList<>( fields.size() );
+
+        for ( Field field : fields ) {
+            final ScopedRowKey<CollectionScope, Field> rowKey = ScopedRowKey.fromKey( colScope, field );
+
+            keys.add( rowKey );
+        }
+
+        final UniqueValueSetImpl uniqueValueSet = new UniqueValueSetImpl( fields.size() );
+
+        Iterator<Row<ScopedRowKey<CollectionScope, Field>, EntityVersion>> results =
+                keyspace.prepareQuery( CF_UNIQUE_VALUES ).getKeySlice( keys )
+                        .withColumnRange( new RangeBuilder().setLimit( 1 ).build() ).execute().getResult().iterator();
+
+
+        while ( results.hasNext() )
+
+        {
+
+            final Row<ScopedRowKey<CollectionScope, Field>, EntityVersion> unique = results.next();
+
+
+            final Field field = unique.getKey().getKey();
+
+            final ColumnList<EntityVersion> columnList = unique.getColumns();
+
+            //sanity check, nothing to do, skip it
+            if ( columnList.size() < 1 ) {
+                continue;
+            }
+
+            final EntityVersion entityVersion = columnList.getColumnByIndex( 0 ).getName();
+
+
+            final UniqueValueImpl uniqueValue = new UniqueValueImpl( colScope, field, entityVersion.getEntityId(),
+                    entityVersion.getEntityVersion() );
+
+            uniqueValueSet.addValue( uniqueValue );
+        }
+
+        return uniqueValueSet;
+    }
+
+
+    /**
+     * Simple callback to perform puts and deletes with a common row setup code
+     */
+    private static interface RowOp {
+        void doOp( ColumnListMutation<EntityVersion> colMutation );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSetImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSetImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSetImpl.java
new file mode 100644
index 0000000..8dd9528
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSetImpl.java
@@ -0,0 +1,85 @@
+package org.apache.usergrid.persistence.collection.serialization.impl;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+
+
+public class UniqueValueSetImpl implements UniqueValueSet {
+
+    private final Map<String, UniqueValue> values;
+
+    public UniqueValueSetImpl(final int expectedMaxSize) {
+        values = new HashMap<>(expectedMaxSize);
+    }
+
+
+    public void addValue(UniqueValue value){
+        values.put( value.getField().getName(), value );
+    }
+
+    @Override
+    public UniqueValue getValue( final String fieldName ) {
+        return values.get( fieldName );
+    }
+
+
+    @Override
+    public Iterator<UniqueValue> iterator() {
+        return new UniqueValueIterator(values.entrySet());
+    }
+
+
+    /**
+     * Inner class of unique value iterator
+     */
+    private static final class
+            UniqueValueIterator implements Iterator<UniqueValue>{
+
+        private final Iterator<Map.Entry<String, UniqueValue>> sourceIterator;
+
+        public UniqueValueIterator( final Set<Map.Entry<String, UniqueValue>> entries ) {
+            this.sourceIterator = entries.iterator();
+        }
+
+
+        @Override
+        public boolean hasNext() {
+            return sourceIterator.hasNext();
+        }
+
+
+        @Override
+        public UniqueValue next() {
+            return sourceIterator.next().getValue();
+        }
+
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException( "Remove is unsupported" );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
index c90d079..c2876b0 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
@@ -1,6 +1,8 @@
 package org.apache.usergrid.persistence.collection.mvcc.stage.delete;
 
 
+import java.security.Key;
+
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
@@ -13,11 +15,12 @@ import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
 import org.apache.usergrid.persistence.collection.mvcc.stage.AbstractMvccEntityStageTest;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
 import org.apache.usergrid.persistence.collection.mvcc.stage.TestEntityGenerator;
-import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.model.entity.Entity;
 
+import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
 
 import static org.junit.Assert.assertEquals;
@@ -111,13 +114,16 @@ public class MarkCommitTest extends AbstractMvccEntityStageTest {
         final MutationBatch entityMutation = mock( MutationBatch.class );
         final SerializationFig serializationFig = mock(SerializationFig.class);
         final UniqueValueSerializationStrategy uniqueValueSerializationStrategy = mock(UniqueValueSerializationStrategy.class);
+        final Keyspace keyspace = mock( Keyspace.class );
+
+        when(keyspace.prepareMutationBatch()).thenReturn( entityMutation );
 
         when( logStrategy.write( any( CollectionScope.class ), any( MvccLogEntry.class ) ) ).thenReturn( logMutation );
         when( mvccEntityStrategy.write( any( CollectionScope.class ), any( MvccEntity.class ) ) )
                 .thenReturn( entityMutation );
 
 
-        new MarkCommit( logStrategy, mvccEntityStrategy, uniqueValueSerializationStrategy, serializationFig ).call( event );
+        new MarkCommit( logStrategy, mvccEntityStrategy, uniqueValueSerializationStrategy, serializationFig, keyspace ).call( event );
 
         //TODO: This doesn't assert anything, this needs fixed (should be a fail technically)
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersionSerializerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersionSerializerTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersionSerializerTest.java
index 42e481e..889cba9 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersionSerializerTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersionSerializerTest.java
@@ -23,6 +23,8 @@ import java.nio.ByteBuffer;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.usergrid.persistence.collection.serialization.impl.EntityVersion;
+import org.apache.usergrid.persistence.collection.serialization.impl.EntityVersionSerializer;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/FieldSerializerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/FieldSerializerTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/FieldSerializerTest.java
index e3ec59d..dcff324 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/FieldSerializerTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/FieldSerializerTest.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.usergrid.persistence.collection.serialization.impl.FieldSerializer;
 import org.apache.usergrid.persistence.model.field.Field;
 import org.apache.usergrid.persistence.model.field.IntegerField;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategyImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategyImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategyImplTest.java
index c323883..f03baba 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategyImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategyImplTest.java
@@ -18,11 +18,11 @@
 package org.apache.usergrid.persistence.collection.mvcc.stage.write;
 
 
+import java.util.Collections;
 import java.util.UUID;
 
 import org.jukito.UseModules;
 import org.junit.Assert;
-import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -33,16 +33,21 @@ import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
 import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
-import org.apache.usergrid.persistence.core.cassandra.CassandraRule;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
 import org.apache.usergrid.persistence.core.cassandra.ITRunner;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.field.Field;
 import org.apache.usergrid.persistence.model.field.IntegerField;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.google.inject.Inject;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
+
 @RunWith( ITRunner.class )
 @UseModules( TestCollectionModule.class )
 public class UniqueValueSerializationStrategyImplTest {
@@ -51,7 +56,7 @@ public class UniqueValueSerializationStrategyImplTest {
     @Inject
     @Rule
     public MigrationManagerRule migrationManagerRule;
-    
+
     @Inject
     UniqueValueSerializationStrategy strategy;
 
@@ -59,16 +64,18 @@ public class UniqueValueSerializationStrategyImplTest {
     @Test
     public void testBasicOperation() throws ConnectionException, InterruptedException {
 
-        CollectionScope scope = new CollectionScopeImpl(
-                new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
+        CollectionScope scope =
+                new CollectionScopeImpl( new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
 
         IntegerField field = new IntegerField( "count", 5 );
-        Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity");
+        Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
         UUID version = UUIDGenerator.newTimeUUID();
         UniqueValue stored = new UniqueValueImpl( scope, field, entityId, version );
         strategy.write( stored ).execute();
 
-        UniqueValue retrieved = strategy.load( scope, field );
+        UniqueValueSet fields = strategy.load( scope, Collections.<Field>singleton( field ) );
+
+        UniqueValue retrieved = fields.getValue( field.getName() );
         Assert.assertNotNull( retrieved );
         Assert.assertEquals( stored, retrieved );
     }
@@ -76,13 +83,13 @@ public class UniqueValueSerializationStrategyImplTest {
 
     @Test
     public void testWriteWithTTL() throws InterruptedException, ConnectionException {
-        
-        CollectionScope scope = new CollectionScopeImpl(
-                new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
+
+        CollectionScope scope =
+                new CollectionScopeImpl( new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
 
         // write object that lives 2 seconds
         IntegerField field = new IntegerField( "count", 5 );
-        Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity");
+        Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
         UUID version = UUIDGenerator.newTimeUUID();
         UniqueValue stored = new UniqueValueImpl( scope, field, entityId, version );
         strategy.write( stored, 2 ).execute();
@@ -90,14 +97,19 @@ public class UniqueValueSerializationStrategyImplTest {
         Thread.sleep( 1000 );
 
         // waited one sec, should be still here
-        UniqueValue retrieved = strategy.load( scope, field );
+        UniqueValueSet fields = strategy.load( scope, Collections.<Field>singleton( field ) );
+
+        UniqueValue retrieved = fields.getValue( field.getName() );
+
         Assert.assertNotNull( retrieved );
         Assert.assertEquals( stored, retrieved );
 
         Thread.sleep( 1500 );
 
         // wait another second, should be gone now
-        UniqueValue nullExpected = strategy.load( scope, field );
+        fields = strategy.load( scope, Collections.<Field>singleton( field ) );
+
+        UniqueValue nullExpected = fields.getValue( field.getName() );
         Assert.assertNull( nullExpected );
     }
 
@@ -105,18 +117,22 @@ public class UniqueValueSerializationStrategyImplTest {
     @Test
     public void testDelete() throws ConnectionException {
 
-        CollectionScope scope = new CollectionScopeImpl(
-                new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
+        CollectionScope scope =
+                new CollectionScopeImpl( new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
 
         IntegerField field = new IntegerField( "count", 5 );
-        Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity");
+        Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
         UUID version = UUIDGenerator.newTimeUUID();
         UniqueValue stored = new UniqueValueImpl( scope, field, entityId, version );
         strategy.write( stored ).execute();
 
         strategy.delete( stored ).execute();
 
-        UniqueValue nullExpected = strategy.load( scope, field );
+        UniqueValueSet fields = strategy.load( scope, Collections.<Field>singleton( field ) );
+
+        UniqueValue nullExpected = fields.getValue( field.getName() );
+
+
         Assert.assertNull( nullExpected );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
index 981a14d..b0ccb4f 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
@@ -30,6 +30,7 @@ import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
 import org.apache.usergrid.persistence.collection.mvcc.stage.AbstractMvccEntityStageTest;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
 import org.apache.usergrid.persistence.collection.mvcc.stage.TestEntityGenerator;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.model.entity.Entity;
 
 import com.netflix.astyanax.MutationBatch;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java
index 64e5e2e..6f31412 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java
@@ -36,6 +36,9 @@ import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
 import org.apache.usergrid.persistence.collection.mvcc.stage.AbstractMvccEntityStageTest;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.field.StringField;
@@ -96,20 +99,10 @@ public class WriteOptimisticVerifyTest extends AbstractMvccEntityStageTest {
         // Run the stage
         WriteOptimisticVerify newStage = new WriteOptimisticVerify( noConflictLog );
 
-        CollectionIoEvent<MvccEntity> result;
-        result = newStage.call( new CollectionIoEvent<MvccEntity>( collectionScope, mvccEntity ) );
 
-        assertSame("Context was correct", collectionScope, result.getEntityCollection()) ;
+        newStage.call( new CollectionIoEvent<>( collectionScope, mvccEntity ) );
 
-        // Verify the entity is correct
-        MvccEntity entry = result.getEvent();
 
-        // Verify UUID and version in both the MvccEntity and the entity itself. Here assertSame 
-        // is used on purpose as we want to make sure the same instance is used, not a copy.
-        // This way the caller's runtime type is retained.
-        assertSame( "Id correct", entity.getId(), entry.getId() );
-        assertSame( "Version did not not match entityId", entity.getVersion(), entry.getVersion() );
-        assertSame( "Entity correct", entity, entry.getEntity().get() );
     }
 
     @Test
@@ -159,7 +152,7 @@ public class WriteOptimisticVerifyTest extends AbstractMvccEntityStageTest {
         RollbackAction rollbackAction = new RollbackAction( mvccLog, uvstrat );
 
         try {
-            newStage.call( new CollectionIoEvent<MvccEntity>(scope, mvccEntity));
+            newStage.call( new CollectionIoEvent<>(scope, mvccEntity));
 
         } catch (WriteOptimisticVerifyException e) {
             log.info("Error", e);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyStageTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyStageTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyStageTest.java
deleted file mode 100644
index d4f6507..0000000
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyStageTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.mvcc.stage.write;
-
-
-import org.jukito.UseModules;
-
-import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.stage.AbstractMvccEntityStageTest;
-import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
-import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-
-import static org.mockito.Mockito.mock;
-
-
-/**
- * TODO: Update the test to correctly test for detecting more than 1 duplicate and exception handling correctly
- *
- * @author tnine
- */
-@UseModules( TestCollectionModule.class )
-public class WriteUniqueVerifyStageTest extends AbstractMvccEntityStageTest {
-
-    @Override
-    protected void validateStage( final CollectionIoEvent<MvccEntity> event ) {
-        UniqueValueSerializationStrategy uvstrat = mock( UniqueValueSerializationStrategy.class );
-        SerializationFig fig = mock( SerializationFig.class );
-        new WriteUniqueVerify( uvstrat, fig ).call( event );
-    }
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
index 15aff3d..ba89503 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
@@ -29,15 +29,19 @@ import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.core.cassandra.ITRunner;
 import org.apache.usergrid.persistence.model.entity.Entity;
 
 import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
 
 import static org.apache.usergrid.persistence.collection.mvcc.stage.TestEntityGenerator.fromEntity;
 import static org.apache.usergrid.persistence.collection.mvcc.stage.TestEntityGenerator.generateEntity;
 import static org.junit.Assert.assertSame;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 
 
 @RunWith( ITRunner.class )
@@ -58,43 +62,12 @@ public class WriteUniqueVerifyTest {
     private SerializationFig fig;
 
 
-    /**
-     * Standard flow
-     */
-    @Test( timeout = 5000 )
-    public void testStartStage() throws Exception {
-
-        final CollectionScope collectionScope = mock( CollectionScope.class );
-
-        // set up the mock to return the entity from the start phase
-        final Entity entity = generateEntity();
-
-        final MvccEntity mvccEntity = fromEntity( entity );
-
-        // run the stage
-        WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig );
-
-        CollectionIoEvent<MvccEntity> result = newStage.call( 
-            new CollectionIoEvent<MvccEntity>( collectionScope, mvccEntity ) )
-                .toBlocking().last();
-
-        assertSame( "Context was correct", collectionScope, result.getEntityCollection() );
-
-        // verify the entity is correct
-        MvccEntity entry = result.getEvent();
-
-        // verify uuid and version in both the MvccEntity and the entity itself. assertSame is 
-        // used on purpose.  We want to make sure the same instance is used, not a copy.
-        // this way the caller's runtime type is retained.
-        assertSame( "id correct", entity.getId(), entry.getId() );
-        assertSame( "version did not not match entityId", entity.getVersion(), entry.getVersion() );
-        assertSame( "Entity correct", entity, entry.getEntity().get() );
-    }
 
 
     @Test
     public void testNoFields() {
         final CollectionScope collectionScope = mock( CollectionScope.class );
+        final Keyspace keyspace = mock(Keyspace.class);
 
         // set up the mock to return the entity from the start phase
         final Entity entity = generateEntity();
@@ -102,13 +75,14 @@ public class WriteUniqueVerifyTest {
         final MvccEntity mvccEntity = fromEntity( entity );
 
         // run the stage
-        WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig );
+        WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace );
+
+       newStage.call(
+            new CollectionIoEvent<>( collectionScope, mvccEntity ) ) ;
 
-        CollectionIoEvent<MvccEntity> result = newStage.call( 
-            new CollectionIoEvent<MvccEntity>( collectionScope, mvccEntity ) )
-                .toBlocking().last();
+       //if we get here, it's a success.  We want to test no exceptions are thrown
 
-        assertSame( "Context was correct", collectionScope, result.getEntityCollection() );
+        verify(keyspace, never()).prepareMutationBatch();
     }
 
 }


[9/9] git commit: First pass at updating interfaces

Posted by to...@apache.org.
First pass at updating interfaces


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/fd841758
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/fd841758
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/fd841758

Branch: refs/heads/collection_multiget
Commit: fd841758a0649474337f9522d5f21a51fb439933
Parents: 17cc01d
Author: Todd Nine <to...@apache.org>
Authored: Tue Oct 7 13:33:08 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Tue Oct 7 13:33:08 2014 -0600

----------------------------------------------------------------------
 .../collection/guice/CollectionModule.java      |   5 +-
 .../impl/EntityCollectionManagerImpl.java       |  37 ++--
 .../mvcc/event/PostProcessObserver.java         |  39 ----
 .../mvcc/stage/delete/MarkCommit.java           | 153 +++++++------
 .../mvcc/stage/write/EntityVersion.java         |  64 ------
 .../stage/write/EntityVersionSerializer.java    |  71 ------
 .../mvcc/stage/write/FieldSerializer.java       | 107 ---------
 .../mvcc/stage/write/RollbackAction.java        |   3 +
 .../mvcc/stage/write/UniqueValue.java           |  39 ----
 .../mvcc/stage/write/UniqueValueImpl.java       | 124 -----------
 .../write/UniqueValueSerializationStrategy.java |  66 ------
 .../UniqueValueSerializationStrategyImpl.java   | 194 ----------------
 .../mvcc/stage/write/WriteCommit.java           |   5 +-
 .../mvcc/stage/write/WriteOptimisticVerify.java |  10 +-
 .../mvcc/stage/write/WriteUniqueVerify.java     | 206 ++++++++---------
 .../collection/serialization/UniqueValue.java   |  55 +++++
 .../UniqueValueSerializationStrategy.java       |  68 ++++++
 .../serialization/UniqueValueSet.java           |  32 +++
 .../serialization/impl/EntityVersion.java       |  64 ++++++
 .../impl/EntityVersionSerializer.java           |  71 ++++++
 .../serialization/impl/FieldSerializer.java     | 107 +++++++++
 .../serialization/impl/SerializationModule.java |   3 +-
 .../serialization/impl/UniqueValueImpl.java     | 125 +++++++++++
 .../UniqueValueSerializationStrategyImpl.java   | 219 +++++++++++++++++++
 .../serialization/impl/UniqueValueSetImpl.java  |  85 +++++++
 .../mvcc/stage/delete/MarkCommitTest.java       |  10 +-
 .../write/EntityVersionSerializerTest.java      |   2 +
 .../mvcc/stage/write/FieldSerializerTest.java   |   1 +
 ...niqueValueSerializationStrategyImplTest.java |  50 +++--
 .../mvcc/stage/write/WriteCommitTest.java       |   1 +
 .../stage/write/WriteOptimisticVerifyTest.java  |  17 +-
 .../stage/write/WriteUniqueVerifyStageTest.java |  48 ----
 .../mvcc/stage/write/WriteUniqueVerifyTest.java |  48 +---
 33 files changed, 1101 insertions(+), 1028 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 9d949e8..3336166 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -29,12 +29,11 @@ import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerialization
 import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGenerator;
 import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGeneratorImpl;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategy;
-import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategyImpl;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueSerializationStrategyImpl;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;
-import org.apache.usergrid.persistence.collection.service.UUIDService;
 import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
 import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
 import org.apache.usergrid.persistence.core.task.TaskExecutor;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 094baa6..6299acb 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -138,7 +138,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
 
         // create our observable and start the write
-        CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( collectionScope, entity );
+        final CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( collectionScope, entity );
 
         Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData,writeStart );
 
@@ -147,15 +147,10 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         // observable = Concurrent.concurrent( observable, Schedulers.io(), new WaitZip(), 
         //                  writeVerifyUnique, writeOptimisticVerify );
 
-        observable.doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() {
-            @Override
-            public void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
-                //Queue future write here (verify)
-            }
-        } ).map( writeCommit ).doOnNext( new Action1<Entity>() {
+        observable.map( writeCommit ).doOnNext( new Action1<Entity>() {
             @Override
             public void call( final Entity entity ) {
-                //fork background processing here (start)
+                //TODO fire a task here
 
                 //post-processing to come later. leave it empty for now.
             }
@@ -175,7 +170,13 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" );
 
         return Observable.from( new CollectionIoEvent<Id>( collectionScope, entityId ) )
-                         .map( markStart ).map( markCommit );
+                         .map( markStart ).doOnNext( markCommit ).map( new Func1<CollectionIoEvent<MvccEntity>,
+                        Void>() {
+                    @Override
+                    public Void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
+                        return null;
+                    }
+                } );
     }
 
 
@@ -217,7 +218,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
                //we an update, signal the fix
 
-                //TODO T.N Change this to use request collapsing
+                //TODO T.N Change this to fire a task
                 Observable.from( new CollectionIoEvent<Id>(collectionScope, entityId ) ).map( load ).subscribeOn( Schedulers.io() ).subscribe();
 
 
@@ -226,27 +227,29 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     }
 
     // fire the stages
-    public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,WriteStart writeState ) {
+    public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData, WriteStart writeState ) {
 
-        return Observable.from( writeData ).map( writeState ).flatMap(
-                new Func1<CollectionIoEvent<MvccEntity>, Observable<CollectionIoEvent<MvccEntity>>>() {
+        return Observable.from( writeData ).map( writeState ).doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() {
 
                     @Override
-                    public Observable<CollectionIoEvent<MvccEntity>> call(
+                    public void call(
                             final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
 
                         Observable<CollectionIoEvent<MvccEntity>> unique =
                                 Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
-                                          .flatMap( writeVerifyUnique );
+                                          .doOnNext( writeVerifyUnique );
 
 
                         // optimistic verification
                         Observable<CollectionIoEvent<MvccEntity>> optimistic =
                                 Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
-                                          .map( writeOptimisticVerify );
+                                          .doOnNext( writeOptimisticVerify );
+
+
+                        //wait for both to finish
+                        Observable.merge( unique, optimistic ).toBlocking().last();
 
 
-                        return Observable.merge( unique, optimistic).last();
                     }
                 } );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessObserver.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessObserver.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessObserver.java
deleted file mode 100644
index b06957d..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessObserver.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.collection.mvcc.event;
-
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-
-
-/**
- * @author: tnine
- */
-public interface PostProcessObserver {
-
-
-    /**
-     * The entity was comitted by the MVCC system.  Post processing needs to occur
-     *
-     * @param scope The scope used in the write pipeline
-     * @param entity The entity used in the write pipeline
-     *
-     */
-    public void postCommit(CollectionScope scope,  MvccEntity entity );
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
index 4e8d6d5..61a2a36 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
@@ -19,7 +19,6 @@
 package org.apache.usergrid.persistence.collection.mvcc.stage.delete;
 
 
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
@@ -28,7 +27,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
 import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
@@ -37,31 +35,31 @@ import org.apache.usergrid.persistence.collection.mvcc.entity.MvccValidationUtil
 import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
-import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValue;
-import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
 import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.field.Field;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
-import rx.functions.Func1;
+import rx.functions.Action1;
 
 
 /**
- * This phase should invoke any finalization, and mark the entity 
- * as committed in the data store before returning
+ * This phase should invoke any finalization, and mark the entity as committed in the data store before returning
  */
 @Singleton
-public class MarkCommit implements Func1<CollectionIoEvent<MvccEntity>, Void> {
+public class MarkCommit implements Action1<CollectionIoEvent<MvccEntity>> {
 
     private static final Logger LOG = LoggerFactory.getLogger( MarkCommit.class );
 
@@ -69,28 +67,29 @@ public class MarkCommit implements Func1<CollectionIoEvent<MvccEntity>, Void> {
     private final MvccEntitySerializationStrategy entityStrat;
     private final SerializationFig serializationFig;
     private final UniqueValueSerializationStrategy uniqueValueStrat;
+    private final Keyspace keyspace;
+
 
     @Inject
     public MarkCommit( final MvccLogEntrySerializationStrategy logStrat,
                        final MvccEntitySerializationStrategy entityStrat,
-                       final UniqueValueSerializationStrategy uniqueValueStrat,
-                       final SerializationFig serializationFig) {
+                       final UniqueValueSerializationStrategy uniqueValueStrat, final SerializationFig serializationFig,
+                       final Keyspace keyspace ) {
+
 
-        Preconditions.checkNotNull( 
-                logStrat, "logEntrySerializationStrategy is required" );
-        Preconditions.checkNotNull( 
-                entityStrat, "entitySerializationStrategy is required" );
+        Preconditions.checkNotNull( logStrat, "logEntrySerializationStrategy is required" );
+        Preconditions.checkNotNull( entityStrat, "entitySerializationStrategy is required" );
 
         this.logStrat = logStrat;
         this.entityStrat = entityStrat;
         this.serializationFig = serializationFig;
         this.uniqueValueStrat = uniqueValueStrat;
+        this.keyspace = keyspace;
     }
 
 
-
     @Override
-    public Void call( final CollectionIoEvent<MvccEntity> idIoEvent ) {
+    public void call( final CollectionIoEvent<MvccEntity> idIoEvent ) {
 
         final MvccEntity entity = idIoEvent.getEvent();
 
@@ -103,64 +102,86 @@ public class MarkCommit implements Func1<CollectionIoEvent<MvccEntity>, Void> {
         final CollectionScope collectionScope = idIoEvent.getEntityCollection();
 
 
-        final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version,
-                Stage.COMMITTED, MvccLogEntry.State.DELETED );
+        LOG.debug("Inserting tombstone for entity {} at version {}", entityId, version );
 
-        final MutationBatch logMutation = logStrat.write( collectionScope, startEntry );
+        final MvccLogEntry startEntry =
+                new MvccLogEntryImpl( entityId, version, Stage.COMMITTED, MvccLogEntry.State.DELETED );
+
+        final MutationBatch entityStateBatch = logStrat.write( collectionScope, startEntry );
 
         //insert a "cleared" value into the versions.  Post processing should actually delete
-        MutationBatch entityMutation = entityStrat.mark( collectionScope, entityId, version );
-
-        //merge the 2 into 1 mutation
-        logMutation.mergeShallow( entityMutation );
-
-        //set up the post processing queue
-        //delete unique fields
-        Observable<List<Field>> deleteFieldsObservable = Observable.create(new ObservableIterator<Field>("deleteColumns") {
-            @Override
-            protected Iterator<Field> getIterator() {
-                Iterator<MvccEntity> entities = entityStrat.load(collectionScope, entityId, entity.getVersion(), 1);
-                Iterator<Field> fieldIterator = Collections.emptyIterator();
-                if (entities.hasNext()) {
-                    Optional<Entity> oe = entities.next().getEntity();
-                    if (oe.isPresent()) {
-                        fieldIterator = oe.get().getFields().iterator();
-                    }
-                }
-                return fieldIterator;
-            }
-        }).buffer(serializationFig.getBufferSize())
-                .map(new Func1<List<Field>, List<Field>>() {
-                    @Override
-                    public List<Field> call(List<Field> fields) {
-                        for (Field field : fields) {
-                            try {
-                                UniqueValue value = uniqueValueStrat.load(collectionScope, field);
-                                if (value != null) {
-                                    logMutation.mergeShallow(uniqueValueStrat.delete(value));
-                                }
-                            } catch (ConnectionException ce) {
-                                LOG.error("Failed to delete Unique Value", ce);
-                            }
-                        }
-                        return fields;
-                    }
-                });
-        deleteFieldsObservable.toBlocking().firstOrDefault(null);
 
         try {
-            logMutation.execute();
+            final MutationBatch entityBatch = entityStrat.mark( collectionScope, entityId, version );
+            entityStateBatch.mergeShallow( entityBatch );
+            entityStateBatch.execute();
         }
         catch ( ConnectionException e ) {
-            LOG.error( "Failed to execute write asynchronously ", e );
-            throw new CollectionRuntimeException( entity, collectionScope,
-                    "Failed to execute write asynchronously ", e );
+            throw new RuntimeException( "Unable to mark entry as deleted" );
         }
 
-        /**
-         * We're done executing.
-         */
 
-        return null;
+        //TODO Refactor this logic into a a class that can be invoked from anywhere
+        //load every entity we have history of
+        Observable<List<MvccEntity>> deleteFieldsObservable =
+                Observable.create( new ObservableIterator<MvccEntity>( "deleteColumns" ) {
+                    @Override
+                    protected Iterator<MvccEntity> getIterator() {
+                        Iterator<MvccEntity> entities =
+                                entityStrat.load( collectionScope, entityId, entity.getVersion(), 100 );
+
+                        return entities;
+                    }
+                } )       //buffer them for efficiency
+                          .buffer( serializationFig.getBufferSize() ).doOnNext(
+
+                        new Action1<List<MvccEntity>>() {
+                            @Override
+                            public void call( final List<MvccEntity> mvccEntities ) {
+
+
+                                final MutationBatch batch = keyspace.prepareMutationBatch();
+
+                                for ( MvccEntity mvccEntity : mvccEntities ) {
+                                    if ( !mvccEntity.getEntity().isPresent() ) {
+                                        continue;
+                                    }
+
+                                    final UUID entityVersion = mvccEntity.getVersion();
+
+                                    final Entity entity = mvccEntity.getEntity().get();
+
+                                    //remove all unique fields from the index
+                                    for ( final Field field : entity.getFields() ) {
+
+                                        if(!field.isUnique()){
+                                            continue;
+                                        }
+
+                                        final UniqueValue unique = new UniqueValueImpl( collectionScope, field, entityId, entityVersion );
+
+                                        final MutationBatch deleteMutation = uniqueValueStrat.delete( unique );
+
+                                        batch.mergeShallow( deleteMutation );
+                                    }
+                                }
+
+                                try {
+                                    batch.execute();
+                                }
+                                catch ( ConnectionException e1 ) {
+                                    throw new RuntimeException( "Unable to execute " +
+                                            "unique value " +
+                                            "delete", e1 );
+                                }
+                            }
+                        }
+
+
+                                                                       );
+
+        final int removedCount = deleteFieldsObservable.count().toBlocking().last();
+
+        LOG.debug("Removed unique values for {} entities of entity {}", removedCount, entityId );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersion.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersion.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersion.java
deleted file mode 100644
index d62ae87..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersion.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.mvcc.stage.write;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.model.entity.Id;
-
-/**
- * Combine entity ID and entity version for use as column name for UniqueValues Column Family.
- */
-class EntityVersion {
-    private final Id entityId;
-    private final UUID entityVersion;
-
-    public EntityVersion(Id id, UUID uuid) {
-        this.entityId = id;
-        this.entityVersion = uuid;
-    }
-
-    public Id getEntityId() {
-        return entityId;
-    }
-
-    public UUID getEntityVersion() {
-        return entityVersion;
-    }
-
-    public boolean equals( Object o ) {
-
-        if ( o == null || !(o instanceof EntityVersion) ) {
-            return false;
-        }
-
-        EntityVersion other = (EntityVersion)o;
-
-        if ( !other.getEntityId().equals( getEntityId() )) {
-            return false;
-        }
-
-        if ( !other.getEntityVersion().equals( getEntityVersion() )) {
-            return false;
-        }
-
-        return true;
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersionSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersionSerializer.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersionSerializer.java
deleted file mode 100644
index 86b5aff..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersionSerializer.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.mvcc.stage.write;
-
-
-import java.nio.ByteBuffer;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.model.entity.SimpleId;
-
-import com.google.common.base.Preconditions;
-import com.netflix.astyanax.model.CompositeBuilder;
-import com.netflix.astyanax.model.Composites;
-import com.netflix.astyanax.model.DynamicComposite;
-import com.netflix.astyanax.serializers.AbstractSerializer;
-import com.netflix.astyanax.serializers.StringSerializer;
-import com.netflix.astyanax.serializers.UUIDSerializer;
-
-/**
- * Serialize EntityVersion, entity ID and version, for use a column name in Unique Values Column Family. 
- */
-public class EntityVersionSerializer extends AbstractSerializer<EntityVersion> {
-
-    private static final Logger LOG = LoggerFactory.getLogger( EntityVersionSerializer.class );
-
-    @Override
-    public ByteBuffer toByteBuffer(final EntityVersion ev) {
-
-        CompositeBuilder builder = Composites.newDynamicCompositeBuilder();
-
-        builder.addTimeUUID( ev.getEntityVersion() );
-        builder.addTimeUUID( ev.getEntityId().getUuid() );
-        builder.addString( ev.getEntityId().getType() );
-
-        return builder.build();
-    }
-
-    @Override
-    public EntityVersion fromByteBuffer(final ByteBuffer byteBuffer) {
-
-        // would use Composites.newDynamicCompositeParser(byteBuffer) but it is not implemented
-
-        DynamicComposite composite = DynamicComposite.fromByteBuffer(byteBuffer);
-        Preconditions.checkArgument(composite.size() == 3, "Composite should have 3 elements");
-
-        final UUID version      = composite.get( 0, UUIDSerializer.get() );
-        final UUID entityId     = composite.get( 1, UUIDSerializer.get() );
-        final String entityType = composite.get( 2, StringSerializer.get() );
-        
-        return new EntityVersion( new SimpleId( entityId, entityType ), version);
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/FieldSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/FieldSerializer.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/FieldSerializer.java
deleted file mode 100644
index 3719e3e..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/FieldSerializer.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.mvcc.stage.write;
-
-
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.lang3.StringUtils;
-
-import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
-import org.apache.usergrid.persistence.model.field.DoubleField;
-import org.apache.usergrid.persistence.model.field.Field;
-import org.apache.usergrid.persistence.model.field.IntegerField;
-import org.apache.usergrid.persistence.model.field.LongField;
-import org.apache.usergrid.persistence.model.field.StringField;
-import org.apache.usergrid.persistence.model.field.UUIDField;
-
-import com.netflix.astyanax.model.CompositeBuilder;
-import com.netflix.astyanax.model.CompositeParser;
-
-// TODO: replace with "real" serializer
-
-/**
- * Serialize Field for use as part of row-key in Unique Values Column Family.
- */
-public class FieldSerializer implements CompositeFieldSerializer<Field> {
-
-    private static final Logger LOG = LoggerFactory.getLogger( FieldSerializer.class );
-
-    public enum FieldType {
-        BOOLEAN_FIELD,
-        DOUBLE_FIELD,
-        INTEGER_FIELD,
-        LONG_FIELD,
-        STRING_FIELD,
-        UUID_FIELD
-    };
-
-    private static final FieldSerializer INSTANCE = new FieldSerializer();
-
-    @Override
-    public void toComposite( final CompositeBuilder builder, final Field field ) {
-
-        builder.addString( field.getName() );
-
-        // TODO: use the real field value serializer(s) here? Store hash instead?
-        builder.addString( field.getValue().toString() );
-         
-        String simpleName = field.getClass().getSimpleName();
-        int nameIndex = simpleName.lastIndexOf(".");
-        String fieldType = simpleName.substring(nameIndex + 1, simpleName.length() - "Field".length());
-        fieldType = StringUtils.upperCase( fieldType + "_FIELD" );
-
-        builder.addString( fieldType );
-    }
-
-    @Override
-    public Field fromComposite( final CompositeParser composite ) {
-
-        final String name = composite.readString();
-        final String value = composite.readString();
-        final String typeString = composite.readString();
-
-        final FieldType fieldType = FieldType.valueOf( typeString );
-
-        switch (fieldType) {
-            case DOUBLE_FIELD: 
-                return new DoubleField(name, Double.parseDouble(value));
-            case INTEGER_FIELD: 
-                return new IntegerField(name, Integer.parseInt(value));
-            case LONG_FIELD: 
-                return new LongField(name, Long.parseLong(value));
-            case STRING_FIELD: 
-                return new StringField(name, value);
-            case UUID_FIELD: 
-                return new UUIDField(name, UUID.fromString(value));
-            default:
-                throw new RuntimeException("Unknown unique field type");
-        }
-    }
-
-
-    /**
-     * Get the singleton serializer
-     */
-    public static FieldSerializer get() {
-        return INSTANCE;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
index 7448816..dfccb34 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
@@ -25,6 +25,9 @@ import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.field.Field;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValue.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValue.java
deleted file mode 100644
index de1594a..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValue.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.mvcc.stage.write;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.field.Field;
-
-/**
- * Represents a Unique Value of a field within a collection.
- */
-public interface UniqueValue {
-
-    public CollectionScope getCollectionScope();
-
-    public Id getEntityId();
-
-    public Field getField();
-
-    public UUID getEntityVersion();
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueImpl.java
deleted file mode 100644
index 7c86491..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueImpl.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.mvcc.stage.write;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.field.Field;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Represents a Unique Value of a field within a collection.
- */
-public class UniqueValueImpl implements UniqueValue {
-    private final CollectionScope collectionScope;
-    private final Field field;
-    private final Id entityId;
-    private final UUID entityVersion;
-
-    public UniqueValueImpl(
-            final CollectionScope scope, final Field field, Id entityId, final UUID version ) {
-
-        Preconditions.checkNotNull( scope, "scope is required" );
-        Preconditions.checkNotNull( field, "field is required" );
-//        Preconditions.checkNotNull( version, "version is required" );
-        Preconditions.checkNotNull( entityId, "entityId is required" );
-
-        this.collectionScope = scope;
-        this.field = field;
-        this.entityVersion = version;
-        this.entityId = entityId;
-    }
-
-    @Override
-    public CollectionScope getCollectionScope() {
-        return collectionScope;
-    }
-
-    @Override
-    public Field getField() {
-        return field;
-    }
-
-    @Override
-    public UUID getEntityVersion() {
-        return entityVersion;
-    }
-
-    @Override
-    public Id getEntityId() {
-        return entityId;
-    }
-
-    
-    @Override
-    public boolean equals( final Object o ) {
-        if ( this == o ) {
-            return true;
-        }
-        if ( o == null || getClass() != o.getClass() ) {
-            return false;
-        }
-
-        final UniqueValueImpl that = ( UniqueValueImpl ) o;
-
-        if ( !getCollectionScope().equals( that.getCollectionScope() ) ) {
-            return false;
-        }
-
-        if ( !getField().equals( that.getField()) ) {
-            return false;
-        }
-
-        if ( !getEntityVersion().equals( that.getEntityVersion() ) ) {
-            return false;
-        }
-
-        if ( !getEntityId().equals( that.getEntityId() ) ) {
-            return false;
-        }
-
-        return true;
-    }
-
-
-    @Override
-    public int hashCode() {
-        int result = 31 * getCollectionScope().hashCode();
-        result = 31 * result + getField().hashCode();
-        result = 31 * result + getEntityVersion().hashCode();
-        result = 31 * result + getEntityId().hashCode();
-        return result;
-    }
-
-
-    @Override
-    public String toString() {
-        return "UniqueValueImpl{" +
-                ", collectionScope =" + collectionScope.getName() +
-                ", field =" + field +
-                ", entityVersion=" + entityVersion +
-                ", entityId =" + entityId +
-                '}';
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategy.java
deleted file mode 100644
index 9773644..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategy.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.mvcc.stage.write;
-
-
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.model.field.Field;
-
-
-/**
- * Reads and writes to UniqueValues column family.
- */
-public interface UniqueValueSerializationStrategy {
-
-    /**
-     * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds.
-     * 
-     * @param uniqueValue Object to be written
-     * @return MutatationBatch that encapsulates operation, caller may or may not execute.
-     */
-    public MutationBatch write( UniqueValue uniqueValue );
-
-    /**
-     * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds.
-     * 
-     * @param uniqueValue Object to be written
-     * @param timeToLive How long object should live in seconds 
-     * @return MutatationBatch that encapsulates operation, caller may or may not execute.
-     */
-    public MutationBatch write( UniqueValue uniqueValue, Integer timeToLive );
-
-    /**
-     * Load UniqueValue that matches field from collection or null if that value does not exist.
-     * 
-     * @param colScope Collection scope in which to look for field name/value
-     * @param field Field name/value to search for
-     * @return UniqueValue or null if not found
-     * @throws ConnectionException on error connecting to Cassandra
-     */
-    public UniqueValue load( CollectionScope colScope, Field field ) throws ConnectionException;
-
-    /**
-     * Delete the specified Unique Value from Cassandra.
-     * 
-     * @param uniqueValue Object to be deleted.
-     * @return MutatationBatch that encapsulates operation, caller may or may not execute.
-     */
-    public MutationBatch delete( UniqueValue uniqueValue );
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategyImpl.java
deleted file mode 100644
index b7e113e..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategyImpl.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.mvcc.stage.write;
-
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.netflix.astyanax.ColumnListMutation;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
-import com.netflix.astyanax.model.ColumnList;
-import com.netflix.astyanax.util.RangeBuilder;
-import java.util.Collections;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.serialization.impl.CollectionScopedRowKeySerializer;
-import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-import org.apache.usergrid.persistence.core.migration.Migration;
-import org.apache.usergrid.persistence.model.field.Field;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Reads and writes to UniqueValues column family.
- */
-public class UniqueValueSerializationStrategyImpl 
-    implements UniqueValueSerializationStrategy, Migration {
-
-    private static final Logger log = 
-            LoggerFactory.getLogger( UniqueValueSerializationStrategyImpl.class );
-
-    // TODO: use "real" field serializer here instead once it is ready
-    private static final CollectionScopedRowKeySerializer<Field> ROW_KEY_SER =
-            new CollectionScopedRowKeySerializer<Field>( FieldSerializer.get() );
-
-    private static final EntityVersionSerializer ENTITY_VERSION_SER = new EntityVersionSerializer();
-
-    private static final MultiTennantColumnFamily<CollectionScope, Field, EntityVersion>
-        CF_UNIQUE_VALUES =
-            new MultiTennantColumnFamily<CollectionScope, Field, EntityVersion>( "Unique_Values",
-                ROW_KEY_SER,
-                ENTITY_VERSION_SER );
-
-    protected final Keyspace keyspace;
-
-
-    /**
-     * Construct serialization strategy for keyspace.
-     * @param keyspace Keyspace in which to store Unique Values.
-     */
-    @Inject
-    public UniqueValueSerializationStrategyImpl( final Keyspace keyspace ) {
-        this.keyspace = keyspace;
-    }
-
-
-    @Override
-    public java.util.Collection getColumnFamilies() {
-
-        MultiTennantColumnFamilyDefinition cf = new MultiTennantColumnFamilyDefinition(
-                CF_UNIQUE_VALUES,
-                BytesType.class.getSimpleName(),
-                ColumnTypes.DYNAMIC_COMPOSITE_TYPE,
-                BytesType.class.getSimpleName(), MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
-
-        return Collections.singleton( cf );
-    }
-
-
-    public MutationBatch write( UniqueValue uniqueValue ) {
-        return write( uniqueValue, Integer.MAX_VALUE );
-    }
-
-
-    @Override
-    public MutationBatch write( UniqueValue value, Integer timeToLive ) {
-
-        Preconditions.checkNotNull( value, "value is required" );
-        Preconditions.checkNotNull( timeToLive, "timeToLive is required" );
-
-        log.debug("Writing unique value scope={} id={} version={} name={} value={} ttl={} ",
-            new Object[] { 
-                value.getCollectionScope().getName(),
-                value.getEntityId(),
-                value.getEntityVersion(),
-                value.getField().getName(),
-                value.getField().getValue(),
-                timeToLive
-         });
-
-        final EntityVersion ev = new EntityVersion( value.getEntityId(), value.getEntityVersion() );
-
-        final Integer ttl;
-        if ( timeToLive.equals( Integer.MAX_VALUE )) {
-            ttl = null;
-        } else {
-            ttl = timeToLive;
-        }
-
-        return doWrite( value.getCollectionScope(), value.getField(),
-            new UniqueValueSerializationStrategyImpl.RowOp() {
-
-            @Override
-            public void doOp( final ColumnListMutation<EntityVersion> colMutation ) {
-                colMutation.putColumn( ev, 0x0, ttl );
-            }
-        } );
-    }
-
-
-    @Override
-    public MutationBatch delete(UniqueValue value) {
-
-        Preconditions.checkNotNull( value, "value is required" );
-
-        final EntityVersion ev = new EntityVersion( value.getEntityId(), value.getEntityVersion() );
-
-        return doWrite( value.getCollectionScope(), value.getField(),
-            new UniqueValueSerializationStrategyImpl.RowOp() {
-
-            @Override
-            public void doOp( final ColumnListMutation<EntityVersion> colMutation ) {
-                colMutation.deleteColumn(ev);
-            }
-        } );
-    }
-
-
-    /**
-     * Do the column update or delete for the given column and row key
-     * @param context We need to use this when getting the keyspace
-     */
-    private MutationBatch doWrite( CollectionScope context, Field field, RowOp op ) {
-        final MutationBatch batch = keyspace.prepareMutationBatch();
-        op.doOp( batch.withRow( CF_UNIQUE_VALUES, ScopedRowKey.fromKey( context, field ) ) );
-        return batch;
-    }
-
-
-    @Override
-    public UniqueValue load( CollectionScope colScope, Field field ) throws ConnectionException {
-
-        Preconditions.checkNotNull( field, "field is required" );
-
-        ColumnList<EntityVersion> result;
-        try {
-            result = keyspace.prepareQuery( CF_UNIQUE_VALUES )
-                .getKey( ScopedRowKey.fromKey( colScope, field ) )
-                .withColumnRange(new RangeBuilder().setLimit(1).build())
-                .execute()
-                .getResult();
-        }
-        catch ( NotFoundException nfe ) {
-            return null;
-        }
-
-        if ( result.isEmpty() ) {
-            return null;
-        }
-
-        EntityVersion ev = result.getColumnByIndex(0).getName();
-
-        return new UniqueValueImpl( colScope, field, ev.getEntityId(), ev.getEntityVersion() );
-    }
-
-
-    /**
-     * Simple callback to perform puts and deletes with a common row setup code
-     */
-    private static interface RowOp {
-        void doOp( ColumnListMutation<EntityVersion> colMutation );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index 0004ddb..49e967f 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -33,6 +33,9 @@ import org.apache.usergrid.persistence.collection.mvcc.entity.MvccValidationUtil
 import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.util.EntityUtils;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
@@ -110,7 +113,7 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Entity>
 
             if ( field.isUnique() ) {
 
-                UniqueValue written  = new UniqueValueImpl( ioEvent.getEntityCollection(), field, 
+                UniqueValue written  = new UniqueValueImpl( ioEvent.getEntityCollection(), field,
                     mvccEntity.getEntity().get().getId(), mvccEntity.getEntity().get().getVersion());
                 MutationBatch mb = uniqueValueStrat.write( written );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
index b5e5873..dcdb408 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
@@ -38,6 +38,7 @@ import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
+import rx.functions.Action1;
 import rx.functions.Func1;
 
 
@@ -46,7 +47,7 @@ import rx.functions.Func1;
  */
 @Singleton
 public class WriteOptimisticVerify 
-    implements Func1<CollectionIoEvent<MvccEntity>, CollectionIoEvent<MvccEntity>> {
+    implements Action1<CollectionIoEvent<MvccEntity>> {
 
     private static final Logger log = LoggerFactory.getLogger( WriteOptimisticVerify.class );
 
@@ -59,7 +60,7 @@ public class WriteOptimisticVerify
 
 
     @Override
-    public CollectionIoEvent<MvccEntity> call( final CollectionIoEvent<MvccEntity> ioevent ) {
+    public void call( final CollectionIoEvent<MvccEntity> ioevent ) {
         MvccValidationUtils.verifyMvccEntityWithEntity( ioevent.getEvent() );
 
         // If the version was included on the entity write operation (delete or write) we need
@@ -74,7 +75,7 @@ public class WriteOptimisticVerify
         CollectionScope collectionScope = ioevent.getEntityCollection();
 
         if(entity.getVersion() == null){
-            return ioevent;
+            return;
         }
 
         try {
@@ -98,8 +99,7 @@ public class WriteOptimisticVerify
                 "Error reading entity log", e );
         }
 
-        // No op, just emit the value
-        return ioevent;
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index 713703f..e6d37fc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -18,36 +18,46 @@
 package org.apache.usergrid.persistence.collection.mvcc.stage.write;
 
 
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccValidationUtils;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
 import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.field.Field;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
 import rx.Observable;
-import rx.functions.Func1;
-import rx.functions.FuncN;
-import rx.schedulers.Schedulers;
+import rx.functions.Action1;
 
 
 /**
  * This phase execute all unique value verification on the MvccEntity.
  */
 @Singleton
-public class WriteUniqueVerify implements 
-        Func1<CollectionIoEvent<MvccEntity>, Observable<? extends CollectionIoEvent<MvccEntity>>> {
+public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>> {
 
     private static final Logger LOG = LoggerFactory.getLogger( WriteUniqueVerify.class );
 
@@ -55,16 +65,16 @@ public class WriteUniqueVerify implements
 
     protected final SerializationFig serializationFig;
 
+    protected final Keyspace keyspace;
+
 
     @Inject
-    public WriteUniqueVerify( 
-            final UniqueValueSerializationStrategy uniqueValueSerializiationStrategy, 
-            final SerializationFig serializationFig ) {
+    public WriteUniqueVerify( final UniqueValueSerializationStrategy uniqueValueSerializiationStrategy,
+                              final SerializationFig serializationFig, final Keyspace keyspace ) {
+        this.keyspace = keyspace;
 
-        Preconditions.checkNotNull( uniqueValueSerializiationStrategy, 
-                "uniqueValueSerializationStrategy is required" );
-        Preconditions.checkNotNull( serializationFig, 
-                "serializationFig is required" );
+        Preconditions.checkNotNull( uniqueValueSerializiationStrategy, "uniqueValueSerializationStrategy is required" );
+        Preconditions.checkNotNull( serializationFig, "serializationFig is required" );
 
         this.uniqueValueStrat = uniqueValueSerializiationStrategy;
         this.serializationFig = serializationFig;
@@ -72,8 +82,7 @@ public class WriteUniqueVerify implements
 
 
     @Override
-    public Observable<? extends CollectionIoEvent<MvccEntity>> 
-        call(final CollectionIoEvent<MvccEntity> ioevent ) {
+    public void call( final CollectionIoEvent<MvccEntity> ioevent ) {
 
         MvccValidationUtils.verifyMvccEntityWithEntity( ioevent.getEvent() );
 
@@ -81,131 +90,100 @@ public class WriteUniqueVerify implements
 
         final Entity entity = mvccEntity.getEntity().get();
 
-        // use simple thread pool to verify fields in parallel
+        final Id entityId = entity.getId();
+
+        final UUID entityVersion = entity.getVersion();
 
-        // We want to use concurrent to fork all validations this way they're wrapped by timeouts 
-        // and Hystrix thread pools for JMX operations.  See the WriteCommand in the 
-        // EntityCollectionManagerImpl. 
+        final CollectionScope scope = ioevent.getEntityCollection();
+
+        // use simple thread pool to verify fields in parallel
 
-        // TODO: still needs to be added to the Concurrent utility class?
+        final Collection<Field> entityFields = entity.getFields();
 
-        final List<Observable<FieldUniquenessResult>> fields =
-                new ArrayList<Observable<FieldUniquenessResult>>();
+        //allocate our max size, worst case
+        final List<Field> uniqueFields = new ArrayList<>( entityFields.size() );
 
+        final MutationBatch batch = keyspace.prepareMutationBatch();
         //
         // Construct all the functions for verifying we're unique
         //
-        for ( final Field field : entity.getFields() ) {
+        for ( final Field field : entityFields ) {
 
             // if it's unique, create a function to validate it and add it to the list of 
             // concurrent validations
             if ( field.isUnique() ) {
 
-                Observable<FieldUniquenessResult> result =  Observable.from( field )
-                        .subscribeOn( Schedulers.io() )
-                        .map(new Func1<Field,  FieldUniquenessResult>() {
-
-                    @Override
-                    public FieldUniquenessResult call(Field field ) {
-
-                        // use write-first then read strategy
-                        UniqueValue written = new UniqueValueImpl( 
-                            ioevent.getEntityCollection(), field, entity.getId(), mvccEntity.getVersion() );
-
-                        // use TTL in case something goes wrong before entity is finally committed
-                        MutationBatch mb = uniqueValueStrat.write( written, serializationFig.getTimeout() );
-
-                        try {
-                            mb.execute();
-                        }
-                        catch ( ConnectionException ex ) {
-                            throw new RuntimeException("Unable to write to cassandra", ex );
-                        }
-
-                        // does the database value match what we wrote?
-                        UniqueValue loaded;
-                        try {
-                            loaded = uniqueValueStrat.load( ioevent.getEntityCollection(), field );
-                        }
-                        catch ( ConnectionException ex ) {
-                            throw new RuntimeException("Unable to read from cassandra", ex );
-                        }
-
-                        return new FieldUniquenessResult( 
-                            field, loaded.getEntityId().equals( written.getEntityId() ) );
-                    }
-                } );
-
-                fields.add(result);
+
+                // use write-first then read strategy
+                final UniqueValue written = new UniqueValueImpl( scope, field, entityId, entityVersion );
+
+                // use TTL in case something goes wrong before entity is finally committed
+                final MutationBatch mb = uniqueValueStrat.write( written, serializationFig.getTimeout() );
+
+                batch.mergeShallow( mb );
+
+
+                uniqueFields.add( field );
             }
         }
 
-        //short circuit.  If we zip up nothing, we block forever.
-        if(fields.size() == 0){
-            return Observable.from(ioevent );
+        //short circuit nothing to do
+        if ( uniqueFields.size() == 0 ) {
+            return;
         }
 
-        //
-        // Zip the results up
-        //
-        final FuncN<CollectionIoEvent<MvccEntity>> zipFunction = 
-                new FuncN<CollectionIoEvent<MvccEntity>>() {
-            
-            @Override
-            public CollectionIoEvent<MvccEntity> call( final Object... args ) {
-
-                Map<String, Field> uniquenessVioloations = new HashMap<String, Field>();
-
-                for ( Object resultObj : args ) {
-                    FieldUniquenessResult result = ( FieldUniquenessResult ) resultObj;
-                    if ( !result.isUnique() ) {
-                        Field field = result.getField();
-                        uniquenessVioloations.put( field.getName(), field );
-                    }
-                }
-
-                if ( !uniquenessVioloations.isEmpty() ) {
-                    throw new WriteUniqueVerifyException( 
-                            mvccEntity, ioevent.getEntityCollection(), uniquenessVioloations );
-                }
-                    
-                //return the original event
-                return ioevent;
-            }
-        };
 
-        return Observable.zip( fields, zipFunction );
-    }
+        //perform the write
+        try {
+            batch.execute();
+        }
+        catch ( ConnectionException ex ) {
+            throw new RuntimeException( "Unable to write to cassandra", ex );
+        }
 
 
-    static class FieldUniquenessResult {
-        private Field field;
-        private Boolean unique;
 
+        //now get the set of fields back
+        final UniqueValueSet uniqueValues;
 
-        public FieldUniquenessResult( Field f, Boolean u ) {
-            this.field = f;
-            this.unique = u;
+        try {
+            uniqueValues = uniqueValueStrat.load( scope, uniqueFields );
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to read from cassandra", e );
         }
 
 
-        public Boolean isUnique() {
-            return unique;
-        }
+        final Map<String, Field> uniquenessViolations = new HashMap<>( uniqueFields.size());
 
 
-        public void setUnique( Boolean isUnique ) {
-            this.unique = isUnique;
-        }
+        //loop through each field that was unique
+        for ( final Field field : uniqueFields ) {
+
+            final UniqueValue uniqueValue = uniqueValues.getValue( field.getName() );
+
+            if ( uniqueValue == null ) {
+                throw new RuntimeException(
+                        String.format( "Could not retrieve unique value for field %s, unable to verify",
+                                field.getName() ) );
+            }
+
 
+            final Id returnedEntityId = uniqueValue.getEntityId();
 
-        public Field getField() {
-            return field;
+
+            if ( !entityId.equals( returnedEntityId ) ) {
+                uniquenessViolations.put( field.getName(), field );
+            }
         }
 
 
-        public void setField( Field field ) {
-            this.field = field;
+        //We have violations, throw an exception
+        if ( !uniquenessViolations.isEmpty() ) {
+            throw new WriteUniqueVerifyException( mvccEntity, ioevent.getEntityCollection(), uniquenessViolations );
         }
+
+
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValue.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValue.java
new file mode 100644
index 0000000..9749101
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValue.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.collection.serialization;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.field.Field;
+
+/**
+ * Represents a Unique Value of a field within a collection.
+ */
+public interface UniqueValue {
+
+    /**
+     * The scope of this value
+     * @return
+     */
+    public CollectionScope getCollectionScope();
+
+    /**
+     * The entity Id that owns this value
+     * @return
+     */
+    public Id getEntityId();
+
+    /**
+     * The field value
+     * @return
+     */
+    public Field getField();
+
+    /**
+     * The version of the entity that owns this value
+     * @return
+     */
+    public UUID getEntityVersion();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
new file mode 100644
index 0000000..efcc60d
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.collection.serialization;
+
+
+import java.util.Collection;
+
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.field.Field;
+
+
+/**
+ * Reads and writes to UniqueValues column family.
+ */
+public interface UniqueValueSerializationStrategy {
+
+    /**
+     * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds.
+     * 
+     * @param uniqueValue Object to be written
+     * @return MutatationBatch that encapsulates operation, caller may or may not execute.
+     */
+    public MutationBatch write( UniqueValue uniqueValue );
+
+    /**
+     * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds.
+     * 
+     * @param uniqueValue Object to be written
+     * @param timeToLive How long object should live in seconds 
+     * @return MutatationBatch that encapsulates operation, caller may or may not execute.
+     */
+    public MutationBatch write( UniqueValue uniqueValue, Integer timeToLive );
+
+    /**
+     * Load UniqueValue that matches field from collection or null if that value does not exist.
+     * 
+     * @param colScope Collection scope in which to look for field name/value
+     * @param fields Field name/value to search for
+     * @return UniqueValueSet containing fields from the collection that exist in cassandra
+     * @throws ConnectionException on error connecting to Cassandra
+     */
+    public UniqueValueSet load( CollectionScope colScope, Collection<Field> fields ) throws ConnectionException;
+
+    /**
+     * Delete the specified Unique Value from Cassandra.
+     * 
+     * @param uniqueValue Object to be deleted.
+     * @return MutatationBatch that encapsulates operation, caller may or may not execute.
+     */
+    public MutationBatch delete( UniqueValue uniqueValue );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSet.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSet.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSet.java
new file mode 100644
index 0000000..5436a11
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSet.java
@@ -0,0 +1,32 @@
+package org.apache.usergrid.persistence.collection.serialization;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+/**
+ * A read only view of unique values
+ */
+public interface UniqueValueSet extends Iterable<UniqueValue> {
+
+    /**
+     * Get the unique value for the field
+     * @param fieldName
+     * @return
+     */
+    public UniqueValue getValue(final String fieldName);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersion.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersion.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersion.java
new file mode 100644
index 0000000..274cf5d
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersion.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.model.entity.Id;
+
+/**
+ * Combine entity ID and entity version for use as column name for UniqueValues Column Family.
+ */
+public class EntityVersion {
+    private final Id entityId;
+    private final UUID entityVersion;
+
+    public EntityVersion(Id id, UUID uuid) {
+        this.entityId = id;
+        this.entityVersion = uuid;
+    }
+
+    public Id getEntityId() {
+        return entityId;
+    }
+
+    public UUID getEntityVersion() {
+        return entityVersion;
+    }
+
+    public boolean equals( Object o ) {
+
+        if ( o == null || !(o instanceof EntityVersion) ) {
+            return false;
+        }
+
+        EntityVersion other = (EntityVersion)o;
+
+        if ( !other.getEntityId().equals( getEntityId() )) {
+            return false;
+        }
+
+        if ( !other.getEntityVersion().equals( getEntityVersion() )) {
+            return false;
+        }
+
+        return true;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersionSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersionSerializer.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersionSerializer.java
new file mode 100644
index 0000000..810a1fc
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersionSerializer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import com.google.common.base.Preconditions;
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.Composites;
+import com.netflix.astyanax.model.DynamicComposite;
+import com.netflix.astyanax.serializers.AbstractSerializer;
+import com.netflix.astyanax.serializers.StringSerializer;
+import com.netflix.astyanax.serializers.UUIDSerializer;
+
+/**
+ * Serialize EntityVersion, entity ID and version, for use a column name in Unique Values Column Family. 
+ */
+public class EntityVersionSerializer extends AbstractSerializer<EntityVersion> {
+
+    private static final Logger LOG = LoggerFactory.getLogger( EntityVersionSerializer.class );
+
+    @Override
+    public ByteBuffer toByteBuffer(final EntityVersion ev) {
+
+        CompositeBuilder builder = Composites.newDynamicCompositeBuilder();
+
+        builder.addTimeUUID( ev.getEntityVersion() );
+        builder.addTimeUUID( ev.getEntityId().getUuid() );
+        builder.addString( ev.getEntityId().getType() );
+
+        return builder.build();
+    }
+
+    @Override
+    public EntityVersion fromByteBuffer(final ByteBuffer byteBuffer) {
+
+        // would use Composites.newDynamicCompositeParser(byteBuffer) but it is not implemented
+
+        DynamicComposite composite = DynamicComposite.fromByteBuffer(byteBuffer);
+        Preconditions.checkArgument(composite.size() == 3, "Composite should have 3 elements");
+
+        final UUID version      = composite.get( 0, UUIDSerializer.get() );
+        final UUID entityId     = composite.get( 1, UUIDSerializer.get() );
+        final String entityType = composite.get( 2, StringSerializer.get() );
+        
+        return new EntityVersion( new SimpleId( entityId, entityType ), version);
+    }
+    
+}