Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/09/16 22:34:38 UTC

[09/25] usergrid git commit: Initial integration of Qakka into the Usergrid Queue module, and a Qakka-based LegacyQueueManager implementation.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
new file mode 100644
index 0000000..9158412
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.qakka.serialization.sharding.impl;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
+import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.MessageFormat;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+@Singleton
+public class ShardCounterSerializationImpl implements ShardCounterSerialization {
+    private static final Logger logger = LoggerFactory.getLogger( ShardCounterSerializationImpl.class );
+
+    private final CassandraClient cassandraClient;
+
+    final static String TABLE_COUNTERS       = "shard_counters";
+    final static String COLUMN_QUEUE_NAME    = "queue_name";
+    final static String COLUMN_SHARD_ID      = "shard_id";
+    final static String COLUMN_COUNTER_VALUE = "counter_value";
+    final static String COLUMN_SHARD_TYPE    = "shard_type";
+
+    static final String CQL =
+        "CREATE TABLE IF NOT EXISTS shard_counters ( " +
+            "counter_value counter, " +
+            "queue_name    varchar, " +
+            "shard_type    varchar, " +
+            "shard_id      bigint, " +
+            "PRIMARY KEY (queue_name, shard_type, shard_id) " +
+    ");  ";
+
+    final long maxInMemoryIncrement;
+
+    class InMemoryCount {
+        long baseCount;
+        final AtomicLong increment = new AtomicLong( 0L );
+        InMemoryCount( long baseCount ) {
+            this.baseCount = baseCount;
+        }
+        public long value() {
+            return baseCount + increment.get();
+        }
+        public AtomicLong getIncrement() {
+            return increment;
+        }
+        void setBaseCount( long baseCount ) {
+            this.baseCount = baseCount;
+        }
+    }
+
+    private Map<String, InMemoryCount> inMemoryCounters = new ConcurrentHashMap<>(200);
+
+
+    @Inject
+    public ShardCounterSerializationImpl(QakkaFig qakkaFig, CassandraClient cassandraClient ) {
+        this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter();
+        this.cassandraClient = cassandraClient;
+    }
+
+
+    @Override
+    public void incrementCounter(String queueName, Shard.Type type, long shardId, long increment ) {
+
+        String key = queueName + type + shardId;
+        synchronized ( inMemoryCounters ) {
+
+            if ( inMemoryCounters.get( key ) == null ) {
+
+                Long value = retrieveCounterFromStorage( queueName, type, shardId );
+
+                if ( value == null ) {
+                    incrementCounterInStorage( queueName, type, shardId, 0L );
+                    inMemoryCounters.put( key, new InMemoryCount( 0L ));
+                } else {
+                    inMemoryCounters.put( key, new InMemoryCount( value ));
+                }
+                inMemoryCounters.get( key ).getIncrement().addAndGet( increment );
+                return;
+            }
+        }
+
+        InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+
+        synchronized ( inMemoryCount ) {
+            long totalIncrement = inMemoryCount.getIncrement().addAndGet( increment );
+
+            if (totalIncrement > maxInMemoryIncrement) {
+                incrementCounterInStorage( queueName, type, shardId, totalIncrement );
+                inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type, shardId ) );
+                inMemoryCount.getIncrement().set( 0L );
+            }
+        }
+
+    }
+
+
+    @Override
+    public long getCounterValue( String queueName, Shard.Type type, long shardId ) {
+
+        String key = queueName + type + shardId;
+
+        synchronized ( inMemoryCounters ) {
+
+            if ( inMemoryCounters.get( key ) == null ) {
+
+                Long value = retrieveCounterFromStorage( queueName, type, shardId );
+
+                if ( value == null ) {
+                    throw new NotFoundException(
+                            MessageFormat.format( "No counter found for queue {0} type {1} shardId {2}",
+                                    queueName, type, shardId ));
+                } else {
+                    inMemoryCounters.put( key, new InMemoryCount( value ));
+                }
+            }
+        }
+
+        return inMemoryCounters.get( key ).value();
+    }
+
+    void incrementCounterInStorage( String queueName, Shard.Type type, long shardId, long increment ) {
+
+        Statement update = QueryBuilder.update( TABLE_COUNTERS )
+                .where( QueryBuilder.eq(   COLUMN_QUEUE_NAME, queueName ) )
+                .and(   QueryBuilder.eq(   COLUMN_SHARD_TYPE, type.toString() ) )
+                .and(   QueryBuilder.eq(   COLUMN_SHARD_ID, shardId ) )
+                .with(  QueryBuilder.incr( COLUMN_COUNTER_VALUE, increment ) );
+        cassandraClient.getSession().execute( update );
+    }
+
+
+    Long retrieveCounterFromStorage( String queueName, Shard.Type type, long shardId ) {
+
+        Statement query = QueryBuilder.select().from( TABLE_COUNTERS )
+                .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) )
+                .and( QueryBuilder.eq( COLUMN_SHARD_TYPE, type.toString()) )
+                .and( QueryBuilder.eq( COLUMN_SHARD_ID, shardId ) );
+
+        ResultSet resultSet = cassandraClient.getSession().execute( query );
+        List<Row> all = resultSet.all();
+
+        if ( all.size() > 1 ) {
+            throw new QakkaRuntimeException(
+                    "Multiple rows for counter " + queueName + " type " + type + " shardId " + shardId );
+        }
+        if ( all.isEmpty() ) {
+            return null;
+        }
+        return all.get(0).getLong( COLUMN_COUNTER_VALUE );
+    }
+
+    @Override
+    public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public Collection<TableDefinition> getTables() {
+        return Collections.singletonList( new TableDefinitionStringImpl( "shard_counters", CQL ) );
+    }
+}
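
For reference, a minimal usage sketch of the counter API above. It assumes the Qakka Guice module binds ShardCounterSerialization to this implementation and follows the bootstrap pattern used by AbstractTest later in this commit (an injector built from QueueModule plus schema migration); the queue name and shard id are illustrative values only.

import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization;
import org.apache.usergrid.persistence.queue.guice.QueueModule;

public class ShardCounterExample {

    public static void main( String[] args ) throws Exception {

        // build the injector and create the Cassandra schema, as AbstractTest does
        Injector injector = Guice.createInjector( new QueueModule() );
        injector.getInstance( MigrationManager.class ).migrate();

        ShardCounterSerialization counters =
            injector.getInstance( ShardCounterSerialization.class );

        // increments are buffered in memory; Cassandra is only written once the
        // accumulated delta exceeds QakkaFig.getMaxInMemoryShardCounter()
        counters.incrementCounter( "example_queue", Shard.Type.DEFAULT, 1L, 100L );

        // reads return the last base count loaded from storage plus the in-memory delta
        long value = counters.getCounterValue( "example_queue", Shard.Type.DEFAULT, 1L );
        System.out.println( "counter value = " + value );
    }
}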

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java
new file mode 100644
index 0000000..7b9fd8e
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.sharding.impl;
+
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.Assignment;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
+import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+
+
+public class ShardSerializationImpl implements ShardSerialization {
+
+    private static final Logger logger = LoggerFactory.getLogger( ShardSerializationImpl.class );
+
+    private final CassandraClient cassandraClient;
+
+    public final static String COLUMN_QUEUE_NAME = "queue_name";
+    public final static String COLUMN_REGION = "region";
+    public final static String COLUMN_SHARD_ID = "shard_id";
+    public final static String COLUMN_ACTIVE = "active";
+    public final static String COLUMN_POINTER = "pointer";
+
+
+    public final static String TABLE_SHARDS_MESSAGES_AVAILABLE = "shards_messages_available";
+
+    public final static String TABLE_SHARDS_MESSAGES_INFLIGHT = "shards_messages_inflight";
+
+
+    static final String SHARDS_MESSAGES_AVAILABLE =
+            "CREATE TABLE IF NOT EXISTS shards_messages_available ( " +
+                    "queue_name text, " +
+                    "region     text, " +
+                    "shard_id   bigint, " +
+                    "active     int, " +
+                    "pointer    timeuuid, " +
+                    "PRIMARY KEY ((queue_name, region), active, shard_id) " +
+                    ") WITH CLUSTERING ORDER BY (active DESC, shard_id ASC); ";
+
+    static final String SHARDS_MESSAGES_AVAILABLE_INFLIGHT =
+            "CREATE TABLE IF NOT EXISTS shards_messages_inflight ( " +
+                    "queue_name text, " +
+                    "region     text, " +
+                    "shard_id   bigint, " +
+                    "active     int, " +
+                    "pointer    timeuuid, " +
+                    "PRIMARY KEY ((queue_name, region), active, shard_id) " +
+                    ") WITH CLUSTERING ORDER BY (active DESC, shard_id ASC); ";
+
+
+    @Inject
+    public ShardSerializationImpl( CassandraClient cassandraClient ) {
+        this.cassandraClient = cassandraClient;
+    }
+
+    public void createShard(final Shard shard){
+
+        Statement insert = QueryBuilder.insertInto(getTableName(shard.getType()))
+                .value(COLUMN_QUEUE_NAME, shard.getQueueName())
+                .value(COLUMN_REGION, shard.getRegion())
+                .value(COLUMN_SHARD_ID, shard.getShardId())
+                .value(COLUMN_ACTIVE, 1)
+                .value(COLUMN_POINTER, shard.getPointer());
+
+        cassandraClient.getSession().execute(insert);
+
+    }
+
+    public Shard loadShard(final Shard shard){
+
+        Clause queueNameClause = QueryBuilder.eq(COLUMN_QUEUE_NAME, shard.getQueueName());
+        Clause regionClause = QueryBuilder.eq(COLUMN_REGION, shard.getRegion());
+        Clause activeClause = QueryBuilder.eq(COLUMN_ACTIVE, 1);
+        Clause shardIdClause = QueryBuilder.eq(COLUMN_SHARD_ID, shard.getShardId());
+
+        Statement select = QueryBuilder.select().from(getTableName(shard.getType()))
+                .where(queueNameClause)
+                .and(regionClause)
+                .and(activeClause)
+                .and(shardIdClause);
+
+        Row row = cassandraClient.getSession().execute(select).one();
+
+        if (row == null){
+            return null;
+        }
+
+        final String queueName = row.getString(COLUMN_QUEUE_NAME);
+        final String region = row.getString(COLUMN_REGION);
+        final long shardId = row.getLong(COLUMN_SHARD_ID);
+        final UUID pointer = row.getUUID(COLUMN_POINTER);
+
+        return new Shard(queueName, region, shard.getType(), shardId, pointer);
+    }
+
+
+    public void deleteShard(final Shard shard){
+
+        Clause queueNameClause = QueryBuilder.eq(COLUMN_QUEUE_NAME, shard.getQueueName());
+        Clause regionClause = QueryBuilder.eq(COLUMN_REGION, shard.getRegion());
+        Clause activeClause = QueryBuilder.eq(COLUMN_ACTIVE, 1);
+        Clause shardIdClause = QueryBuilder.eq(COLUMN_SHARD_ID, shard.getShardId());
+
+        Statement delete = QueryBuilder.delete().from(getTableName(shard.getType()))
+                .where(queueNameClause)
+                .and(regionClause)
+                .and(activeClause)
+                .and(shardIdClause);
+
+        cassandraClient.getSession().execute(delete);
+
+    }
+
+    public void updateShardPointer(final Shard shard){
+
+        Assignment assignment = QueryBuilder.set(COLUMN_POINTER, shard.getPointer());
+
+        Clause queueNameClause = QueryBuilder.eq(COLUMN_QUEUE_NAME, shard.getQueueName());
+        Clause regionClause = QueryBuilder.eq(COLUMN_REGION, shard.getRegion());
+        Clause activeClause = QueryBuilder.eq(COLUMN_ACTIVE, 1);
+        Clause shardIdClause = QueryBuilder.eq(COLUMN_SHARD_ID, shard.getShardId());
+
+        Statement update = QueryBuilder.update(getTableName(shard.getType()))
+                .with(assignment)
+                .where(queueNameClause)
+                .and(regionClause)
+                .and(activeClause)
+                .and(shardIdClause);
+
+        cassandraClient.getSession().execute(update);
+
+    }
+
+    public static String getTableName(Shard.Type shardType){
+
+        String table;
+        if( shardType.equals(Shard.Type.DEFAULT)) {
+            table = TABLE_SHARDS_MESSAGES_AVAILABLE;
+        }else if (shardType.equals(Shard.Type.INFLIGHT)) {
+            table = TABLE_SHARDS_MESSAGES_INFLIGHT;
+        }else{
+            throw new IllegalArgumentException("Unknown ShardType");
+        }
+
+        return table;
+
+    }
+
+    @Override
+    public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public Collection<TableDefinition> getTables() {
+        return Lists.newArrayList(
+                new TableDefinitionStringImpl( TABLE_SHARDS_MESSAGES_AVAILABLE, SHARDS_MESSAGES_AVAILABLE ),
+                new TableDefinitionStringImpl( TABLE_SHARDS_MESSAGES_INFLIGHT, SHARDS_MESSAGES_AVAILABLE_INFLIGHT )
+        );
+    }
+
+}
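
A sketch of the shard lifecycle this class persists, assuming the ShardSerialization interface is bound to the implementation above by the Qakka module. The queue name, region, and shard id are made-up example values, and UUIDs.timeBased() from the DataStax driver supplies the timeuuid pointer.

import com.datastax.driver.core.utils.UUIDs;
import com.google.inject.Injector;
import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization;

public class ShardLifecycleExample {

    public static void demo( Injector injector ) {

        ShardSerialization shards = injector.getInstance( ShardSerialization.class );

        // create a row in shards_messages_available (Shard.Type.DEFAULT); the pointer is
        // the timeuuid boundary that ShardStrategyImpl compares message ids against
        Shard shard = new Shard( "example_queue", "us-east", Shard.Type.DEFAULT, 1L, UUIDs.timeBased() );
        shards.createShard( shard );

        // advance the pointer by writing a shard object with the same key and a newer timeuuid
        Shard advanced = new Shard( "example_queue", "us-east", Shard.Type.DEFAULT, 1L, UUIDs.timeBased() );
        shards.updateShardPointer( advanced );

        // loadShard returns null if no active row matches the queue/region/shard id
        Shard reloaded = shards.loadShard( shard );
        if ( reloaded != null ) {
            shards.deleteShard( reloaded );
        }
    }
}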

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardStrategyImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardStrategyImpl.java
new file mode 100644
index 0000000..cfd9a60
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardStrategyImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.sharding.impl;
+
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardStrategy;
+
+import java.text.MessageFormat;
+import java.util.Optional;
+import java.util.UUID;
+
+
+public class ShardStrategyImpl implements ShardStrategy {
+
+    final CassandraClient cassandraClient;
+
+    @Inject
+    public ShardStrategyImpl(CassandraClient cassandraClient) {
+        this.cassandraClient = cassandraClient;
+    }
+
+    @Override
+    public Shard selectShard(String queueName, String region, Shard.Type shardType, UUID pointer) {
+
+        // use shard iterator to walk through shards until shard can be found
+
+        ShardIterator shardIterator = new ShardIterator(
+                cassandraClient, queueName, region, shardType, Optional.empty() );
+
+        if ( !shardIterator.hasNext() ) {
+            String msg = MessageFormat.format(
+                    "No shards found for queue {0} region {1} type {2}", queueName, region, shardType );
+            throw new NotFoundException( msg );
+        }
+
+        // walk through shards from oldest to newest
+
+        Shard prev = shardIterator.next();
+        while ( shardIterator.hasNext() ) {
+            Shard next = shardIterator.next();
+
+            // if the item is older than the next shard, then use the previous shard
+            if ( pointer.timestamp() < next.getPointer().timestamp() ) {
+                return prev;
+            }
+            prev = next;
+        }
+        return prev;
+    }
+}
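
The shard selection above hinges on comparing the timestamps of version-1 (time-based) UUIDs. A small standalone sketch of that ordering, using UUIDs.timeBased() from the DataStax driver:

import com.datastax.driver.core.utils.UUIDs;
import java.util.UUID;

public class TimeuuidOrderingExample {

    public static void main( String[] args ) {
        UUID older = UUIDs.timeBased();
        UUID newer = UUIDs.timeBased();

        // UUID.timestamp() returns the 100-nanosecond creation time of a version-1 UUID,
        // so the UUID generated first never has the larger timestamp
        System.out.println( older.timestamp() <= newer.timestamp() );   // true
    }
}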

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLog.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLog.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLog.java
new file mode 100644
index 0000000..048096d
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLog.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.transferlog;
+
+import java.util.UUID;
+
+
+public class TransferLog {
+    String queueName;
+    String sourceRegion;
+    String destRegion;
+    UUID messageId;
+    long transfer_time;
+
+    public TransferLog(
+            String queueName,
+            String sourceRegion,
+            String destRegion,
+            UUID messageId,
+            long transfer_time) {
+        this.queueName = queueName;
+        this.sourceRegion = sourceRegion;
+        this.destRegion = destRegion;
+        this.messageId = messageId;
+        this.transfer_time = transfer_time;
+    }
+
+    public String getQueueName() {
+        return queueName;
+    }
+
+    public void setQueueName(String queueName) {
+        this.queueName = queueName;
+    }
+
+    public String getSourceRegion() {
+        return sourceRegion;
+    }
+
+    public void setSourceRegion(String sourceRegion) {
+        this.sourceRegion = sourceRegion;
+    }
+
+    public String getDestRegion() {
+        return destRegion;
+    }
+
+    public void setDestRegion(String destRegion) {
+        this.destRegion = destRegion;
+    }
+
+    public UUID getMessageId() {
+        return messageId;
+    }
+
+    public void setMessageId(UUID messageId) {
+        this.messageId = messageId;
+    }
+
+    public long getTransfer_time() {
+        return transfer_time;
+    }
+
+    public void setTransfer_time(long transfer_time) {
+        this.transfer_time = transfer_time;
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerialization.java
new file mode 100644
index 0000000..ea155d7
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerialization.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.transferlog;
+
+import com.datastax.driver.core.PagingState;
+import org.apache.usergrid.persistence.core.migration.schema.Migration;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaException;
+import org.apache.usergrid.persistence.qakka.serialization.Result;
+
+import java.util.UUID;
+
+
+public interface TransferLogSerialization extends Migration {
+
+    /**
+     * Record transfer log record.
+     *
+     * @param queueName Name of queue.
+     * @param source Source region.
+     * @param dest Destination region.
+     * @param messageId UUID of message in message_data table.
+     */
+    void recordTransferLog(
+        String queueName, String source, String dest, UUID messageId);
+
+    /**
+     * Remove transfer log record.
+     *
+     * @param queueName Name of queue.
+     * @param source Source region.
+     * @param dest Destination region.
+     * @param messageId UUID of message in message_data table.
+     * @throws QakkaException If transfer log message was not found or could not be removed.
+     */
+    void removeTransferLog(
+        String queueName, String source, String dest, UUID messageId) throws QakkaException;
+
+    /**
+     * Get all transfer logs (for testing purposes)
+     *
+     * @param pagingState Paging state (or null if none)
+     * @param fetchSize Number of rows to be fetched per page (or -1 for default)
+     */
+    Result<TransferLog> getAllTransferLogs(PagingState pagingState, int fetchSize);
+}
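
A hedged usage sketch of this interface, assuming it is bound to the TransferLogSerializationImpl shown below: record an entry, page through all entries by feeding the returned PagingState back in until it is null, then remove the entry. The queue, regions, and fetch size are illustrative.

import com.datastax.driver.core.PagingState;
import com.datastax.driver.core.utils.UUIDs;
import com.google.inject.Injector;
import org.apache.usergrid.persistence.qakka.serialization.Result;
import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLog;
import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;

import java.util.UUID;

public class TransferLogExample {

    public static void demo( Injector injector ) throws Exception {

        TransferLogSerialization logs = injector.getInstance( TransferLogSerialization.class );

        UUID messageId = UUIDs.timeBased();
        logs.recordTransferLog( "example_queue", "us-east", "us-west", messageId );

        // page through all transfer logs, 100 rows per fetch
        PagingState pagingState = null;
        do {
            Result<TransferLog> page = logs.getAllTransferLogs( pagingState, 100 );
            for ( TransferLog tlog : page.getEntities() ) {
                System.out.println( tlog.getQueueName() + " -> " + tlog.getDestRegion() );
            }
            pagingState = page.getPagingState();
        } while ( pagingState != null );

        // throws QakkaException if the record is not found
        logs.removeTransferLog( "example_queue", "us-east", "us-west", messageId );
    }
}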

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java
new file mode 100644
index 0000000..f9fb0dc
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.transferlog.impl;
+
+import com.datastax.driver.core.PagingState;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
+import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaException;
+import org.apache.usergrid.persistence.qakka.serialization.Result;
+import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLog;
+import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+
+public class TransferLogSerializationImpl implements TransferLogSerialization {
+
+    private static final Logger logger = LoggerFactory.getLogger( TransferLogSerializationImpl.class );
+
+    private final CassandraClient cassandraClient;
+
+    public final static String TABLE_TRANSFER_LOG   = "transfer_log";
+
+    public final static String COLUMN_QUEUE_NAME    = "queue_name";
+    public final static String COLUMN_SOURCE_REGION = "source_region";
+    public final static String COLUMN_DEST_REGION   = "dest_region";
+    public final static String COLUMN_MESSAGE_ID    = "message_id";
+    public final static String COLUMN_TRANSFER_TIME = "transfer_time";
+
+    static final String CQL =
+        "CREATE TABLE IF NOT EXISTS transfer_log ( " +
+            "queue_name    text, " +
+            "source_region text, " +
+            "dest_region   text, " +
+            "message_id    timeuuid, " +
+            "transfer_time bigint, " +
+            "PRIMARY KEY ((queue_name, dest_region, message_id)) " +
+            ");  ";
+
+
+    @Inject
+    public TransferLogSerializationImpl( CassandraClient cassandraClient ) {
+        this.cassandraClient = cassandraClient;
+    }
+
+
+    @Override
+    public void recordTransferLog(
+            String queueName, String source, String dest, UUID messageId) {
+
+        Statement insert = QueryBuilder.insertInto(TABLE_TRANSFER_LOG)
+                .value(COLUMN_QUEUE_NAME, queueName )
+                .value(COLUMN_SOURCE_REGION, source )
+                .value(COLUMN_DEST_REGION, dest )
+                .value(COLUMN_MESSAGE_ID, messageId )
+                .value(COLUMN_TRANSFER_TIME, System.currentTimeMillis() );
+        cassandraClient.getSession().execute(insert);
+    }
+
+
+    @Override
+    public void removeTransferLog(
+            String queueName, String source, String dest, UUID messageId ) throws QakkaException {
+
+        Statement query = QueryBuilder.select().all().from(TABLE_TRANSFER_LOG)
+            .where(   QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ))
+                .and( QueryBuilder.eq( COLUMN_DEST_REGION, dest ))
+                .and( QueryBuilder.eq( COLUMN_MESSAGE_ID, messageId ));
+        ResultSet rs = cassandraClient.getSession().execute( query );
+
+        if ( rs.getAvailableWithoutFetching() == 0 ) {
+            StringBuilder sb = new StringBuilder();
+            sb.append( "Transfer log entry not found for queueName=" ).append( queueName );
+            sb.append( " source=" ).append( source );
+            sb.append( " dest=" ).append( dest );
+            sb.append( " messageId=" ).append( messageId );
+            throw new QakkaException( sb.toString() );
+        }
+
+        Statement deleteQuery = QueryBuilder.delete().from(TABLE_TRANSFER_LOG)
+                .where(   QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ))
+                    .and( QueryBuilder.eq( COLUMN_DEST_REGION, dest ))
+                .and( QueryBuilder.eq( COLUMN_MESSAGE_ID, messageId ));
+        cassandraClient.getSession().execute( deleteQuery );
+    }
+
+
+    @Override
+    public Result<TransferLog> getAllTransferLogs(PagingState pagingState, int fetchSize ) {
+
+        Statement query = QueryBuilder.select().all().from(TABLE_TRANSFER_LOG);
+
+        query.setFetchSize( fetchSize );
+        if ( pagingState != null ) {
+            query.setPagingState( pagingState );
+        }
+
+        ResultSet rs = cassandraClient.getSession().execute( query );
+        final PagingState newPagingState = rs.getExecutionInfo().getPagingState();
+
+        final List<TransferLog> transferLogs = new ArrayList<>();
+        int numReturned = rs.getAvailableWithoutFetching();
+        for ( int i=0; i<numReturned; i++ ) {
+            Row row = rs.one();
+            TransferLog tlog = new TransferLog(
+                    row.getString( COLUMN_QUEUE_NAME ),
+                    row.getString( COLUMN_SOURCE_REGION ),
+                    row.getString( COLUMN_DEST_REGION ),
+                    row.getUUID( COLUMN_MESSAGE_ID ),
+                    row.getLong( COLUMN_TRANSFER_TIME ));
+            transferLogs.add( tlog );
+        }
+
+        return new Result<TransferLog>() {
+
+            @Override
+            public PagingState getPagingState() {
+                return newPagingState;
+            }
+
+            @Override
+            public List<TransferLog> getEntities() {
+                return transferLogs;
+            }
+        };
+    }
+
+    @Override
+    public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public Collection<TableDefinition> getTables() {
+        return Collections.singletonList( new TableDefinitionStringImpl( TABLE_TRANSFER_LOG, CQL ) );
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
index 6d62da0..7bd0fa7 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
@@ -18,18 +18,18 @@
 package org.apache.usergrid.persistence.queue.guice;
 
 
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+import org.apache.usergrid.persistence.qakka.QakkaModule;
+import org.apache.usergrid.persistence.queue.LegacyQueueFig;
+import org.apache.usergrid.persistence.queue.LegacyQueueManager;
+import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory;
 import org.apache.usergrid.persistence.queue.LegacyQueueManagerInternalFactory;
+import org.apache.usergrid.persistence.queue.impl.QakkaQueueManager;
 import org.apache.usergrid.persistence.queue.impl.QueueManagerFactoryImpl;
 import org.apache.usergrid.persistence.queue.impl.SNSQueueManagerImpl;
 import org.safehaus.guicyfig.GuicyFigModule;
 
-import org.apache.usergrid.persistence.queue.LegacyQueueFig;
-import org.apache.usergrid.persistence.queue.LegacyQueueManager;
-import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.assistedinject.FactoryModuleBuilder;
-
 
 /**
  * Simple module for wiring our queue api
@@ -44,11 +44,11 @@ public class QueueModule extends AbstractModule {
 
         install(new GuicyFigModule(LegacyQueueFig.class));
 
+        install( new QakkaModule() );
+
         bind(LegacyQueueManagerFactory.class).to(QueueManagerFactoryImpl.class);
-        install(new FactoryModuleBuilder().implement(LegacyQueueManager.class, SNSQueueManagerImpl.class)
+        install( new FactoryModuleBuilder().implement(LegacyQueueManager.class, QakkaQueueManager.class)
             .build(LegacyQueueManagerInternalFactory.class));
 
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
new file mode 100644
index 0000000..c407a78
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue.impl;
+
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.*;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.apache.usergrid.persistence.queue.LegacyQueueFig;
+import org.apache.usergrid.persistence.queue.LegacyQueueManager;
+import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
+import org.apache.usergrid.persistence.queue.LegacyQueueScope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+
+public class QakkaQueueManager implements LegacyQueueManager {
+    private static final Logger logger = LoggerFactory.getLogger( QakkaQueueManager.class );
+
+    private final LegacyQueueScope    scope;
+    private final LegacyQueueFig      fig;
+    private final QueueManager        queueManager;
+    private final QueueMessageManager queueMessageManager;
+    private final QakkaFig            qakkaFig;
+    private final Regions             regions;
+
+
+    @Inject
+    public QakkaQueueManager(
+        @Assisted LegacyQueueScope scope,
+        LegacyQueueFig      fig,
+        QueueManager        queueManager,
+        QueueMessageManager queueMessageManager,
+        QakkaFig            qakkaFig,
+        Regions             regions
+    ) {
+
+        this.scope = scope;
+        this.fig = fig;
+        this.queueManager = queueManager;
+        this.qakkaFig = qakkaFig;
+        this.queueMessageManager = queueMessageManager;
+        this.regions = regions;
+
+        if ( queueManager.getQueueConfig(scope.getName()) == null ) {
+
+            // TODO: read defaults from config
+            //queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));
+
+            Queue queue = new Queue( scope.getName() );
+            queueManager.createQueue( queue );
+        }
+    }
+
+
+    @Override
+    public <T extends Serializable> void sendMessage(T body) throws IOException {
+
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(bos);
+        oos.writeObject(body);
+        oos.flush();
+        oos.close();
+        ByteBuffer byteBuffer = ByteBuffer.wrap( bos.toByteArray() );
+
+        queueMessageManager.sendMessages(
+            scope.getName(),
+            regions.getRegions( scope.getRegionImplementation().name() ),
+            null, // delay millis
+            null, // expiration seconds
+            "application/octet-stream",
+            DataType.serializeValue( byteBuffer, ProtocolVersion.NEWEST_SUPPORTED ));
+    }
+
+
+    @Override
+    public <T extends Serializable> void sendMessageToTopic(T body) throws IOException {
+        sendMessage( body );
+    }
+
+
+    @Override
+    public List<LegacyQueueMessage> getMessages(int limit, Class klass) {
+
+        List<LegacyQueueMessage> messages = new ArrayList<>();
+        List<QueueMessage> qakkaMessages = queueMessageManager.getNextMessages( scope.getName(), limit );
+
+        for ( QueueMessage qakkaMessage : qakkaMessages ) {
+
+            Object body;
+            try {
+                ByteBuffer messageData = queueMessageManager.getMessageData( qakkaMessage.getMessageId() );
+                ByteBuffer bb = (ByteBuffer)DataType.blob().deserialize(
+                    messageData, ProtocolVersion.NEWEST_SUPPORTED );
+
+                ByteArrayInputStream bais = new ByteArrayInputStream( bb.array() );
+                ObjectInputStream ios = new ObjectInputStream( bais );
+                body = ios.readObject();
+
+            } catch (Throwable t) {
+                throw new QakkaRuntimeException( "Error de-serializing object", t );
+            }
+
+            LegacyQueueMessage legacyQueueMessage = new LegacyQueueMessage(
+                qakkaMessage.getQueueMessageId().toString(),
+                null,   // handle
+                body,
+                null);  // type
+
+            messages.add( legacyQueueMessage );
+        }
+
+        return messages;
+    }
+
+
+    @Override
+    public long getQueueDepth() {
+        return 0;
+    }
+
+
+    @Override
+    public void commitMessage(LegacyQueueMessage queueMessage) {
+
+        UUID queueMessageId  = UUID.fromString( queueMessage.getMessageId() );
+        queueMessageManager.ackMessage( scope.getName(), queueMessageId );
+    }
+
+
+    @Override
+    public void commitMessages(List<LegacyQueueMessage> queueMessages) {
+
+        for ( LegacyQueueMessage message : queueMessages ) {
+            commitMessage( message );
+        }
+    }
+
+
+    @Override
+    public void sendMessages( List bodies ) throws IOException {
+
+        for ( Object body : bodies ) {
+            sendMessage( (Serializable)body );
+        }
+
+    }
+
+
+    @Override
+    public void deleteQueue() {
+        queueManager.deleteQueue( scope.getName() );
+    }
+}
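
For context, a standalone sketch of the Java-serialization round trip QakkaQueueManager performs on message bodies (sendMessage writes the body into a ByteBuffer, getMessages reads it back). The Cassandra blob encode/decode that the real methods add via DataType is left out, and the class name here is illustrative.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;

public class BodyCodecExample {

    // mirrors sendMessage(): Java-serialize the body and wrap the bytes in a ByteBuffer
    static ByteBuffer serialize( Serializable body ) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try ( ObjectOutputStream oos = new ObjectOutputStream( bos ) ) {
            oos.writeObject( body );
        }
        return ByteBuffer.wrap( bos.toByteArray() );
    }

    // mirrors getMessages(): read the object back out of the buffer's backing array
    static Object deserialize( ByteBuffer bb ) throws IOException, ClassNotFoundException {
        try ( ObjectInputStream ois = new ObjectInputStream( new ByteArrayInputStream( bb.array() ) ) ) {
            return ois.readObject();
        }
    }

    public static void main( String[] args ) throws Exception {
        ByteBuffer wire = serialize( "hello queue" );
        System.out.println( deserialize( wire ) );   // prints: hello queue
    }
}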

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
index 2d51903..c1bdc72 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
@@ -67,7 +67,9 @@ public class QueueManagerFactoryImpl implements LegacyQueueManagerFactory {
             });
 
     @Inject
-    public QueueManagerFactoryImpl(final LegacyQueueFig queueFig, final LegacyQueueManagerInternalFactory queuemanagerInternalFactory){
+    public QueueManagerFactoryImpl(
+        final LegacyQueueFig queueFig, final LegacyQueueManagerInternalFactory queuemanagerInternalFactory) {
+
         this.queueFig = queueFig;
         this.queuemanagerInternalFactory = queuemanagerInternalFactory;
         this.defaultManager = new HashMap<>(10);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/webapp/WEB-INF/web.xml b/stack/corepersistence/queue/src/main/webapp/WEB-INF/web.xml
new file mode 100644
index 0000000..cc94f07
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/webapp/WEB-INF/web.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<web-app xmlns="http://java.sun.com/xml/ns/javaee"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
+         version="3.0">
+
+    <display-name>qakka</display-name>
+
+    <listener>
+        <listener-class>org.apache.usergrid.persistence.qakka.api.impl.StartupListener</listener-class>
+    </listener>
+ 
+    <filter>
+        <filter-name>qakka</filter-name>
+        <filter-class>org.glassfish.jersey.servlet.ServletContainer</filter-class>
+
+        <init-param>
+            <param-name>javax.ws.rs.Application</param-name>
+            <param-value>org.apache.usergrid.persistence.qakka.api.impl.JerseyResourceConfig</param-value>
+        </init-param>
+
+        <init-param>
+            <param-name>jersey.config.server.tracing.type</param-name>
+            <!-- allowed values are OFF, ON_DEMAND, ALL -->
+            <param-value>OFF</param-value>
+        </init-param>
+
+    </filter>
+
+    <filter-mapping>
+        <filter-name>qakka</filter-name>
+        <url-pattern>/api/*</url-pattern>
+    </filter-mapping>
+    
+</web-app>

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java
new file mode 100644
index 0000000..8f5284c
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.core.migration.schema.MigrationException;
+import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
+import org.apache.usergrid.persistence.queue.guice.QueueModule;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+public class AbstractTest {
+    private static final Logger logger = LoggerFactory.getLogger( AbstractTest.class );
+
+    static AtomicInteger nextPort = new AtomicInteger(3551);
+
+    protected static Injector sharedInjector;
+
+    static { new KeyspaceDropper(); }
+
+
+    public AbstractTest() {
+        if ( getInjector() == null ) {
+            setInjector( Guice.createInjector( new QueueModule() ) );
+            MigrationManager migrationManager = getInjector().getInstance( MigrationManager.class );
+            try {
+                migrationManager.migrate();
+            } catch (MigrationException e) {
+                logger.error("Error in migration", e);
+            }
+        }
+    }
+
+    protected Injector getInjector() {
+        return sharedInjector;
+    }
+
+    protected static void setInjector(Injector injector) {
+        AbstractTest.sharedInjector = injector;
+    }
+
+
+    protected int getNextAkkaPort() {
+        int ret = nextPort.getAndIncrement();
+        logger.info("Returning port {} for this {}", ret, this.hashCode());
+        return ret;
+    }
+
+
+    @BeforeClass
+    public static void startCassandra() throws Exception {
+        //EmbeddedCassandraServerHelper.startEmbeddedCassandra("/cassandra.yaml");
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java
new file mode 100644
index 0000000..aa4dfd1
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.qakka;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Properties;
+
+
+/**
+ * Created by Dave Johnson (snoopdave@apache.org) on 9/9/16.
+ */
+public class KeyspaceDropper {
+    
+    private static final Logger logger = LoggerFactory.getLogger( KeyspaceDropper.class );
+    
+    static { dropTestKeyspace(); }
+
+    
+    public static void dropTestKeyspace() {
+
+        String propsFileName = "qakka.properties";
+
+        Properties props = new Properties();
+        try {
+            props.load( App.class.getResourceAsStream( "/" + propsFileName ) );
+        } catch (IOException e) {
+            throw new RuntimeException( "Unable to load " + propsFileName + " file!", e );
+        }
+
+        String keyspace = (String) props.get( "cassandra.keyspace.application" );
+        String[] hosts  = props.getProperty( "cassandra.hosts", "127.0.0.1" ).split( "," );
+        int port        = Integer.parseInt( props.getProperty( "cassandra.port", "9042" ) );
+
+        Cluster.Builder builder = Cluster.builder();
+        for ( String host : hosts ) {
+            builder = builder.addContactPoint( host ).withPort( port );
+        }
+
+        final QueryOptions queryOptions = new QueryOptions().setConsistencyLevel( ConsistencyLevel.LOCAL_QUORUM );
+        builder.withQueryOptions( queryOptions );
+        Cluster cluster = builder.build();
+
+        Session session = cluster.connect();
+        logger.info("Dropping test keyspace: {}", keyspace);
+        session.execute( "DROP KEYSPACE IF EXISTS " + keyspace );
+
+        // close the session and cluster so repeated test runs do not leak connections
+        session.close();
+        cluster.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/AbstractRestTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/AbstractRestTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/AbstractRestTest.java
new file mode 100644
index 0000000..a7d6215
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/AbstractRestTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.api;
+
+
+import org.apache.usergrid.persistence.qakka.KeyspaceDropper;
+import org.apache.usergrid.persistence.qakka.api.impl.StartupListener;
+import org.apache.usergrid.persistence.qakka.api.impl.JerseyResourceConfig;
+import org.glassfish.jersey.test.DeploymentContext;
+import org.glassfish.jersey.test.JerseyTest;
+import org.glassfish.jersey.test.ServletDeploymentContext;
+import org.junit.BeforeClass;
+
+import javax.ws.rs.core.Application;
+
+
+abstract public class AbstractRestTest extends JerseyTest {
+
+    static Application app;
+
+    static DeploymentContext context = null;
+
+    static { new KeyspaceDropper(); }
+    
+
+    @BeforeClass
+    public static void startCassandra() throws Exception {
+        //EmbeddedCassandraServerHelper.startEmbeddedCassandra("/cassandra.yaml");
+    }
+    
+    @Override
+    protected Application configure() {
+        if ( app == null ) {
+            app = new JerseyResourceConfig();
+        }
+        return app;
+    }
+
+    @Override
+    protected DeploymentContext configureDeployment() {
+        if ( context == null ) {
+            context = ServletDeploymentContext.builder( configure() ).addListener( StartupListener.class ).build();
+        }
+        return context;
+    }
+}
+
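Concrete API tests extend AbstractRestTest and use JerseyTest's target() to reach the deployed Qakka resources, as QueueResourceTest does below. A minimal sketch of such a subclass (the class and queue name are invented for illustration; the 404 expectation mirrors the config-lookup behaviour exercised in QueueResourceTest):

    package org.apache.usergrid.persistence.qakka.api;

    import javax.ws.rs.core.Response;
    import org.junit.Assert;
    import org.junit.Test;

    public class SmokeRestTest extends AbstractRestTest {

        @Test
        public void unknownQueueConfigIsNotFound() {
            // the config of a queue that was never created should come back 404
            Response response = target( "queues" ).path( "no_such_queue" ).path( "config" )
                    .request().get();
            Assert.assertEquals( 404, response.getStatus() );
        }
    }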

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/PerformanceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/PerformanceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/PerformanceTest.java
new file mode 100644
index 0000000..b99be69
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/PerformanceTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.api;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.qakka.core.QueueMessage;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.net.URISyntaxException;
+import java.util.*;
+
+
+public class PerformanceTest {
+    private static final Logger logger = LoggerFactory.getLogger( PerformanceTest.class );
+
+    
+    @Test
+    @Ignore("needs exernal Tomcat an Cassandra")
+    public void testSendAndGetMessagePerformance() throws URISyntaxException, JsonProcessingException {
+
+        Client client = ClientBuilder.newClient();
+        
+        WebTarget target = client.target("http://macsnoopdave2013:8080/api/");
+        
+        // create a queue
+
+        String queueName = "pt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+        Map<String, Object> queueMap = new HashMap<String, Object>() {{ put("name", queueName); }};
+        target.path("queues").request().post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE));
+
+        // send some messages
+        int numMessages = 20000;
+
+        {
+            ObjectMapper mapper = new ObjectMapper();
+            List<Long> times = new ArrayList<>( numMessages );
+            int errorCount = 0;
+            int counter = 0;
+            
+            for (int i = 0; i < numMessages; i++) {
+
+                final int number = i;
+                Map<String, Object> messageMap = new HashMap<String, Object>() {{
+                    put( "message", "this is message #" + number );
+                    put( "valid", true );
+                }};
+                String body = mapper.writeValueAsString( messageMap );
+
+                long startTime = System.currentTimeMillis();
+                Response post = target.path( "queues" ).path( queueName ).path( "messages" )
+                        .request().post( Entity.entity( body, MediaType.APPLICATION_OCTET_STREAM_TYPE ) );
+                long stopTime = System.currentTimeMillis();
+                times.add( stopTime - startTime );
+
+                if ( post.getStatus() != 200 ) {
+                    errorCount++;
+                }
+
+                if ( ++counter % 500 == 0 ) {
+                    logger.debug("Sent {} messages with error count {}", counter, errorCount);
+                }
+
+                try { Thread.sleep(5); } catch ( Exception intentionallyIgnored ) {}
+            }
+
+            Long total = times.stream().mapToLong( time -> time ).sum();
+            Long max = times.stream().max( Comparator.comparing( time -> time ) ).get();
+            Long min = times.stream().min( Comparator.comparing( time -> time ) ).get();
+            Double average = times.stream().mapToLong( time -> time ).average().getAsDouble();
+
+            logger.debug( "\n>>>>>>> Total send time {}ms, min {}ms, max {}ms, average {}ms errors {}\n\n", 
+                    total, min, max, average, errorCount );
+        }
+
+        // get all messages, checking for dups
+
+        {
+            Set<UUID> messageIds = new HashSet<>();
+            List<Long> times = new ArrayList<>( numMessages );
+            int errorCount = 0;
+            int counter = 0;
+            
+            for (int j = 0; j < numMessages; j++) {
+
+                long startTime = System.currentTimeMillis();
+                Response response = target.path( "queues" ).path( queueName ).path( "messages" ).request().get();
+                long stopTime = System.currentTimeMillis();
+                times.add( stopTime - startTime );
+
+                if ( ++counter % 500 == 0 ) {
+                    logger.debug("Got {} messages with error count {}", counter, errorCount);
+                }
+                
+                if ( response.getStatus() != 200 ) {
+                    errorCount++;
+                    continue;
+                }
+
+                ApiResponse apiResponse = response.readEntity( ApiResponse.class );
+                QueueMessage queueMessage = apiResponse.getQueueMessages().iterator().next();
+
+                if (messageIds.contains( queueMessage.getQueueMessageId() )) {
+                    Assert.fail( "Message fetched twice: " + queueMessage.getQueueMessageId() );
+                } else {
+                    messageIds.add( queueMessage.getQueueMessageId() );
+                }
+            }
+            Assert.assertEquals( numMessages, messageIds.size() );
+
+            Long total = times.stream().mapToLong( time -> time ).sum();
+            Long max = times.stream().max( Comparator.comparing( time -> time ) ).get();
+            Long min = times.stream().min( Comparator.comparing( time -> time ) ).get();
+            Double average = times.stream().mapToLong( time -> time ).average().getAsDouble();
+
+            logger.debug( "\n>>>>>>> Total get time {}ms, min {}ms, max {}ms, average {}ms errors {}\n\n", 
+                    total, min, max, average, errorCount );
+        }
+    }
+}
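The timing summaries above traverse the list of samples once per figure (sum, max, min, average). If the reporting is revisited, java.util.LongSummaryStatistics produces all four numbers in a single pass; a small self-contained sketch of the equivalent calculation:

    import java.util.Arrays;
    import java.util.List;
    import java.util.LongSummaryStatistics;

    public class TimingStats {
        public static void main( String[] args ) {
            List<Long> times = Arrays.asList( 12L, 7L, 31L, 9L );   // sample latencies in ms

            // one pass over the samples yields sum, min, max and average together
            LongSummaryStatistics stats = times.stream()
                    .mapToLong( Long::longValue )
                    .summaryStatistics();

            System.out.format( "total %dms, min %dms, max %dms, average %.2fms%n",
                    stats.getSum(), stats.getMin(), stats.getMax(), stats.getAverage() );
        }
    }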

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
new file mode 100644
index 0000000..fcb4212
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.api;
+
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.io.ByteStreams;
+import com.google.inject.Injector;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.api.impl.StartupListener;
+import org.apache.usergrid.persistence.qakka.core.QueueMessage;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.util.*;
+
+import static org.junit.Assert.fail;
+
+
+public class QueueResourceTest extends AbstractRestTest {
+    private static final Logger logger = LoggerFactory.getLogger( QueueResourceTest.class );
+
+    static private final TypeReference<Map<String,Object>> jsonMapTypeRef
+            = new TypeReference<Map<String,Object>>() {};
+
+    @Test
+    public void testCreateQueue() throws URISyntaxException {
+
+        // create a queue
+
+        String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+        Map<String, Object> queueMap = new HashMap<String, Object>() {{
+            put("name", queueName);
+        }};
+        Response response = target("queues").request()
+                .post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE));
+
+        Assert.assertEquals( 201, response.getStatus() );
+        URIStrategy uriStrategy = StartupListener.INJECTOR.getInstance( URIStrategy.class );
+        Assert.assertEquals( uriStrategy.queueURI( queueName ).toString(), response.getHeaderString( "location" ) );
+
+        // get queue by name
+
+        response = target("queues").path( queueName ).path( "config" ).request().get();
+        Assert.assertEquals( 200, response.getStatus() );
+        ApiResponse apiResponse = response.readEntity( ApiResponse.class );
+        Assert.assertNotNull( apiResponse.getQueues() );
+        Assert.assertFalse( apiResponse.getQueues().isEmpty() );
+        Assert.assertEquals( 1, apiResponse.getQueues().size() );
+        Assert.assertEquals( queueName, apiResponse.getQueues().iterator().next().getName() );
+    }
+
+
+    @Test
+    public void testDeleteQueue() throws URISyntaxException {
+
+        // create a queue
+
+        String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+        Map<String, Object> queueMap = new HashMap<String, Object>() {{ put("name", queueName); }};
+        Response response = target("queues").request()
+                .post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE));
+
+        Assert.assertEquals( 201, response.getStatus() );
+        URIStrategy uriStrategy = StartupListener.INJECTOR.getInstance( URIStrategy.class );
+        Assert.assertEquals( uriStrategy.queueURI( queueName ).toString(), response.getHeaderString( "location" ) );
+
+        // delete queue without confirm = true, should fail with bad request
+
+        response = target("queues").path( queueName ).request().delete();
+        Assert.assertEquals( 400, response.getStatus() );
+
+        // delete queue with confirm = true
+
+        response = target("queues").path( queueName ).queryParam( "confirm", true ).request().delete();
+        Assert.assertEquals( 200, response.getStatus() );
+
+        // cannot get queue by name
+
+        response = target("queues").path( queueName ).path( "config" ).request().get();
+        Assert.assertEquals( 404, response.getStatus() );
+    }
+
+
+    @Test
+    public void testSendMessageToBadQueue() throws URISyntaxException, JsonProcessingException, InterruptedException {
+
+        String queueName = "bogus_queue_is_bogus";
+        Map<String, Object> messageMap = new HashMap<String, Object>() {{ put("dummy_prop", "dummy_value"); }};
+        ObjectMapper mapper = new ObjectMapper();
+        String body = mapper.writeValueAsString( messageMap );
+
+        Response response = target("queues").path( queueName ).path( "messages" )
+                .request().post( Entity.entity( body, MediaType.APPLICATION_OCTET_STREAM_TYPE ));
+
+        Assert.assertEquals( 404, response.getStatus() );
+    }
+
+
+    @Test
+    public void testSendJsonMessagesAsJson() throws URISyntaxException, IOException, InterruptedException {
+        sendJsonMessages( true );
+    }
+
+
+    @Test
+    public void testSendMessagesJsonAsOctetStream() throws URISyntaxException, IOException, InterruptedException {
+        sendJsonMessages( false );
+    }
+
+
+    /**
+     * Send 100 JSON payload messages to queue.
+     * @param asJson true to send with content-type header 'application/json',
+     *               false to send with content-type header 'application/octet-stream'
+     */
+    private void sendJsonMessages( boolean asJson ) throws URISyntaxException, IOException, InterruptedException {
+
+        // create a queue
+
+        String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+        Map<String, Object> queueMap = new HashMap<String, Object>() {{
+            put( "name", queueName );
+        }};
+        target( "queues" ).request().post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE ) );
+
+        // send some messages
+
+        ObjectMapper mapper = new ObjectMapper();
+
+        int numMessages = 100;
+        for (int i = 0; i < numMessages; i++) {
+
+            final int number = i;
+            Map<String, Object> messageMap = new HashMap<String, Object>() {{
+                put( "message", "this is message #" + number );
+                put( "valid", true );
+            }};
+            String body = mapper.writeValueAsString( messageMap );
+
+            Response response;
+            if ( asJson ) {
+                response = target( "queues" ).path( queueName ).path( "messages" )
+                        .request().post( Entity.entity( body, MediaType.APPLICATION_JSON ) );
+            } else {
+                response = target( "queues" ).path( queueName ).path( "messages" )
+                        .queryParam( "contentType", MediaType.APPLICATION_JSON )
+                        .request().post( Entity.entity( body, MediaType.APPLICATION_OCTET_STREAM ) );
+            }
+
+            Assert.assertEquals( 200, response.getStatus() );
+        }
+
+        // get all messages, checking for dups
+
+        checkJsonMessages( queueName, numMessages );
+    }
+
+
+    private Set<UUID> checkJsonMessages( String queueName, int numMessages ) throws IOException {
+
+        ObjectMapper mapper = new ObjectMapper();
+
+        Set<UUID> messageIds = new HashSet<>();
+        for ( int j=0; j<numMessages; j++ ) {
+
+            int retries = 0;
+            int maxRetries = 10;
+            ApiResponse apiResponse = null;
+            while ( retries++ < maxRetries ) {
+                Response response = target( "queues" ).path( queueName ).path( "messages" ).request().get();
+                apiResponse = response.readEntity( ApiResponse.class );
+                if ( !apiResponse.getQueueMessages().isEmpty() ) {
+                    break;
+                }
+                try { Thread.sleep(500); } catch (Exception ignored) {}
+            }
+
+            Assert.assertNotNull(   apiResponse );
+            Assert.assertNotNull(   apiResponse.getQueueMessages() );
+            Assert.assertEquals( 1, apiResponse.getQueueMessages().size() );
+
+            QueueMessage queueMessage = apiResponse.getQueueMessages().iterator().next();
+            Map<String, Object> payload = mapper.readValue( queueMessage.getData(), jsonMapTypeRef );
+
+            Assert.assertEquals( queueName, queueMessage.getQueueName() );
+            Assert.assertNull( queueMessage.getHref() );
+            Assert.assertEquals( true, payload.get("valid") );
+
+            if (messageIds.contains( queueMessage.getQueueMessageId() )) {
+                Assert.fail("Message fetched twice: " + queueMessage.getQueueMessageId() );
+            } else {
+                messageIds.add( queueMessage.getQueueMessageId() );
+            }
+        }
+        Assert.assertEquals( numMessages, messageIds.size() );
+
+        return messageIds;
+    }
+
+
+    @Test
+    public void testSendBinaryMessages() throws URISyntaxException, IOException, InterruptedException {
+
+        // create a queue
+
+        String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+        Map<String, Object> queueMap = new HashMap<String, Object>() {{
+            put( "name", queueName );
+        }};
+        target( "queues" ).request().post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE ) );
+
+        // send messages each with image/jpg payload
+
+        InputStream is = getClass().getResourceAsStream("/qakka-duck.jpg");
+        byte[] bytes = ByteStreams.toByteArray( is );
+
+        int numMessages = 100;
+        for (int i = 0; i < numMessages; i++) {
+
+            Response response = target( "queues" ).path( queueName ).path( "messages" )
+                    .queryParam( "contentType", "image/jpg" )
+                    .request()
+                    .post( Entity.entity( bytes, MediaType.APPLICATION_OCTET_STREAM ));
+
+            Assert.assertEquals( 200, response.getStatus() );
+        }
+
+        // get all messages, checking for dups
+
+        checkBinaryMessages( queueName, numMessages );
+    }
+
+
+    private Set<UUID> checkBinaryMessages( String queueName, int numMessages ) throws IOException {
+
+        Set<UUID> messageIds = new HashSet<>();
+        for ( int j=0; j<numMessages; j++ ) {
+
+            Response response = target( "queues" ).path( queueName ).path( "messages" ).request().get();
+
+            ApiResponse apiResponse = response.readEntity( ApiResponse.class );
+            Assert.assertNotNull(   apiResponse.getQueueMessages() );
+            Assert.assertFalse(     apiResponse.getQueueMessages().isEmpty() );
+            Assert.assertEquals( 1, apiResponse.getQueueMessages().size() );
+
+            QueueMessage queueMessage = apiResponse.getQueueMessages().iterator().next();
+
+            // no data in a binary message
+            Assert.assertNull( queueMessage.getData() );
+
+            // data can be found at HREF provided
+            Assert.assertNotNull( queueMessage.getHref() );
+
+            Response binaryResponse = target("queues")
+                    .path( queueName ).path("data").path( queueMessage.getQueueMessageId().toString() )
+                    .request().accept( "image/jpg" ).get();
+
+            Assert.assertEquals( 200, binaryResponse.getStatus() );
+            InputStream is = binaryResponse.readEntity( InputStream.class );
+
+            byte[] imageBytes = ByteStreams.toByteArray( is );
+            Assert.assertEquals( 11188, imageBytes.length);
+
+            if (messageIds.contains( queueMessage.getQueueMessageId() )) {
+                fail("Message fetched twice: " + queueMessage.getQueueMessageId() );
+            } else {
+                messageIds.add( queueMessage.getQueueMessageId() );
+            }
+        }
+        Assert.assertEquals( numMessages, messageIds.size() );
+
+        return messageIds;
+    }
+
+
+    @Test
+    public void testSendMessageAckAndTimeout() throws URISyntaxException, IOException, InterruptedException {
+
+        // create a queue
+
+        String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+        Map<String, Object> queueMap = new HashMap<String, Object>() {{ put("name", queueName); }};
+        target("queues").request().post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE));
+
+        // send some messages
+
+        ObjectMapper mapper = new ObjectMapper();
+
+        int numMessages = 100;
+        for ( int i=0; i<numMessages; i++ ) {
+
+            final int number = i;
+            Map<String, Object> messageMap = new HashMap<String, Object>() {{
+                put("message", "this is message #" + number);
+                put("valid", true );
+            }};
+            String body = mapper.writeValueAsString( messageMap );
+
+            Response response = target("queues").path( queueName ).path( "messages" )
+                    .request().post( Entity.entity( body, MediaType.APPLICATION_JSON ));
+
+            Assert.assertEquals( 200, response.getStatus() );
+        }
+
+        // get all messages, checking for dups
+
+        Set<UUID> messageIds = checkJsonMessages( queueName, numMessages );
+
+        // there should be no more messages available
+
+        Response response = target( "queues" ).path( queueName ).path( "messages" ).request().get();
+        ApiResponse apiResponse = response.readEntity( ApiResponse.class );
+        Assert.assertNotNull( apiResponse.getQueueMessages() );
+        Assert.assertTrue( apiResponse.getQueueMessages().isEmpty() );
+
+        // ack half of the messages
+
+        int count = 0;
+        Set<UUID> ackedIds = new HashSet<>();
+        for ( UUID queueMessageId : messageIds ) {
+            response = target( "queues" )
+                    .path( queueName ).path( "messages" ).path( queueMessageId.toString() ).request().delete();
+            Assert.assertEquals( 200, response.getStatus() );
+            ackedIds.add( queueMessageId );
+            if ( ++count >= numMessages/2 ) {
+                break;
+            }
+        }
+        messageIds.removeAll( ackedIds );
+
+        // wait for the remaining messages to time out
+
+        QakkaFig qakkaFig = StartupListener.INJECTOR.getInstance( QakkaFig.class );
+        Thread.sleep( 2*qakkaFig.getQueueTimeoutSeconds() * 1000 );
+
+        // now, the remaining messages cannot be acked because they timed out
+
+        for ( UUID queueMessageId : messageIds ) {
+            response = target( "queues" )
+                    .path( queueName ).path( "messages" ).path( queueMessageId.toString() ).request().delete();
+            Assert.assertEquals( 400, response.getStatus() );
+        }
+
+        // and, those same messages should be available again in the queue
+
+        checkJsonMessages( queueName, numMessages/2 );
+    }
+
+
+    @Test
+    public void testConvertDelayParameter() {
+
+        Injector injector = StartupListener.INJECTOR;
+        QueueResource queueResource = injector.getInstance( QueueResource.class );
+
+        Assert.assertEquals( 0L, queueResource.convertDelayParameter( "" ).longValue() );
+        Assert.assertEquals( 0L, queueResource.convertDelayParameter( "0" ).longValue() );
+        Assert.assertEquals( 0L, queueResource.convertDelayParameter( "NONE" ).longValue() );
+        Assert.assertEquals( 5L, queueResource.convertDelayParameter( "5" ).longValue() );
+
+        try {
+            queueResource.convertDelayParameter( "bogus value" );
+            fail("Expected exception on bad value");
+        } catch ( IllegalArgumentException expected ) {
+            // pass
+        }
+    }
+
+    @Test
+    public void testConvertExpirationParameter() {
+
+        Injector injector = StartupListener.INJECTOR;
+        QueueResource queueResource = injector.getInstance( QueueResource.class );
+
+        Assert.assertNull( queueResource.convertExpirationParameter( "" ) );
+        Assert.assertNull( queueResource.convertExpirationParameter( "NEVER" ) );
+
+        Assert.assertEquals( 5L, queueResource.convertExpirationParameter( "5" ).longValue() );
+
+        try {
+            queueResource.convertExpirationParameter( "bogus value" );
+            fail("Expected exception on bad value");
+        } catch ( IllegalArgumentException expected ) {
+            // pass
+        }
+    }
+
+}
+
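Taken together, the tests above cover the whole queue lifecycle over REST: create the queue, post messages, fetch them, then ack (DELETE) by queue message id. For reference, the same flow against a running server with a plain JAX-RS client looks roughly like the sketch below. The base URL and queue name are placeholders, a JSON provider such as Jackson is assumed to be registered on the client, and a real consumer would retry the GET while the response is empty, as checkJsonMessages() does above:

    import java.util.Collections;
    import java.util.Map;
    import javax.ws.rs.client.Client;
    import javax.ws.rs.client.ClientBuilder;
    import javax.ws.rs.client.Entity;
    import javax.ws.rs.client.WebTarget;
    import javax.ws.rs.core.MediaType;
    import javax.ws.rs.core.Response;
    import org.apache.usergrid.persistence.qakka.api.ApiResponse;
    import org.apache.usergrid.persistence.qakka.core.QueueMessage;

    public class QueueLifecycleExample {
        public static void main( String[] args ) {
            Client client = ClientBuilder.newClient();
            WebTarget api = client.target( "http://localhost:8080/api/" );   // placeholder base URL
            String queueName = "example_queue";

            // create the queue
            Map<String, Object> queueMap = Collections.<String, Object>singletonMap( "name", queueName );
            api.path( "queues" ).request()
                    .post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE ) );

            // send one JSON message
            api.path( "queues" ).path( queueName ).path( "messages" ).request()
                    .post( Entity.entity( "{\"message\":\"hello\"}", MediaType.APPLICATION_JSON ) );

            // fetch it back (a real consumer would retry while the message list is empty)
            Response response = api.path( "queues" ).path( queueName ).path( "messages" ).request().get();
            ApiResponse apiResponse = response.readEntity( ApiResponse.class );
            QueueMessage message = apiResponse.getQueueMessages().iterator().next();

            // ack the message so it is not redelivered after the queue timeout
            api.path( "queues" ).path( queueName ).path( "messages" )
                    .path( message.getQueueMessageId().toString() ).request().delete();

            client.close();
        }
    }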

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java
new file mode 100644
index 0000000..42423fa
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.common;
+
+import com.datastax.driver.core.Session;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Verifies that a CassandraClient can be obtained from the injector and returns a usable Session.
+ *
+ * Created by russo on 6/8/16.
+ */
+public class CassandraClientTest extends AbstractTest {
+    
+    @Test
+    public void getClient(){
+
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+
+        Session session = cassandraClient.getSession();
+        Assert.assertNotNull( session );
+
+        // exercising the session confirms that it is connected
+        session.getLoggedKeyspace();
+
+    }
+
+
+}