You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/09/16 22:34:36 UTC

[07/25] usergrid git commit: Initial integration of Qakka into Usergrid Queue module, and implementation of Qakka-based LegacyQueueManager implementation.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java
new file mode 100644
index 0000000..572c897
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.sharding;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardSerializationImpl;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Created by russo on 6/9/16.
+ */
+public class ShardIteratorTest extends AbstractTest {
+
+    private static final Logger logger = LoggerFactory.getLogger( ShardIteratorTest.class );
+
+    /** Returns a randomly suffixed queue name so that each test iterates only the shards it wrote itself. */
+    private static String uniqueQueueName() {
+        return "sit_queue_" + RandomStringUtils.randomAlphanumeric(20);
+    }
+
+    /** Drains the iterator into a list, logging each shard ID on the way. */
+    private static List<Shard> drain( Iterator<Shard> shardIterator ) {
+        List<Shard> shards = new ArrayList<>();
+        shardIterator.forEachRemaining(shard -> {
+            logger.info("Shard ID: {}", shard.getShardId());
+            shards.add(shard);
+        });
+        return shards;
+    }
+
+    /** An iterator built with Optional.empty() must return both created shards, shard 100 before shard 200. */
+    @Test
+    public void getActiveShards(){
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient );
+
+        // unique queue name: the exact-match assertion below must not see shards written by other tests
+        String queueName = uniqueQueueName();
+        Shard shard1 = new Shard(queueName, "region1", Shard.Type.DEFAULT, 100L, null);
+        Shard shard2 = new Shard(queueName, "region1", Shard.Type.DEFAULT, 200L, null);
+
+        shardSerialization.createShard(shard1);
+        shardSerialization.createShard(shard2);
+
+        Iterator<Shard> shardIterator = new ShardIterator(
+                cassandraClient, queueName, "region1", Shard.Type.DEFAULT, Optional.empty());
+
+        // comparing whole lists reports the actual contents on failure, unlike one combined assertTrue
+        Assert.assertEquals( Arrays.asList( shard1, shard2 ), drain( shardIterator ) );
+    }
+
+    /** An iterator built with Optional.of(200L) must return only shard 300 out of shards 100, 200 and 300. */
+    @Test
+    public void seekActiveShards(){
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient );
+
+        String queueName = uniqueQueueName();
+        Shard shard1 = new Shard(queueName, "region1", Shard.Type.DEFAULT, 100L, null);
+        Shard shard2 = new Shard(queueName, "region1", Shard.Type.DEFAULT, 200L, null);
+        Shard shard3 = new Shard(queueName, "region1", Shard.Type.DEFAULT, 300L, null);
+
+        shardSerialization.createShard(shard1);
+        shardSerialization.createShard(shard2);
+        shardSerialization.createShard(shard3);
+
+        Iterator<Shard> shardIterator = new ShardIterator(
+                cassandraClient, queueName, "region1", Shard.Type.DEFAULT, Optional.of(200L));
+
+        Assert.assertEquals( Collections.singletonList( shard3 ), drain( shardIterator ) );
+    }
+
+    /**
+     * Creates ten shards whose pointers are time UUIDs, pausing 10 ms after each one, and checks
+     * that the iterator returns all ten with strictly increasing pointer timestamps.
+     */
+    @Test
+    public void shardIteratorOrdering() throws Exception {
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient );
+
+        int numShards = 10;
+        String region = "default";
+        String queueName = uniqueQueueName();
+
+        for ( long i=0; i<numShards; i++) {
+            UUID messageId = QakkaUtils.getTimeUuid();
+            Shard shard = new Shard( queueName, region, Shard.Type.DEFAULT, i+1, messageId );
+            shardSerialization.createShard( shard );
+            // not wrapped in try/catch: the method declares throws Exception, so an interrupt fails the test
+            Thread.sleep(10);
+        }
+
+        Iterator<Shard> shardIterator = new ShardIterator(
+                cassandraClient, queueName, region, Shard.Type.DEFAULT, Optional.empty());
+
+        int count = 0;
+        Long prevTimestamp = null;
+        while ( shardIterator.hasNext() ) {
+            Shard shard = shardIterator.next();
+            if ( prevTimestamp != null ) {
+                Assert.assertTrue( prevTimestamp < shard.getPointer().timestamp() );
+            }
+            prevTimestamp = shard.getPointer().timestamp();
+            count++;
+        }
+
+        Assert.assertEquals( numShards, count );
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java
new file mode 100644
index 0000000..e1a541b
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.sharding;
+
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardSerializationImpl;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+
+/**
+ * Created by russo on 6/8/16.
+ */
+public class ShardSerializationTest extends AbstractTest {
+
+    private static final Logger logger = LoggerFactory.getLogger( ShardSerializationTest.class );
+
+
+    /**
+     * Builds a ShardSerializationImpl around the CassandraClientImpl obtained from the injector.
+     */
+    private ShardSerialization newShardSerialization() {
+        final CassandraClient client = getInjector().getInstance( CassandraClientImpl.class );
+        return new ShardSerializationImpl( client );
+    }
+
+    /**
+     * Creates a single shard; the test passes as long as createShard does not throw.
+     */
+    @Test
+    public void writeNewShard(){
+        final ShardSerialization serialization = newShardSerialization();
+
+        serialization.createShard( new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null) );
+    }
+
+    /**
+     * A shard that is created and then deleted must load as null.
+     */
+    @Test
+    public void deleteShard(){
+        final ShardSerialization serialization = newShardSerialization();
+        final Shard target = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);
+
+        serialization.createShard(target);
+        serialization.deleteShard(target);
+
+        final Shard reloaded = serialization.loadShard(target);
+        assertNull(reloaded);
+    }
+
+    /**
+     * Loading a shard that this test never creates (queue "junk") must return null.
+     */
+    @Test
+    public void loadNullShard(){
+        final ShardSerialization serialization = newShardSerialization();
+        final Shard neverWritten = new Shard("junk", "region1", Shard.Type.DEFAULT, 100L, null);
+
+        assertNull(serialization.loadShard(neverWritten));
+    }
+
+    /**
+     * A pointer set on a shard and saved with updateShardPointer must come back from loadShard.
+     */
+    @Test
+    public void updatePointer(){
+        final ShardSerialization serialization = newShardSerialization();
+        final Shard target = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);
+        serialization.createShard(target);
+
+        final UUID expectedPointer = QakkaUtils.getTimeUuid();
+        target.setPointer(expectedPointer);
+        serialization.updateShardPointer(target);
+
+        final Shard reloaded = serialization.loadShard(target);
+        assertEquals(expectedPointer, reloaded.getPointer());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategyTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategyTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategyTest.java
new file mode 100644
index 0000000..ea73abc
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategyTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.sharding;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+
+public class ShardStrategyTest extends AbstractTest {
+
+    private static final Logger logger = LoggerFactory.getLogger( ShardStrategyTest.class );
+
+    /**
+     * Creates ten shards with time-UUID pointers, generates a message ID right after shard 4 is
+     * created, and expects the strategy to select shard 4 for that message ID.
+     */
+    @Test
+    public void testBasicOperation() throws InterruptedException {
+
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        cassandraClient.getSession(); // NOTE(review): result unused; presumably forces session setup - confirm
+
+        ShardSerialization shardSer   = getInjector().getInstance( ShardSerialization.class );
+        ShardStrategy shardStrategy   = getInjector().getInstance( ShardStrategy.class );
+
+        UUID messageIdToLocate = null;
+        long selectedShardId = 4L;
+        int numShards = 10;
+        String region = "default";
+        String queueName = "sst_queue_" + RandomStringUtils.randomAlphanumeric(20);
+
+        for ( long i=0; i<numShards; i++) {
+            shardSer.createShard( new Shard( queueName, region, Shard.Type.DEFAULT, i, QakkaUtils.getTimeUuid()));
+            Thread.sleep(10); // an interrupt propagates and fails the test rather than being swallowed
+            if ( i == selectedShardId ) {
+                messageIdToLocate = QakkaUtils.getTimeUuid();
+            }
+            Thread.sleep(10);
+        }
+
+        Shard selectedShard = shardStrategy.selectShard( queueName, region, Shard.Type.DEFAULT, messageIdToLocate );
+        Assert.assertNotNull( "no shard selected for " + messageIdToLocate, selectedShard );
+        Assert.assertEquals( selectedShardId, selectedShard.getShardId() );
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardTest.java
new file mode 100644
index 0000000..fba135a
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.sharding;
+
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Created by russo on 6/9/16.
+ */
+public class ShardTest extends AbstractTest {
+
+    /** Builds a DEFAULT shard for queue "test" in "region" with the given shard ID and a null pointer. */
+    private static Shard shardWithId( long shardId ) {
+        return new Shard("test", "region", Shard.Type.DEFAULT, shardId, null);
+    }
+
+    /** Two shards built from identical arguments must be equal. */
+    @Test
+    public void testEquals(){
+        final Shard left = shardWithId(100L);
+        final Shard right = shardWithId(100L);
+        assertEquals(left, right);
+    }
+
+    /** Two shards built from identical arguments (shard ID beyond the int range) must share a hash code. */
+    @Test
+    public void testHashCode(){
+        final int leftHash = shardWithId(10000000000L).hashCode();
+        final int rightHash = shardWithId(10000000000L).hashCode();
+        assertEquals(leftHash, rightHash);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerializationTest.java
new file mode 100644
index 0000000..20b72b0
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerializationTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.transferlog;
+
+import com.datastax.driver.core.PagingState;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaException;
+import org.apache.usergrid.persistence.qakka.serialization.Result;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+
+public class TransferLogSerializationTest extends AbstractTest {
+
+    /**
+     * Records 100 transfer logs for a randomly named queue and expects to find exactly that
+     * many for the queue when reading all transfer logs back, passing 10 to getAllTransferLogs.
+     */
+    @Test
+    public void recordTransferLog() throws Exception {
+
+        TransferLogSerialization logSerialization = getInjector().getInstance( TransferLogSerialization.class );
+
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        cassandraClient.getSession();
+
+        String queueName = "tlst_queue_" + RandomStringUtils.randomAlphanumeric( 15 );
+        String source = RandomStringUtils.randomAlphanumeric( 15 );
+        String dest = RandomStringUtils.randomAlphanumeric( 15 );
+
+        int numLogs = 100;
+
+        for ( int i=0; i<numLogs; i++ ) {
+            logSerialization.recordTransferLog( queueName, source, dest, UUIDGen.getTimeUUID());
+        }
+
+        // 10 is presumably a page size well below numLogs, so the read needs several calls - confirm
+        List<TransferLog> logs = logsForQueue( getTransferLogs( logSerialization, 10 ), queueName );
+
+        Assert.assertEquals( numLogs, logs.size() );
+    }
+
+    /**
+     * Records one transfer log, removes it, verifies that it is no longer returned, and expects
+     * a second removal of the same log to throw QakkaException.
+     */
+    @Test
+    public void removeTransferLog() throws Exception {
+
+        TransferLogSerialization logSerialization = getInjector().getInstance( TransferLogSerialization.class );
+
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        cassandraClient.getSession();
+
+        String queueName = "tlst_queue_" + RandomStringUtils.randomAlphanumeric( 15 );
+        String source = RandomStringUtils.randomAlphanumeric( 15 );
+        String dest = RandomStringUtils.randomAlphanumeric( 15 );
+
+        UUID messageId = UUIDGen.getTimeUUID();
+        logSerialization.recordTransferLog( queueName, source, dest, messageId );
+
+        List<TransferLog> logs = logsForQueue( getTransferLogs( logSerialization ), queueName );
+        Assert.assertEquals( 1, logs.size());
+
+        logSerialization.removeTransferLog( queueName, source, dest, messageId );
+
+        logs = logsForQueue( getTransferLogs( logSerialization ), queueName );
+        Assert.assertEquals( 0, logs.size());
+
+        try {
+            logSerialization.removeTransferLog( queueName, source, dest, messageId );
+            Assert.fail("Removing non-existent log should throw exception");
+
+        } catch ( QakkaException expected ) {
+            // success!
+        }
+    }
+
+    /** Keeps only the logs of the given queue; getAllTransferLogs is not restricted to one queue. */
+    private static List<TransferLog> logsForQueue( List<TransferLog> allLogs, String queueName ) {
+        return allLogs.stream()
+            .filter( log -> log.getQueueName().equals( queueName ) ).collect( Collectors.toList() );
+    }
+
+    /** Reads every transfer log, passing 100 as the second argument of getAllTransferLogs. */
+    private List<TransferLog> getTransferLogs(TransferLogSerialization logSerialization) {
+        return getTransferLogs( logSerialization, 100 );
+    }
+
+    /**
+     * Calls getAllTransferLogs repeatedly, feeding each returned paging state into the next call,
+     * until a result carries no paging state; returns the entities of all results combined.
+     */
+    private List<TransferLog> getTransferLogs(TransferLogSerialization logSerialization, int pageSize) {
+        PagingState pagingState = null;
+        List<TransferLog> allLogs = new ArrayList<>();
+        while ( true ) {
+            Result<TransferLog> result = logSerialization.getAllTransferLogs( pagingState, pageSize );
+            allLogs.addAll( result.getEntities() );
+            if ( result.getPagingState() == null ) {
+                break;
+            }
+            pagingState = result.getPagingState();
+        }
+        return allLogs;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
index 69655e5..4b6e9d3 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
@@ -20,72 +20,67 @@
 package org.apache.usergrid.persistence.queue;
 
 
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.App;
+import org.apache.usergrid.persistence.qakka.QakkaModule;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.queue.guice.QueueModule;
+import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
+import org.junit.Ignore;
+import org.junit.Test;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule;
-import org.apache.usergrid.persistence.core.test.ITRunner;
-import org.apache.usergrid.persistence.core.test.UseModules;
-import org.apache.usergrid.persistence.queue.guice.TestQueueModule;
-import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
-
-import com.google.inject.Inject;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeTrue;
-
 
-@RunWith( ITRunner.class )
-@UseModules( { TestQueueModule.class } )
-public class LegacyQueueManagerTest {
 
-    @Inject
-    protected LegacyQueueFig queueFig;
-    @Inject
-    protected LegacyQueueManagerFactory qmf;
+public class LegacyQueueManagerTest extends AbstractTest {
 
-    /**
-     * Mark tests as ignored if no AWS creds are present
-     */
-    @Rule
-    public NoAWSCredsRule awsCredsRule = new NoAWSCredsRule();
+    public static long queueSeed = System.currentTimeMillis();
 
+    // give each test its own injector
+    @Override
+    protected Injector getInjector() {
+        return Guice.createInjector( new QueueModule() );
+    }
 
-    protected LegacyQueueScope scope;
-    private LegacyQueueManager qm;
 
-    public static long queueSeed = System.currentTimeMillis();
+    @Test
+    public void send() throws Exception{
 
+        Injector myInjector = getInjector();
 
-    @Before
-    public void mockApp() {
+        CassandraClient cassandraClient = myInjector.getInstance( CassandraClientImpl.class );
+        cassandraClient.getSession();
 
-        this.scope = new LegacyQueueScopeImpl( "testQueue"+queueSeed++, LegacyQueueScope.RegionImplementation.LOCAL);
-        qm = qmf.getQueueManager(scope);
-    }
+        ActorSystemFig actorSystemFig = myInjector.getInstance( ActorSystemFig.class );
+        String region = actorSystemFig.getRegionLocal();
 
-    @org.junit.After
-    public void cleanup(){
-        qm.deleteQueue();
-    }
+        App app = myInjector.getInstance( App.class );
+        app.start( "localhost", getNextAkkaPort(), region );
 
+        final LegacyQueueScopeImpl scope =
+            new LegacyQueueScopeImpl( "testQueue" + queueSeed++, LegacyQueueScope.RegionImplementation.LOCAL );
+        LegacyQueueManagerFactory qmf = myInjector.getInstance( LegacyQueueManagerFactory.class );
+        LegacyQueueManager qm = qmf.getQueueManager(scope);
 
-    @Test
-    public void send() throws Exception{
         String value = "bodytest";
         qm.sendMessage(value);
+
+        Thread.sleep(5000);
+
         List<LegacyQueueMessage> messageList = qm.getMessages(1, String.class);
         assertTrue(messageList.size() >= 1);
         for(LegacyQueueMessage message : messageList){
-            assertTrue(message.getBody().equals(value));
+            assertEquals( value, message.getBody() );
             qm.commitMessage(message);
         }
 
@@ -96,12 +91,32 @@ public class LegacyQueueManagerTest {
 
     @Test
     public void sendMore() throws Exception{
+
+        Injector myInjector = getInjector();
+
+        CassandraClient cassandraClient = myInjector.getInstance( CassandraClientImpl.class );
+        cassandraClient.getSession();
+
+        ActorSystemFig actorSystemFig = myInjector.getInstance( ActorSystemFig.class );
+        String region = actorSystemFig.getRegionLocal();
+
+        App app = myInjector.getInstance( App.class );
+        app.start( "localhost", getNextAkkaPort(), region );
+
+        final LegacyQueueScopeImpl scope =
+            new LegacyQueueScopeImpl( "testQueue" + queueSeed++, LegacyQueueScope.RegionImplementation.LOCAL );
+        LegacyQueueManagerFactory qmf = myInjector.getInstance( LegacyQueueManagerFactory.class );
+        LegacyQueueManager qm = qmf.getQueueManager(scope);
+
         HashMap<String,String> values = new HashMap<>();
         values.put("test","Test");
 
         List<Map<String,String>> bodies = new ArrayList<>();
         bodies.add(values);
         qm.sendMessages(bodies);
+
+        Thread.sleep(5000);
+
         List<LegacyQueueMessage> messageList = qm.getMessages(1, values.getClass());
         assertTrue(messageList.size() >= 1);
         for(LegacyQueueMessage message : messageList){
@@ -115,7 +130,25 @@ public class LegacyQueueManagerTest {
     }
 
     @Test
+    @Ignore("Not implemented yet")
     public void queueSize() throws Exception{
+
+        Injector myInjector = getInjector();
+
+        CassandraClient cassandraClient = myInjector.getInstance( CassandraClientImpl.class );
+        cassandraClient.getSession();
+
+        ActorSystemFig actorSystemFig = myInjector.getInstance( ActorSystemFig.class );
+        String region = actorSystemFig.getRegionLocal();
+
+        App app = myInjector.getInstance( App.class );
+        app.start( "localhost", getNextAkkaPort(), region );
+
+        final LegacyQueueScopeImpl scope =
+            new LegacyQueueScopeImpl( "testQueue" + queueSeed++, LegacyQueueScope.RegionImplementation.LOCAL );
+        LegacyQueueManagerFactory qmf = myInjector.getInstance( LegacyQueueManagerFactory.class );
+        LegacyQueueManager qm = qmf.getQueueManager(scope);
+
         HashMap<String,String> values = new HashMap<>();
         values.put("test", "Test");
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/guice/TestQueueModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/guice/TestQueueModule.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/guice/TestQueueModule.java
index 8390672..70e3543 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/guice/TestQueueModule.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/guice/TestQueueModule.java
@@ -18,7 +18,6 @@
 package org.apache.usergrid.persistence.queue.guice;
 
 
-import org.apache.usergrid.persistence.core.guice.CommonModule;
 import org.apache.usergrid.persistence.core.guice.TestModule;
 
 
@@ -26,7 +25,6 @@ public class TestQueueModule extends TestModule {
 
     @Override
     protected void configure() {
-        install( new CommonModule());
         install( new QueueModule() );
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/resources/cassandra.yaml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/cassandra.yaml b/stack/corepersistence/queue/src/test/resources/cassandra.yaml
new file mode 100644
index 0000000..e97bf00
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/resources/cassandra.yaml
@@ -0,0 +1,53 @@
+
+cluster_name: 'Embedded Test Cluster'
+
+# ports
+storage_port: 7075
+listen_address: localhost
+rpc_address: localhost
+rpc_port: 9160
+native_transport_port: 9042
+
+# data files
+data_file_directories:
+    - target/embeddedCassandra/data
+commitlog_directory: target/embeddedCassandra/commitlog
+saved_caches_directory: target/embeddedCassandra/saved_caches
+
+# enable the native transport (listens on native_transport_port above)
+start_native_transport: true
+
+# other stuff
+start_rpc: true
+initial_token:
+auto_bootstrap: false
+hinted_handoff_enabled: true
+authenticator: org.apache.cassandra.auth.AllowAllAuthenticator
+partitioner: org.apache.cassandra.dht.RandomPartitioner
+seed_provider:
+    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
+      parameters:
+          - seeds: "127.0.0.1"
+commitlog_sync: periodic
+commitlog_sync_period_in_ms: 10000
+disk_access_mode: auto
+concurrent_reads: 2
+concurrent_writes: 4
+rpc_keepalive: true
+thrift_framed_transport_size_in_mb: 15
+thrift_max_message_length_in_mb: 16
+snapshot_before_compaction: false
+column_index_size_in_kb: 64
+endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
+dynamic_snitch: true
+dynamic_snitch_update_interval_in_ms: 100 
+dynamic_snitch_reset_interval_in_ms: 600000
+dynamic_snitch_badness_threshold: 0.0
+request_scheduler: org.apache.cassandra.scheduler.NoScheduler
+server_encryption_options:
+    internode_encryption: none
+    keystore: conf/.keystore
+    keystore_password: cassandra
+    truststore: conf/.truststore
+    truststore_password: cassandra
+index_interval: 128

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/log4j.properties b/stack/corepersistence/queue/src/test/resources/log4j.properties
new file mode 100644
index 0000000..3c679f5
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/resources/log4j.properties
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+log4j.rootLogger=ERROR,stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d %p (%t) %c{1} - %m%n
+
+log4j.logger.org.apache.usergrid.persistence.actorsystem=DEBUG
+log4j.logger.org.apache.usergrid.persistence.actorsystem=DEBUG
+log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
+
+log4j.logger.org.apache.cassandra=WARN
+log4j.logger.org.glassfish=WARN

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/resources/qakka-duck.jpg
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/qakka-duck.jpg b/stack/corepersistence/queue/src/test/resources/qakka-duck.jpg
new file mode 100644
index 0000000..8a0e0a2
Binary files /dev/null and b/stack/corepersistence/queue/src/test/resources/qakka-duck.jpg differ

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/resources/qakka.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties
new file mode 100644
index 0000000..c3b613c
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+# Properties for JUnit tests
+
+usergrid.cluster_name=Test Cluster
+
+usergrid.cluster.hostname=localhost
+
+# Comma-separated list of regions to be considered
+usergrid.cluster.region.list=us-east
+
+# The region of this local instance of Usergrid
+usergrid.cluster.region.local=us-east
+
+# Comma-separated list of cluster seeds, each in the format {region}:{hostname}
+usergrid.cluster.seeds=us-east:localhost
+
+# Port used for cluster communications.
+usergrid.cluster.port=2551
+
+queue.writer.num.actors=100
+
+# Keep the shard size and shard allocation timings low for testing purposes
+queue.shard.max.size=500
+queue.shard.allocation.check.frequency.millis=100
+queue.shard.allocation.advance.time.millis=200
+
+queue.max.inmemory.shard.counter = 100
+
+cassandra.hosts=localhost
+
+cassandra.keyspace.application=qakka_test
+
+cassandra.keyspace-drop-and-create=true

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index 5186a13..d739bb4 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -100,9 +100,9 @@
         <amber-version>0.22-incubating</amber-version>
         <astyanax.version>3.9.0</astyanax.version>
         <aws.version>1.10.20</aws.version>
-        <cassandra-version>1.2.18</cassandra-version>
+        <cassandra-version>2.1.14</cassandra-version>
         <guava.version>18.0</guava.version>
-        <guice.version>4.0-beta5</guice.version>
+        <guice.version>4.0</guice.version>
         <hector-om-version>3.0-03</hector-om-version>
         <hector-version>1.1-4</hector-version>
         <hector-test-version>1.1-4</hector-test-version>
@@ -110,7 +110,7 @@
         <jackson-version>1.9.9</jackson-version>
         <jackson-2-version>2.3.3</jackson-2-version>
         <jclouds.version>1.9.0</jclouds.version>
-        <jersey-version>2.21</jersey-version>
+        <jersey-version>2.23.1</jersey-version>
         <junit-version>4.12</junit-version>
         <log4j-version>1.2.16</log4j-version>
         <org.springframework.version>3.2.13.RELEASE</org.springframework.version>

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
index 3d4911d..8356f16 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
@@ -16,24 +16,26 @@
  */
 package org.apache.usergrid.services.notifications.apns;
 
-import com.relayrides.pushy.apns.*;
-import com.relayrides.pushy.apns.util.*;
-
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.relayrides.pushy.apns.PushManager;
+import com.relayrides.pushy.apns.PushManagerConfiguration;
+import com.relayrides.pushy.apns.util.SimpleApnsPushNotification;
+import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.entities.Notification;
 import org.apache.usergrid.persistence.entities.Notifier;
-import org.mortbay.util.ajax.JSON;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
 import org.apache.usergrid.services.ServicePayload;
 import org.apache.usergrid.services.notifications.ConnectionException;
 import org.apache.usergrid.services.notifications.ProviderAdapter;
 import org.apache.usergrid.services.notifications.TaskTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.*;
 
 /**
  * Adapter for Apple push notifications
@@ -47,6 +49,8 @@ public class APNsAdapter implements ProviderAdapter {
     private static final String TEST_TOKEN = "ff026b5a4d2761ef13843e8bcab9fc83b47f1dfbd1d977d225ab296153ce06d6";
     private static final String TEST_PAYLOAD = "{}";
 
+    private static ObjectMapper objectMapper = new ObjectMapper();
+
     static {
         validEnvironments.add("development");
         validEnvironments.add("production");
@@ -155,7 +159,7 @@ public class APNsAdapter implements ProviderAdapter {
                 payload = "{\"aps\":{\"alert\":\"" + payload + "\"}}";
             }
         } else {
-            payload = JSON.toString(objPayload);
+            payload = objectMapper.writeValueAsString( objPayload );
         }
         if (payload.length() > 2048) {
             throw new IllegalArgumentException(

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
index 7929ad4..6b619b7 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
@@ -16,11 +16,12 @@
  */
 package org.apache.usergrid.services.notifications.gcm;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.android.gcm.server.*;
 import org.apache.usergrid.persistence.entities.Notification;
 import org.apache.usergrid.persistence.entities.Notifier;
 import org.apache.usergrid.services.notifications.InactiveDeviceManager;
-import org.mortbay.util.ajax.JSON;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,6 +45,8 @@ public class GCMAdapter implements ProviderAdapter {
     private final Notifier notifier;
     private EntityManager entityManager;
 
+    private static ObjectMapper objectMapper = new ObjectMapper();
+
     private ConcurrentHashMap<Long,Batch> batches;
 
     private static final String ttlKey = "time_to_live";
@@ -147,9 +150,9 @@ public class GCMAdapter implements ProviderAdapter {
             throw new IllegalArgumentException(
                     "GCM Payload must be either a Map or a String");
         }
-        if (JSON.toString(mapPayload).length() > 4096) {
-            throw new IllegalArgumentException(
-                    "GCM payloads must be 4096 characters or less");
+        String payloadString = objectMapper.writeValueAsString( mapPayload );
+        if ( payloadString.length() > 4096) {
+            throw new IllegalArgumentException( "GCM payloads must be 4096 characters or less");
         }
         return mapPayload;
     }