You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/09/16 22:34:39 UTC

[10/25] usergrid git commit: Initial integration of Qakka into Usergrid Queue module, and implementation of Qakka-based LegacyQueueManager implementation.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/QakkaException.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/QakkaException.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/QakkaException.java
new file mode 100644
index 0000000..84e0681
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/QakkaException.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.exceptions;
+
+
+/**
+ * Checked exception for the Qakka queue code; carries a message and, optionally,
+ * the underlying cause.
+ */
+public class QakkaException extends Exception {
+
+    /**
+     * Creates an exception that wraps an underlying failure.
+     *
+     * @param message description of what went wrong
+     * @param cause   the failure that led to this exception
+     */
+    public QakkaException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Creates an exception with a message and no cause.
+     *
+     * @param message description of what went wrong
+     */
+    public QakkaException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/QakkaRuntimeException.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/QakkaRuntimeException.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/QakkaRuntimeException.java
new file mode 100644
index 0000000..fbb788b
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/QakkaRuntimeException.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.exceptions;
+
+
+/**
+ * Unchecked exception for the Qakka queue code; carries a message and, optionally,
+ * the underlying cause.
+ */
+public class QakkaRuntimeException extends RuntimeException {
+
+    /**
+     * Creates an exception that wraps an underlying failure.
+     *
+     * @param message description of what went wrong
+     * @param cause   the failure that led to this exception
+     */
+    public QakkaRuntimeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Creates an exception with a message and no cause.
+     *
+     * @param message description of what went wrong
+     */
+    public QakkaRuntimeException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
new file mode 100644
index 0000000..1c733a6
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization;
+
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl.QueueMessageSerializationImpl;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+import static org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl.QueueMessageSerializationImpl.*;
+
+
+/**
+ * Iterates over the messages of one queue and region across a sequence of shards,
+ * reading each shard from Cassandra one page (PAGE_SIZE rows) at a time.
+ *
+ * Instances keep mutable cursor state and do no synchronization.
+ */
+public class MultiShardMessageIterator implements Iterator<DatabaseQueueMessage> {
+
+    private static final Logger logger = LoggerFactory.getLogger( MultiShardMessageIterator.class );
+
+    /** Maximum number of rows requested per query. */
+    private static final int PAGE_SIZE = 100;
+
+    private final CassandraClient cassandraClient;
+    private final String queueName;
+    private final String region;
+    private final DatabaseQueueMessage.Type messageType;
+    private final Iterator<Shard> shardIterator;
+
+    /** Messages of the most recently fetched page; null until the first fetch. */
+    private Iterator<DatabaseQueueMessage> currentIterator;
+
+    /** Shard currently being read; null until the first fetch. */
+    private Shard currentShard;
+
+    /** Queue message ID after which the next page starts; null means no lower bound. */
+    private UUID nextStart;
+
+
+    /**
+     * @param cassandraClient source of the Cassandra session used for queries
+     * @param queueName       queue whose messages are read
+     * @param region          region whose messages are read
+     * @param messageType     message type; selects the table that is queried
+     * @param shardIterator   shards to read, in order; must not be null
+     * @param nextStart       queue message ID to start after, or null to start from the
+     *                        first shard's pointer (if it has one) or its beginning
+     * @throws IllegalArgumentException if shardIterator is null
+     */
+    public MultiShardMessageIterator(
+            final CassandraClient cassandraClient,
+            final String queueName,
+            final String region,
+            final DatabaseQueueMessage.Type messageType,
+            final Iterator<Shard> shardIterator,
+            final UUID nextStart) {
+
+        if (shardIterator == null) {
+            throw new IllegalArgumentException("shardIterator cannot be null");
+        }
+
+        this.queueName = queueName;
+        this.region = region;
+        this.messageType = messageType;
+        this.shardIterator = shardIterator;
+        this.nextStart = nextStart;
+        this.cassandraClient = cassandraClient;
+    }
+
+    /**
+     * Reports whether another message is available, fetching the next page (and
+     * moving on to later shards) when the current page is used up.
+     *
+     * @return true if next() will return a message; false when nothing more was
+     *         found, including when the shard iterator supplied no shards at all
+     */
+    @Override
+    public boolean hasNext() {
+
+        if ( currentIterator != null && currentIterator.hasNext() ) {
+            return true;
+        }
+
+        // No shard has been read yet and there is none to read: advance() would
+        // call shardIterator.next() and fail with NoSuchElementException.
+        if ( currentShard == null && !shardIterator.hasNext() ) {
+            return false;
+        }
+
+        advance();
+
+        return currentIterator.hasNext();
+    }
+
+    /**
+     * @return the next message
+     * @throws NoSuchElementException if no further message exists
+     */
+    @Override
+    public DatabaseQueueMessage next() {
+
+        if ( !hasNext() ) {
+            throw new NoSuchElementException( "No next message exists" );
+        }
+
+        return currentIterator.next();
+    }
+
+    /**
+     * Fetches the next page into currentIterator. Starts with the current shard and,
+     * while a shard yields no rows and more shards remain, moves to the next shard.
+     * Uses a loop (not recursion) so a long run of empty shards cannot exhaust the
+     * stack. Callers must ensure a shard is available before calling.
+     */
+    private void advance(){
+
+        if (currentShard == null){
+            currentShard = shardIterator.next();
+        }
+
+        while ( true ) {
+
+            // if we have a pointer from the shard and this is the first seek, init from the pointer's position
+            if ( currentShard.getPointer() != null && nextStart == null ){
+                nextStart = currentShard.getPointer();
+            }
+
+            final List<Row> rows = cassandraClient.getSession().execute( buildPageQuery() ).all();
+            final boolean noRows = rows == null || rows.isEmpty();
+
+            if ( noRows && shardIterator.hasNext() ) {
+                // NOTE(review): nextStart is carried over to the next shard unchanged,
+                // so the next shard is also queried with the same lower bound -- confirm intended.
+                currentShard = shardIterator.next();
+                continue;
+            }
+
+            currentIterator = getIteratorFromRows( noRows ? Collections.<Row>emptyList() : rows );
+            return;
+        }
+    }
+
+    /**
+     * Builds the query for one page of the current shard: rows matching queue name,
+     * region and shard ID, restricted to queue message IDs greater than nextStart
+     * when nextStart is set, limited to PAGE_SIZE rows.
+     */
+    private Statement buildPageQuery() {
+
+        final Clause queueNameClause = QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName);
+        final Clause regionClause    = QueryBuilder.eq( COLUMN_REGION, region);
+        final Clause shardIdClause   = QueryBuilder.eq( COLUMN_SHARD_ID, currentShard.getShardId());
+
+        if ( nextStart == null) {
+
+            return QueryBuilder.select().all().from(QueueMessageSerializationImpl.getTableName(messageType))
+                    .where(queueNameClause)
+                    .and(regionClause)
+                    .and(shardIdClause)
+                    .limit(PAGE_SIZE);
+        }
+
+        final Clause messageIdClause = QueryBuilder.gt( COLUMN_QUEUE_MESSAGE_ID, nextStart);
+        return QueryBuilder.select().all().from(QueueMessageSerializationImpl.getTableName(messageType))
+                .where(queueNameClause)
+                .and(regionClause)
+                .and(shardIdClause)
+                .and(messageIdClause)
+                .limit(PAGE_SIZE);
+    }
+
+
+    /**
+     * Converts result rows to messages and remembers the queue message ID of the
+     * last row in nextStart, so the following page starts after it.
+     */
+    private Iterator<DatabaseQueueMessage> getIteratorFromRows(List<Row> rows){
+
+        final List<DatabaseQueueMessage> messages = new ArrayList<>(rows.size());
+
+        for ( Row row : rows ) {
+
+            final String rowQueueName = row.getString( COLUMN_QUEUE_NAME);
+            final String rowRegion =    row.getString( COLUMN_REGION);
+            final long shardId =        row.getLong(   COLUMN_SHARD_ID);
+            final UUID queueMessageId = row.getUUID(   COLUMN_QUEUE_MESSAGE_ID);
+            final UUID messageId =      row.getUUID(   COLUMN_MESSAGE_ID);
+            final long queuedAt =       row.getLong(   COLUMN_QUEUED_AT);
+            final long inflightAt =     row.getLong(   COLUMN_INFLIGHT_AT);
+
+            messages.add(new DatabaseQueueMessage(
+                    messageId, messageType, rowQueueName, rowRegion, shardId, queuedAt, inflightAt, queueMessageId));
+
+            //queueMessageId is internal to the messages_available and messages_inflight tables
+            nextStart = queueMessageId;
+        }
+
+        return messages.iterator();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/Result.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/Result.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/Result.java
new file mode 100644
index 0000000..b2b0934
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/Result.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization;
+
+import com.datastax.driver.core.PagingState;
+
+import java.util.List;
+
+
+/**
+ * A list of entities returned by a query, together with the paging state
+ * reported for that query.
+ *
+ * Created by Dave Johnson (snoopdave@apache.org) on 8/8/16.
+ *
+ * @param <T> type of the entities held in the result
+ */
+public interface Result<T> {
+
+    /**
+     * @return the entities in this result
+     */
+    List<T> getEntities();
+
+    /**
+     * @return the paging state for this result; may be null when the
+     *         implementation does not page
+     */
+    PagingState getPagingState();
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/AuditLog.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/AuditLog.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/AuditLog.java
new file mode 100644
index 0000000..dcf0d1a
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/AuditLog.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.auditlog;
+
+import java.util.UUID;
+
+
+/**
+ * One audit log record: an action taken on a queue message and its outcome.
+ */
+public class AuditLog {
+
+    /** Outcome of the audited action. */
+    public enum Status { SUCCESS, ERROR }
+
+    /** Kind of operation that was audited. */
+    public enum Action { SEND, ACK, GET, REQUEUE }
+
+    Action action;
+    Status status;
+    String queueName;
+    String region;
+    UUID messageId;
+
+    UUID queueMessageId;
+    long transfer_time;
+
+    /**
+     * Creates a record with every field supplied by the caller.
+     *
+     * @param action         operation that was audited
+     * @param status         outcome of the operation
+     * @param queueName      name of the queue
+     * @param region         region
+     * @param messageId      ID of the message
+     * @param queueMessageId ID of the queue message
+     * @param transfer_time  time value stored with the record
+     */
+    public AuditLog(
+            Action action,
+            Status status,
+            String queueName,
+            String region,
+            UUID messageId,
+            UUID queueMessageId,
+            long transfer_time) {
+
+        this.transfer_time = transfer_time;
+        this.queueMessageId = queueMessageId;
+        this.messageId = messageId;
+        this.region = region;
+        this.queueName = queueName;
+        this.status = status;
+        this.action = action;
+    }
+
+    // ---- properties without a setter ----
+
+    public Action getAction() {
+        return this.action;
+    }
+
+    public Status getStatus() {
+        return this.status;
+    }
+
+    public UUID getQueueMessageId() {
+        return this.queueMessageId;
+    }
+
+    // ---- properties with a setter ----
+
+    public String getQueueName() {
+        return this.queueName;
+    }
+
+    public void setQueueName(String queueName) {
+        this.queueName = queueName;
+    }
+
+    public String getRegion() {
+        return this.region;
+    }
+
+    public void setRegion(String region) {
+        this.region = region;
+    }
+
+    public UUID getMessageId() {
+        return this.messageId;
+    }
+
+    public void setMessageId(UUID messageId) {
+        this.messageId = messageId;
+    }
+
+    public long getTransfer_time() {
+        return this.transfer_time;
+    }
+
+    public void setTransfer_time(long transfer_time) {
+        this.transfer_time = transfer_time;
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/AuditLogSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/AuditLogSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/AuditLogSerialization.java
new file mode 100644
index 0000000..95f2dbe
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/AuditLogSerialization.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.auditlog;
+
+import org.apache.usergrid.persistence.core.migration.schema.Migration;
+import org.apache.usergrid.persistence.qakka.serialization.Result;
+
+import java.util.UUID;
+
+
+/**
+ * Storage interface for the audit log of actions performed on queue messages.
+ */
+public interface AuditLogSerialization extends Migration {
+
+    /**
+     * Get all audit logs for a specific queue message.
+     *
+     * @param messageId ID of the message whose audit records are wanted
+     * @return the matching audit log records
+     */
+    Result<AuditLog> getAuditLogs(UUID messageId);
+
+    /**
+     * Record audit log record.
+     *
+     * @param action         the action that was audited
+     * @param status         outcome of the action
+     * @param queueName      name of the queue
+     * @param region         region
+     * @param messageId      ID of the message
+     * @param queueMessageId ID of the queue message
+     */
+    void recordAuditLog(
+        AuditLog.Action action,
+        AuditLog.Status status,
+        String queueName,
+        String region,
+        UUID messageId,
+        UUID queueMessageId);
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java
new file mode 100644
index 0000000..d9dbab6
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.auditlog.impl;
+
+import com.datastax.driver.core.PagingState;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
+import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.serialization.Result;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+
+/**
+ * Cassandra-backed AuditLogSerialization that stores records in the audit_log table.
+ */
+public class AuditLogSerializationImpl implements AuditLogSerialization {
+
+    private static final Logger logger = LoggerFactory.getLogger( AuditLogSerializationImpl.class );
+
+    private final CassandraClient cassandraClient;
+
+    public final static String TABLE_AUDIT_LOG   = "audit_log";
+
+    public final static String COLUMN_ACTION           = "action";
+    public final static String COLUMN_STATUS           = "status";
+    public final static String COLUMN_QUEUE_NAME       = "queue_name";
+    public final static String COLUMN_REGION           = "region";
+    public final static String COLUMN_MESSAGE_ID       = "message_id";
+    public final static String COLUMN_QUEUE_MESSAGE_ID = "queue_message_id";
+    public final static String COLUMN_TRANSFER_TIME    = "transfer_time";
+
+
+    // design note: want to be able to query this by message_id, so we can do "garbage collection"
+    // of message data items that have been processed in all target regions
+
+    // The table name comes from TABLE_AUDIT_LOG so the DDL cannot drift from the
+    // name used by the insert and select statements below.
+    static final String CQL =
+        "CREATE TABLE IF NOT EXISTS " + TABLE_AUDIT_LOG + " ( " +
+                "action           text, " +
+                "status           text, " +
+                "queue_name       text, " +
+                "region           text, " +
+                "message_id       timeuuid, " +
+                "queue_message_id timeuuid, " +
+                "transfer_time    bigint, " +
+                "PRIMARY KEY (message_id, transfer_time) " +
+        ") WITH CLUSTERING ORDER BY (transfer_time ASC); ";
+
+
+    /**
+     * @param cassandraClient source of the Cassandra session used for all statements
+     */
+    @Inject
+    public AuditLogSerializationImpl( CassandraClient cassandraClient ) {
+        this.cassandraClient = cassandraClient;
+    }
+
+
+    /**
+     * Inserts one audit row, with transfer_time set to the current system time in
+     * milliseconds.
+     *
+     * NOTE(review): the primary key is (message_id, transfer_time), so two records
+     * for the same message written within the same millisecond would share a key --
+     * confirm this is acceptable.
+     */
+    @Override
+    public void recordAuditLog(
+            AuditLog.Action action,
+            AuditLog.Status status,
+            String queueName,
+            String region,
+            UUID messageId,
+            UUID queueMessageId ) {
+
+        Statement insert = QueryBuilder.insertInto(TABLE_AUDIT_LOG)
+                .value(COLUMN_ACTION, action.toString() )
+                .value(COLUMN_STATUS, status.toString() )
+                .value(COLUMN_QUEUE_NAME, queueName )
+                .value(COLUMN_REGION, region )
+                .value(COLUMN_MESSAGE_ID, messageId )
+                .value(COLUMN_QUEUE_MESSAGE_ID, queueMessageId )
+                .value(COLUMN_TRANSFER_TIME, System.currentTimeMillis() );
+        cassandraClient.getSession().execute(insert);
+    }
+
+
+    /**
+     * Reads every audit row for the given message ID in a single query and maps each
+     * row to an AuditLog.
+     *
+     * @param messageId ID of the message whose audit rows are read
+     * @return a result holding all matching records; its paging state is always null
+     */
+    @Override
+    public Result<AuditLog> getAuditLogs( UUID messageId ) {
+
+        Statement query = QueryBuilder.select().all().from(TABLE_AUDIT_LOG)
+            .where( QueryBuilder.eq( COLUMN_MESSAGE_ID, messageId ) );
+
+        ResultSet rs = cassandraClient.getSession().execute( query );
+
+        final List<AuditLog> auditLogs = rs.all().stream().map( row ->
+            new AuditLog(
+                AuditLog.Action.valueOf( row.getString( COLUMN_ACTION )),
+                AuditLog.Status.valueOf( row.getString( COLUMN_STATUS )),
+                row.getString( COLUMN_QUEUE_NAME ),
+                row.getString( COLUMN_REGION ),
+                row.getUUID( COLUMN_MESSAGE_ID ),
+                row.getUUID( COLUMN_QUEUE_MESSAGE_ID ),
+                row.getLong( COLUMN_TRANSFER_TIME ) )
+        ).collect( Collectors.toList() );
+
+        return new Result<AuditLog>() {
+
+            @Override
+            public PagingState getPagingState() {
+                return null; // no paging
+            }
+
+            @Override
+            public List<AuditLog> getEntities() {
+                return auditLogs;
+            }
+        };
+
+    }
+
+
+    /**
+     * @return an empty collection; this class defines no column families
+     */
+    @Override
+    public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
+        // typed emptyList() instead of the raw Collections.EMPTY_LIST (unchecked conversion)
+        return Collections.emptyList();
+    }
+
+    /**
+     * @return the definition of the audit_log table
+     */
+    @Override
+    public Collection<TableDefinition> getTables() {
+        return Collections.singletonList( new TableDefinitionStringImpl( TABLE_AUDIT_LOG, CQL ) );
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessage.java
new file mode 100644
index 0000000..dab47d5
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessage.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.queuemessages;
+
+import java.util.UUID;
+
+
+/**
+ * A queue message as stored in the database: its identifiers, the queue, region
+ * and shard it belongs to, its timestamps and its type.
+ *
+ * Equality covers queue name and region (both compared ignoring case), shard ID,
+ * message ID and type; hashCode() is consistent with equals().
+ */
+public class DatabaseQueueMessage {
+
+    /** Type of a stored message. */
+    public enum Type {
+        DEFAULT, INFLIGHT
+    }
+
+    private final String queueName;
+    private final String region;
+    private final Long queuedAt;
+    private final UUID messageId;
+    private final UUID queueMessageId;
+
+    private Type type;
+    private Long inflightAt;
+
+    private Long shardId;
+
+
+    /**
+     * Creates a message from caller-supplied values. Nothing is validated here, but
+     * hashCode() and equals() dereference queueName, region, messageId and type, so
+     * those must be non-null on any instance that is hashed or compared.
+     *
+     * @param messageId      ID of the message
+     * @param type           type of the message
+     * @param queueName      name of the queue
+     * @param region         region
+     * @param shardId        ID of the shard; may be null
+     * @param queuedAt       queued-at time value
+     * @param inflightAt     inflight-at time value
+     * @param queueMessageId ID of the queue message
+     */
+    public DatabaseQueueMessage(
+            final UUID messageId,
+            final Type type,
+            final String queueName,
+            final String region,
+            final Long shardId,
+            final Long queuedAt,
+            final Long inflightAt,
+            UUID queueMessageId){
+
+        this.messageId = messageId;
+        this.type = type;
+        this.queueName = queueName;
+        this.region = region;
+        this.shardId = shardId;
+        this.queuedAt = queuedAt;
+        this.inflightAt = inflightAt;
+        this.queueMessageId = queueMessageId;
+
+    }
+
+    public void setType(Type type) {
+        this.type = type;
+    }
+
+    public String getQueueName() {
+        return queueName;
+    }
+
+    public String getRegion() {
+        return region;
+    }
+
+    public Long getShardId() {
+        return shardId;
+    }
+
+    public UUID getMessageId() {
+        return messageId;
+    }
+
+    public Type getType() {
+        return type;
+    }
+
+    public Long getQueuedAt() {
+        return queuedAt;
+    }
+
+    public UUID getQueueMessageId() {
+        return queueMessageId;
+    }
+
+    public Long getInflightAt() {
+        return inflightAt;
+    }
+
+    public void setInflightAt(Long inflightAt) {
+        this.inflightAt = inflightAt;
+    }
+
+    public void setShardId(Long shardId) {
+        this.shardId = shardId;
+    }
+
+
+    /**
+     * Hashes the fields that equals() compares. Queue name and region are hashed
+     * ignoring case because equals() compares them with equalsIgnoreCase();
+     * a case-sensitive hash would give equal messages different hash codes.
+     */
+    @Override
+    public int hashCode() {
+        int result = caseInsensitiveHash( queueName );
+        result = ( 31 * result ) + caseInsensitiveHash( region );
+        result = ( 31 * result ) + (int)( shardId != null ? shardId : 0L );
+        result = ( 31 * result ) + messageId.hashCode();
+        result = ( 31 * result ) + type.hashCode();
+
+        return result;
+    }
+
+    /**
+     * Compares queue name and region (ignoring case), shard ID, message ID and type.
+     * The other fields (queuedAt, inflightAt, queueMessageId) do not take part.
+     */
+    @Override
+    public boolean equals(Object obj) {
+
+        if( this == obj){
+            return true;
+        }
+
+        if( !(obj instanceof DatabaseQueueMessage)){
+            return false;
+        }
+
+        DatabaseQueueMessage that = (DatabaseQueueMessage) obj;
+
+        if( !this.queueName.equalsIgnoreCase(that.queueName)){
+            return false;
+        }
+        if( !this.region.equalsIgnoreCase(that.region)){
+            return false;
+        }
+        // shardId is a boxed Long: compare values, since != on two Long objects
+        // compares references and can report equal IDs as different
+        if( this.shardId == null ? that.shardId != null : !this.shardId.equals(that.shardId)){
+            return false;
+        }
+        if( !messageId.equals(that.messageId)){
+            return false;
+        }
+        if( !type.equals(that.type)){
+            return false;
+        }
+
+        return true;
+
+    }
+
+    /**
+     * Hashes a string so that any two strings equal under String.equalsIgnoreCase()
+     * get the same hash, by folding each char to upper and then lower case.
+     */
+    private static int caseInsensitiveHash( final String value ) {
+        int hash = 0;
+        for ( int i = 0; i < value.length(); i++ ) {
+            hash = ( 31 * hash ) + Character.toLowerCase( Character.toUpperCase( value.charAt( i ) ) );
+        }
+        return hash;
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageBody.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageBody.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageBody.java
new file mode 100644
index 0000000..c4c7fce
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageBody.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.queuemessages;
+
+import com.google.common.base.Preconditions;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * Body of a queue message: a payload buffer plus a content type string.
+ */
+public class DatabaseQueueMessageBody {
+
+    private final String contentType;
+    private final ByteBuffer blob;
+
+    /**
+     * Creates a body from a payload and its content type.
+     *
+     * @param blob        payload; must not be null
+     * @param contentType content type of the payload; stored as given
+     * @throws NullPointerException if blob is null
+     */
+    public DatabaseQueueMessageBody(final ByteBuffer blob, final String contentType){
+        // checkNotNull returns its argument, so the check and the assignment are one step
+        this.blob = Preconditions.checkNotNull(blob, "Blob data cannot be null");
+        this.contentType = contentType;
+    }
+
+    /** @return the content type given at construction */
+    public String getContentType() {
+        return this.contentType;
+    }
+
+    /** @return the payload buffer given at construction (the same object, not a copy) */
+    public ByteBuffer getBlob() {
+        return this.blob;
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java
new file mode 100644
index 0000000..cbbf11f
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.qakka.serialization.queuemessages;
+
+import org.apache.usergrid.persistence.core.migration.schema.Migration;
+
+
+public interface MessageCounterSerialization  extends Migration {
+
+    void incrementCounter(String queueName, DatabaseQueueMessage.Type type, long increment);
+
+    void decrementCounter(String queueName, DatabaseQueueMessage.Type type, long increment);
+
+    long getCounterValue(String name, DatabaseQueueMessage.Type type);
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
new file mode 100644
index 0000000..3ebe735
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.queuemessages;
+
+import org.apache.usergrid.persistence.core.migration.schema.Migration;
+
+import java.util.UUID;
+
+
+/**
+ * Storage operations for queue messages. Two kinds of record are handled separately: the queue
+ * entry (a {@link DatabaseQueueMessage}, addressed by queue name, region, shard, type and queue
+ * message id) and the message payload (a {@link DatabaseQueueMessageBody}, addressed by message id).
+ */
+public interface QueueMessageSerialization extends Migration {
+
+    // ---- queue entries ----
+
+    /**
+     * Writes a message to storage.
+     * A null queueMessageId or queuedAt on the message is replaced with a generated value.
+     *
+     * @param message the queue entry to store
+     * @return the queue message id under which the entry was stored
+     */
+    UUID writeMessage(DatabaseQueueMessage message);
+
+    /**
+     * Loads a single queue entry.
+     *
+     * @param queueName      name of the queue
+     * @param region         region of the queue entry
+     * @param shardIdOrNull  shard holding the entry, or null to let the implementation determine it
+     * @param type           message type, which decides where the entry is looked up
+     * @param queueMessageId id of the queue entry
+     * @return the stored entry
+     */
+    DatabaseQueueMessage loadMessage(
+        String queueName,
+        String region,
+        Long shardIdOrNull,
+        DatabaseQueueMessage.Type type,
+        UUID queueMessageId);
+
+    /**
+     * Deletes a single queue entry.
+     *
+     * @param queueName      name of the queue
+     * @param region         region of the queue entry
+     * @param shardIdOrNull  shard holding the entry, or null to let the implementation determine it
+     * @param type           message type, which decides where the entry is deleted from
+     * @param queueMessageId id of the queue entry
+     */
+    void deleteMessage(
+        String queueName,
+        String region,
+        Long shardIdOrNull,
+        DatabaseQueueMessage.Type type,
+        UUID queueMessageId);
+
+    // ---- message payloads ----
+
+    /**
+     * Stores the payload for a message id.
+     *
+     * @param messageId   id the payload is stored under
+     * @param messageBody payload and content type to store
+     */
+    void writeMessageData(UUID messageId, DatabaseQueueMessageBody messageBody);
+
+    /**
+     * Loads the payload stored for a message id.
+     *
+     * @param messageId id the payload was stored under
+     * @return the stored payload
+     */
+    DatabaseQueueMessageBody loadMessageData(UUID messageId);
+
+    /**
+     * Deletes the payload stored for a message id.
+     *
+     * @param messageId id the payload was stored under
+     */
+    void deleteMessageData(UUID messageId);
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
new file mode 100644
index 0000000..65ffc47
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
+import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.MessageFormat;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+@Singleton
+public class MessageCounterSerializationImpl implements ShardCounterSerialization {
+    private static final Logger logger = LoggerFactory.getLogger( MessageCounterSerializationImpl.class );
+
+    private final CassandraClient cassandraClient;
+
+    final static String TABLE_SHARD_COUNTERS       = "counters";
+    final static String COLUMN_QUEUE_NAME    = "queue_name";
+    final static String COLUMN_SHARD_ID      = "shard_id";
+    final static String COLUMN_COUNTER_VALUE = "counter_value";
+    final static String COLUMN_SHARD_TYPE    = "shard_type";
+
+    // design note: counters based on DataStax example here:
+    // https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_counter_t.html
+
+    static final String CQL =
+        "CREATE TABLE IF NOT EXISTS shard_counters ( " +
+                "counter_value counter, " +
+                "queue_name    varchar, " +
+                "shard_type    varchar, " +
+                "shard_id      bigint, " +
+                "PRIMARY KEY (queue_name, shard_type, shard_id) " +
+        ");";
+
+
+    final long maxInMemoryIncrement;
+
+    class InMemoryCount {
+        long baseCount;
+        final AtomicLong increment = new AtomicLong( 0L );
+        InMemoryCount( long baseCount ) {
+            this.baseCount = baseCount;
+        }
+        public long value() {
+            return baseCount + increment.get();
+        }
+        public AtomicLong getIncrement() {
+            return increment;
+        }
+        void setBaseCount( long baseCount ) {
+            this.baseCount = baseCount;
+        }
+    }
+
+    private Map<String, InMemoryCount> inMemoryCounters = new ConcurrentHashMap<>(200);
+
+
+    @Inject
+    public MessageCounterSerializationImpl(QakkaFig qakkaFig, CassandraClient cassandraClient ) {
+        this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter();
+        this.cassandraClient = cassandraClient;
+    }
+
+
+    @Override
+    public void incrementCounter(String queueName, Shard.Type type, long shardId, long increment ) {
+
+        String key = queueName + type + shardId;
+        synchronized ( inMemoryCounters ) {
+
+            if ( inMemoryCounters.get( key ) == null ) {
+
+                Long value = retrieveCounterFromStorage( queueName, type, shardId );
+
+                if ( value == null ) {
+                    incrementCounterInStorage( queueName, type, shardId, 0L );
+                    inMemoryCounters.put( key, new InMemoryCount( 0L ));
+                } else {
+                    inMemoryCounters.put( key, new InMemoryCount( value ));
+                }
+                inMemoryCounters.get( key ).getIncrement().addAndGet( increment );
+                return;
+            }
+        }
+
+        InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+
+        synchronized ( inMemoryCount ) {
+            long totalIncrement = inMemoryCount.getIncrement().addAndGet( increment );
+
+            if (totalIncrement > maxInMemoryIncrement) {
+                incrementCounterInStorage( queueName, type, shardId, totalIncrement );
+                inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type, shardId ) );
+                inMemoryCount.getIncrement().set( 0L );
+            }
+        }
+
+    }
+
+
+    @Override
+    public long getCounterValue( String queueName, Shard.Type type, long shardId ) {
+
+        String key = queueName + type + shardId;
+
+        synchronized ( inMemoryCounters ) {
+
+            if ( inMemoryCounters.get( key ) == null ) {
+
+                Long value = retrieveCounterFromStorage( queueName, type, shardId );
+
+                if ( value == null ) {
+                    throw new NotFoundException(
+                            MessageFormat.format( "No counter found for queue {0} type {1} shardId {2}",
+                                    queueName, type, shardId ));
+                } else {
+                    inMemoryCounters.put( key, new InMemoryCount( value ));
+                }
+            }
+        }
+
+        return inMemoryCounters.get( key ).value();
+    }
+
+    void incrementCounterInStorage( String queueName, Shard.Type type, long shardId, long increment ) {
+
+        Statement update = QueryBuilder.update( TABLE_SHARD_COUNTERS )
+                .where( QueryBuilder.eq(   COLUMN_QUEUE_NAME, queueName ) )
+                .and(   QueryBuilder.eq(   COLUMN_SHARD_TYPE, type.toString() ) )
+                .and(   QueryBuilder.eq(   COLUMN_SHARD_ID, shardId ) )
+                .with(  QueryBuilder.incr( COLUMN_COUNTER_VALUE, increment ) );
+        cassandraClient.getSession().execute( update );
+    }
+
+
+    Long retrieveCounterFromStorage( String queueName, Shard.Type type, long shardId ) {
+
+        Statement query = QueryBuilder.select().from( TABLE_SHARD_COUNTERS )
+                .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) )
+                .and( QueryBuilder.eq( COLUMN_SHARD_TYPE, type.toString()) )
+                .and( QueryBuilder.eq( COLUMN_SHARD_ID, shardId ) );
+
+        ResultSet resultSet = cassandraClient.getSession().execute( query );
+        List<Row> all = resultSet.all();
+
+        if ( all.size() > 1 ) {
+            throw new QakkaRuntimeException(
+                    "Multiple rows for counter " + queueName + " type " + type + " shardId " + shardId );
+        }
+        if ( all.isEmpty() ) {
+            return null;
+        }
+        return all.get(0).getLong( COLUMN_COUNTER_VALUE );
+    }
+
+
+    @Override
+    public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
+        return Collections.EMPTY_LIST;
+    }
+
+    @Override
+    public Collection<TableDefinition> getTables() {
+        return Collections.singletonList( new TableDefinitionStringImpl( TABLE_SHARD_COUNTERS, CQL ) );
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
new file mode 100644
index 0000000..f55b936
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
+import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+
+
+/**
+ * Cassandra implementation of {@link QueueMessageSerialization}. Queue entries live in one of
+ * two tables depending on their {@link DatabaseQueueMessage.Type} (available or inflight);
+ * message payloads live in a separate table keyed by message id.
+ */
+public class QueueMessageSerializationImpl implements QueueMessageSerialization {
+
+    private static final Logger logger = LoggerFactory.getLogger( QueueMessageSerializationImpl.class );
+
+    private final CassandraClient cassandraClient;
+
+    private final ActorSystemFig            actorSystemFig;
+    private final ShardStrategy             shardStrategy;
+    private final ShardCounterSerialization shardCounterSerialization;
+
+    public final static String COLUMN_QUEUE_NAME       = "queue_name";
+    public final static String COLUMN_REGION           = "region";
+    public final static String COLUMN_SHARD_ID         = "shard_id";
+    public final static String COLUMN_QUEUED_AT        = "queued_at";
+    public final static String COLUMN_INFLIGHT_AT      = "inflight_at";
+    public final static String COLUMN_QUEUE_MESSAGE_ID = "queue_message_id";
+    public final static String COLUMN_MESSAGE_ID       = "message_id";
+    public final static String COLUMN_CONTENT_TYPE     = "content_type";
+    public final static String COLUMN_MESSAGE_DATA     = "data";
+
+    public final static String TABLE_MESSAGES_AVAILABLE = "messages_available";
+
+    public final static String TABLE_MESSAGES_INFLIGHT = "messages_inflight";
+
+    public final static String TABLE_MESSAGE_DATA = "message_data";
+
+    static final String MESSAGES_AVAILABLE =
+        "CREATE TABLE IF NOT EXISTS messages_available ( " +
+                "queue_name       text, " +
+                "region           text, " +
+                "shard_id         bigint, " +
+                "queue_message_id timeuuid, " +
+                "message_id       uuid, " +
+                "queued_at        bigint, " +
+                "inflight_at      bigint, " +
+                "PRIMARY KEY ((queue_name, region, shard_id), queue_message_id ) " +
+                ") WITH CLUSTERING ORDER BY (queue_message_id ASC); ";
+
+    static final String MESSAGES_INFLIGHT =
+        "CREATE TABLE IF NOT EXISTS messages_inflight ( " +
+                "queue_name       text, " +
+                "region           text, " +
+                "shard_id         bigint, " +
+                "queue_message_id timeuuid, " +
+                "message_id       uuid, " +
+                "queued_at        bigint, " +
+                "inflight_at      bigint, " +
+                "PRIMARY KEY ((queue_name, region, shard_id), queue_message_id ) " +
+                ") WITH CLUSTERING ORDER BY (queue_message_id ASC); ";
+
+    static final String MESSAGE_DATA =
+        "CREATE TABLE IF NOT EXISTS message_data ( " +
+                "message_id uuid, " +
+                "data blob, " +
+                "content_type text, " +
+                "PRIMARY KEY ((message_id)) " +
+                "); ";
+
+    @Inject
+    public QueueMessageSerializationImpl(
+            ActorSystemFig            actorSystemFig,
+            ShardStrategy             shardStrategy,
+            ShardCounterSerialization shardCounterSerialization,
+            CassandraClient           cassandraClient
+        ) {
+        this.actorSystemFig            = actorSystemFig;
+        this.shardStrategy             = shardStrategy;
+        this.shardCounterSerialization = shardCounterSerialization;
+        this.cassandraClient = cassandraClient;
+    }
+
+
+    /**
+     * Inserts a queue entry into the table matching the message type and bumps the shard counter
+     * by one.
+     *
+     * Missing values are filled in: a null queue message id is replaced by a new time UUID, a
+     * null queuedAt by the current time, a null inflightAt by the resolved queuedAt, and a null
+     * shard id by the shard chosen by the shard strategy. In the last case the chosen shard id is
+     * also set on the passed-in {@code message}.
+     *
+     * @param message the queue entry to store
+     * @return the queue message id the entry was stored under
+     */
+    @Override
+    public UUID writeMessage(final DatabaseQueueMessage message) {
+
+        final UUID queueMessageId =  message.getQueueMessageId() == null ?
+                QakkaUtils.getTimeUuid() : message.getQueueMessageId();
+
+        long queuedAt = message.getQueuedAt() == null ?
+                System.currentTimeMillis() : message.getQueuedAt();
+
+        // Fall back to the resolved queuedAt, not message.getQueuedAt(): the latter may be null
+        // and would fail on unboxing.
+        long inflightAt = message.getInflightAt() == null ?
+                queuedAt : message.getInflightAt();
+
+        Shard.Type shardType = toShardType( message.getType() );
+
+        if ( message.getShardId() == null ) {
+            Shard shard = shardStrategy.selectShard(
+                    message.getQueueName(), actorSystemFig.getRegionLocal(), shardType, queueMessageId );
+            message.setShardId( shard.getShardId() );
+        }
+
+        Statement insert = QueryBuilder.insertInto(getTableName(message.getType()))
+                .value( COLUMN_QUEUE_NAME,       message.getQueueName())
+                .value( COLUMN_REGION,           message.getRegion())
+                .value( COLUMN_SHARD_ID,         message.getShardId())
+                .value( COLUMN_MESSAGE_ID,       message.getMessageId())
+                .value( COLUMN_QUEUE_MESSAGE_ID, queueMessageId)
+                .value( COLUMN_INFLIGHT_AT,      inflightAt )
+                .value( COLUMN_QUEUED_AT,        queuedAt);
+
+        cassandraClient.getSession().execute(insert);
+
+        shardCounterSerialization.incrementCounter( message.getQueueName(), shardType, message.getShardId(), 1 );
+
+        return queueMessageId;
+    }
+
+
+    /**
+     * Loads one queue entry from the table matching {@code type}.
+     *
+     * @param shardIdOrNull shard to look in; when null the shard strategy picks the shard
+     * @return the stored entry, or null when {@code queueMessageId} is null or no row matches
+     */
+    @Override
+    public DatabaseQueueMessage loadMessage(
+            final String queueName,
+            final String region,
+            final Long shardIdOrNull,
+            final DatabaseQueueMessage.Type type,
+            final UUID queueMessageId ) {
+
+        if ( queueMessageId == null ) {
+            return null;
+        }
+
+        final long shardId = resolveShardId( queueName, shardIdOrNull, type, queueMessageId );
+
+        Clause queueNameClause = QueryBuilder.eq(      COLUMN_QUEUE_NAME, queueName );
+        Clause regionClause = QueryBuilder.eq(         COLUMN_REGION, region );
+        Clause shardIdClause = QueryBuilder.eq(        COLUMN_SHARD_ID, shardId );
+        Clause queueMessageIdClause = QueryBuilder.eq( COLUMN_QUEUE_MESSAGE_ID, queueMessageId);
+
+        Statement select = QueryBuilder.select().from(getTableName( type ))
+                .where(queueNameClause)
+                .and(regionClause)
+                .and(shardIdClause)
+                .and(queueMessageIdClause);
+
+        Row row = cassandraClient.getSession().execute(select).one();
+
+        if (row == null) {
+            return null;
+        }
+
+        return new DatabaseQueueMessage(
+            row.getUUID(   COLUMN_MESSAGE_ID),
+            type,
+            row.getString( COLUMN_QUEUE_NAME),
+            row.getString( COLUMN_REGION),
+            row.getLong(   COLUMN_SHARD_ID),
+            row.getLong(   COLUMN_QUEUED_AT),
+            row.getLong(   COLUMN_INFLIGHT_AT),
+            row.getUUID(   COLUMN_QUEUE_MESSAGE_ID)
+        );
+    }
+
+
+    /**
+     * Deletes one queue entry from the table matching {@code type}.
+     *
+     * @param shardIdOrNull shard to delete from; when null the shard strategy picks the shard
+     */
+    @Override
+    public void deleteMessage(
+            final String queueName,
+            final String region,
+            final Long shardIdOrNull,
+            final DatabaseQueueMessage.Type type,
+            final UUID queueMessageId ) {
+
+        final long shardId = resolveShardId( queueName, shardIdOrNull, type, queueMessageId );
+
+        Clause queueNameClause = QueryBuilder.eq(      COLUMN_QUEUE_NAME, queueName );
+        Clause regionClause = QueryBuilder.eq(         COLUMN_REGION, region );
+        Clause shardIdClause = QueryBuilder.eq(        COLUMN_SHARD_ID, shardId );
+        Clause queueMessageIdClause = QueryBuilder.eq( COLUMN_QUEUE_MESSAGE_ID, queueMessageId);
+
+        Statement delete = QueryBuilder.delete().from(getTableName( type ))
+                .where(queueNameClause)
+                .and(regionClause)
+                .and(shardIdClause)
+                .and(queueMessageIdClause);
+
+        cassandraClient.getSession().execute( delete );
+    }
+
+
+    /**
+     * Loads the payload stored for a message id.
+     *
+     * @return the payload and its content type, or null when no row exists for the id
+     */
+    @Override
+    public DatabaseQueueMessageBody loadMessageData(final UUID messageId ){
+
+        Clause messageIdClause = QueryBuilder.eq( COLUMN_MESSAGE_ID, messageId );
+
+        Statement select = QueryBuilder.select().from( TABLE_MESSAGE_DATA).where(messageIdClause);
+
+        Row row = cassandraClient.getSession().execute(select).one();
+        if ( row == null ) {
+            return null;
+        }
+
+        return new DatabaseQueueMessageBody(
+                row.getBytes( COLUMN_MESSAGE_DATA),
+                row.getString( COLUMN_CONTENT_TYPE));
+    }
+
+
+    /**
+     * Stores the payload for a message id.
+     *
+     * @throws IllegalArgumentException if {@code messageId} is not a type 1 (time-based) UUID
+     */
+    @Override
+    public void writeMessageData( final UUID messageId, final DatabaseQueueMessageBody messageBody ) {
+        Preconditions.checkArgument(QakkaUtils.isTimeUuid(messageId), "MessageId is not a type 1 UUID");
+
+        Statement insert = QueryBuilder.insertInto(TABLE_MESSAGE_DATA)
+                .value( COLUMN_MESSAGE_ID, messageId)
+                .value( COLUMN_MESSAGE_DATA, messageBody.getBlob())
+                .value( COLUMN_CONTENT_TYPE, messageBody.getContentType());
+
+        cassandraClient.getSession().execute(insert);
+    }
+
+
+    /**
+     * Deletes the payload stored for a message id.
+     */
+    @Override
+    public void deleteMessageData( final UUID messageId ) {
+
+        Clause messageIdClause = QueryBuilder.eq(COLUMN_MESSAGE_ID, messageId);
+
+        Statement delete = QueryBuilder.delete().from(TABLE_MESSAGE_DATA)
+                .where(messageIdClause);
+
+        cassandraClient.getSession().execute(delete);
+    }
+
+
+    /**
+     * Maps a message type to the table that stores entries of that type.
+     *
+     * @throws IllegalArgumentException if the type is null or not one of DEFAULT / INFLIGHT
+     */
+    public static String getTableName(DatabaseQueueMessage.Type messageType){
+
+        // Constant-first comparisons so that a null type reaches the IllegalArgumentException
+        // below instead of failing with a NullPointerException.
+        if ( DatabaseQueueMessage.Type.DEFAULT.equals( messageType ) ) {
+            return TABLE_MESSAGES_AVAILABLE;
+        }
+        if ( DatabaseQueueMessage.Type.INFLIGHT.equals( messageType ) ) {
+            return TABLE_MESSAGES_INFLIGHT;
+        }
+        throw new IllegalArgumentException("Unknown DatabaseQueueMessage Type");
+    }
+
+    /** @return an empty collection; this class declares no column families */
+    @Override
+    public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
+        return Collections.emptyList();
+    }
+
+    /** @return definitions of the available, inflight and message-data tables */
+    @Override
+    public Collection<TableDefinition> getTables() {
+        return Lists.newArrayList(
+                new TableDefinitionStringImpl( TABLE_MESSAGES_AVAILABLE, MESSAGES_AVAILABLE ),
+                new TableDefinitionStringImpl( TABLE_MESSAGES_INFLIGHT, MESSAGES_INFLIGHT ),
+                new TableDefinitionStringImpl( TABLE_MESSAGE_DATA, MESSAGE_DATA )
+        );
+    }
+
+    /** Maps a message type to a shard type: DEFAULT stays DEFAULT, anything else is INFLIGHT. */
+    private static Shard.Type toShardType( final DatabaseQueueMessage.Type type ) {
+        return DatabaseQueueMessage.Type.DEFAULT.equals( type ) ?
+                Shard.Type.DEFAULT : Shard.Type.INFLIGHT;
+    }
+
+    /** Returns the given shard id, or asks the shard strategy for one when none was given. */
+    private long resolveShardId(
+            final String queueName,
+            final Long shardIdOrNull,
+            final DatabaseQueueMessage.Type type,
+            final UUID queueMessageId ) {
+
+        if ( shardIdOrNull != null ) {
+            return shardIdOrNull;
+        }
+        Shard shard = shardStrategy.selectShard(
+                queueName, actorSystemFig.getRegionLocal(), toShardType( type ), queueMessageId );
+        return shard.getShardId();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueue.java
new file mode 100644
index 0000000..dcf273a
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueue.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.queues;
+
+/**
+ * Value object describing a queue as it is stored in the database. Identity is defined by
+ * {@code name} (compared ignoring case) and {@code regions} (compared exactly); the remaining
+ * settings do not take part in {@link #equals(Object)} or {@link #hashCode()}.
+ */
+public class DatabaseQueue {
+
+    private final String name;
+    private final String regions;
+
+    private final String defaultDestinations;
+    private final Long defaultDelayMs;
+    private final Integer retryCount;
+    private final Integer handlingTimeoutSec;
+    private final String deadLetterQueue;
+
+
+    /**
+     * Stores all values as given; nothing is validated or copied.
+     *
+     * @param name                queue name; used by equals/hashCode, so it must not be null there
+     * @param regions             regions value; used by equals/hashCode, so it must not be null there
+     * @param defaultDestinations default destinations setting
+     * @param defaultDelayMs      default delay setting
+     * @param retryCount          retry count setting
+     * @param handlingTimeoutSec  handling timeout setting
+     * @param deadLetterQueue     dead letter queue setting
+     */
+    public DatabaseQueue(final String name,
+                         final String regions,
+                         final String defaultDestinations,
+                         final Long defaultDelayMs,
+                         final Integer retryCount,
+                         final Integer handlingTimeoutSec,
+                         final String deadLetterQueue ){
+
+        this.name = name;
+        this.regions = regions;
+        this.defaultDestinations = defaultDestinations;
+        this.defaultDelayMs = defaultDelayMs;
+        this.retryCount = retryCount;
+        this.handlingTimeoutSec = handlingTimeoutSec;
+        this.deadLetterQueue = deadLetterQueue;
+
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getDeadLetterQueue() {
+        return deadLetterQueue;
+    }
+
+    public Integer getHandlingTimeoutSec() {
+        return handlingTimeoutSec;
+    }
+
+    public Integer getRetryCount() {
+        return retryCount;
+    }
+
+    public Long getDefaultDelayMs() {
+        return defaultDelayMs;
+    }
+
+    public String getDefaultDestinations() {
+        return defaultDestinations;
+    }
+
+    public String getRegions() {
+        return regions;
+    }
+
+    /**
+     * Hash over the case-folded name and the regions, consistent with {@link #equals(Object)}:
+     * two queues whose names differ only in case hash to the same value.
+     */
+    @Override
+    public int hashCode() {
+        int result = caseInsensitiveHash( name );
+        result = ( 31 * result ) + regions.hashCode();
+
+        return result;
+    }
+
+    /**
+     * Two queues are equal when their names match ignoring case and their regions match exactly.
+     */
+    @Override
+    public boolean equals(Object obj) {
+
+        if( this == obj){
+            return true;
+        }
+
+        if( !(obj instanceof DatabaseQueue)){
+            return false;
+        }
+
+        DatabaseQueue that = (DatabaseQueue) obj;
+
+        if( !this.name.equalsIgnoreCase(that.name)){
+            return false;
+        }
+        if( !this.regions.equals(that.regions)){
+            return false;
+        }
+
+        return true;
+
+    }
+
+    /**
+     * Hashes a string so that any two strings that are equal under
+     * {@link String#equalsIgnoreCase(String)} get the same hash. Each char is folded with
+     * upper-then-lower casing, the same per-char folding equalsIgnoreCase compares with.
+     */
+    private static int caseInsensitiveHash( final String value ) {
+        int hash = 0;
+        for ( int i = 0; i < value.length(); i++ ) {
+            hash = ( 31 * hash ) + Character.toLowerCase( Character.toUpperCase( value.charAt( i ) ) );
+        }
+        return hash;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/QueueSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/QueueSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/QueueSerialization.java
new file mode 100644
index 0000000..b00d269
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/QueueSerialization.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.queues;
+
+import org.apache.usergrid.persistence.core.migration.schema.Migration;
+
+import java.util.List;
+
+
+/**
+ * Persistence operations for queue definitions ({@link DatabaseQueue}).
+ * Extends {@link Migration} so that implementations also declare the schema they need.
+ */
+public interface QueueSerialization extends Migration {
+
+    /**
+     * Persist the given queue definition.
+     * @param queue queue definition to store
+     */
+    void writeQueue(DatabaseQueue queue);
+
+    /**
+     * Look up a queue definition by name.
+     * @param name name of the queue
+     * @return the stored queue; QueueSerializationImpl returns null when no row exists for the name
+     */
+    DatabaseQueue getQueue(String name);
+
+    /**
+     * Remove the queue definition stored under the given name.
+     * @param name name of the queue
+     */
+    void deleteQueue(String name);
+
+    /**
+     * @return the names of all stored queues
+     */
+    List<String> getListOfQueues();
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
new file mode 100644
index 0000000..932097a
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.queues.impl;
+
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
+import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl.QueueMessageSerializationImpl;
+import org.apache.usergrid.persistence.qakka.serialization.queues.DatabaseQueue;
+import org.apache.usergrid.persistence.qakka.serialization.queues.QueueSerialization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Cassandra-backed {@link QueueSerialization}: one row per queue in the
+ * {@code queues} table, with the queue name as the partition key.
+ */
+public class QueueSerializationImpl implements QueueSerialization {
+
+    // Logger is registered under this class (it previously used QueueMessageSerializationImpl,
+    // which attributed this class's log output to the wrong component).
+    private static final Logger logger = LoggerFactory.getLogger( QueueSerializationImpl.class );
+
+    private final CassandraClient cassandraClient;
+
+    public final static String COLUMN_QUEUE_NAME = "queue_name";
+    public final static String COLUMN_REGIONS = "regions";
+    public final static String COLUMN_DEFAULT_DESTINATIONS = "default_destinations";
+    public final static String COLUMN_DEFAULT_DELAY_MS = "default_delay_ms";
+    public final static String COLUMN_RETRY_COUNT = "retry_count";
+    public final static String COLUMN_HANDLING_TIMEOUT_SEC = "handling_timeout_sec";
+    public final static String COLUMN_DEAD_LETTER_QUEUE = "dead_letter_queue";
+
+
+    public final static String TABLE_QUEUES = "queues";
+
+    /** Schema for the queues table; column names must stay in sync with the COLUMN_* constants. */
+    static final String CQL =
+        "CREATE TABLE IF NOT EXISTS queues ( " +
+            "queue_name           text, " +
+            "regions              text, " +
+            "default_destinations text, " +
+            "default_delay_ms     bigint, " +
+            "retry_count          int, " +
+            "handling_timeout_sec int, " +
+            "dead_letter_queue    text, " +
+            "PRIMARY KEY ((queue_name)) " +
+            "); ";
+
+
+    @Inject
+    public QueueSerializationImpl( CassandraClient cassandraClient ) {
+        this.cassandraClient = cassandraClient;
+    }
+
+
+    /**
+     * Inserts a row for the given queue, writing every column from the queue's getters.
+     * @param queue queue definition to store
+     */
+    @Override
+    public void writeQueue(DatabaseQueue queue) {
+
+        Statement insert = QueryBuilder.insertInto(TABLE_QUEUES)
+                .value(COLUMN_QUEUE_NAME, queue.getName())
+                .value(COLUMN_REGIONS, queue.getRegions())
+                .value(COLUMN_DEFAULT_DESTINATIONS, queue.getDefaultDestinations())
+                .value(COLUMN_DEFAULT_DELAY_MS, queue.getDefaultDelayMs())
+                .value(COLUMN_RETRY_COUNT, queue.getRetryCount())
+                .value(COLUMN_HANDLING_TIMEOUT_SEC, queue.getHandlingTimeoutSec())
+                .value(COLUMN_DEAD_LETTER_QUEUE, queue.getDeadLetterQueue());
+
+
+        cassandraClient.getSession().execute(insert);
+
+    }
+
+    /**
+     * Reads the row whose queue_name equals the given name.
+     * @param name name of the queue
+     * @return the queue built from the row, or null when no row matches
+     */
+    @Override
+    public DatabaseQueue getQueue(String name) {
+
+        Clause queueNameClause = QueryBuilder.eq(COLUMN_QUEUE_NAME, name);
+
+        Statement query = QueryBuilder.select().all().from(TABLE_QUEUES)
+                .where(queueNameClause);
+
+        Row row = cassandraClient.getSession().execute(query).one();
+
+        if(row == null){
+            return null;
+        }
+
+        final String queueName = row.getString(COLUMN_QUEUE_NAME);
+        final String regions = row.getString(COLUMN_REGIONS);
+        final String defaultDestinations = row.getString(COLUMN_DEFAULT_DESTINATIONS);
+        final long defaultDelayMs = row.getLong(COLUMN_DEFAULT_DELAY_MS);
+        final int retryCount = row.getInt(COLUMN_RETRY_COUNT);
+        final int handlingTimeoutSec = row.getInt(COLUMN_HANDLING_TIMEOUT_SEC);
+        final String deadLetterQueue = row.getString(COLUMN_DEAD_LETTER_QUEUE);
+
+        return new DatabaseQueue( queueName, regions, defaultDestinations, defaultDelayMs, retryCount,
+                handlingTimeoutSec, deadLetterQueue);
+
+    }
+
+    /**
+     * Deletes the row whose queue_name equals the given name.
+     * @param name name of the queue
+     */
+    @Override
+    public void deleteQueue(String name) {
+
+        Clause queueNameClause = QueryBuilder.eq(COLUMN_QUEUE_NAME, name);
+
+        Statement delete = QueryBuilder.delete().from(TABLE_QUEUES)
+                .where(queueNameClause);
+
+        cassandraClient.getSession().execute(delete);
+    }
+
+    /**
+     * @return the queue_name of every row in the queues table
+     */
+    @Override
+    public List<String> getListOfQueues() {
+
+        // only the name column is read below, so only that column is requested
+        Statement select = QueryBuilder.select().column( COLUMN_QUEUE_NAME ).from( TABLE_QUEUES );
+        ResultSet rs = cassandraClient.getSession().execute( select );
+
+        return rs.all().stream()
+                .map( row -> row.getString( COLUMN_QUEUE_NAME ))
+                .collect( Collectors.toList() );
+    }
+
+
+    /**
+     * @return an empty collection; this class declares no column families
+     */
+    @Override
+    public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
+        // typed factory instead of the raw Collections.EMPTY_LIST constant
+        return Collections.emptyList();
+    }
+
+    /**
+     * @return a single table definition pairing the queues table name with its CREATE statement
+     */
+    @Override
+    public Collection<TableDefinition> getTables() {
+        return Collections.singletonList( new TableDefinitionStringImpl( TABLE_QUEUES, CQL ) );
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/Shard.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/Shard.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/Shard.java
new file mode 100644
index 0000000..20c802d
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/Shard.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.sharding;
+
+
+import java.util.UUID;
+
+/**
+ * Identifies one shard of a queue: queue name, region, shard type and shard ID,
+ * plus a mutable UUID pointer.
+ *
+ * Equality covers queue name and region (both ignoring case), shard ID and type;
+ * the pointer is not part of equality.
+ */
+public class Shard {
+
+    public enum Type {
+
+        DEFAULT, INFLIGHT
+    }
+
+    private final String queueName;
+    private final String region;
+    private final long shardId;
+    private final Type type;
+    private UUID pointer;
+
+    public Shard(final String queueName, final String region, final Type type, final long shardId, UUID pointer){
+
+        this.queueName = queueName;
+        this.region = region;
+        this.type = type;
+        this.shardId = shardId;
+        this.pointer = pointer;
+
+    }
+
+    public String getQueueName() {
+        return queueName;
+    }
+
+    public String getRegion() {
+        return region;
+    }
+
+    public long getShardId() {
+        return shardId;
+    }
+
+    public Type getType() {
+        return type;
+    }
+
+    public UUID getPointer() {
+        return pointer;
+    }
+
+    public void setPointer(UUID pointer) {
+        this.pointer = pointer;
+    }
+
+    /**
+     * Hash code consistent with {@link #equals(Object)}: queue name and region are
+     * hashed case-insensitively because equals() compares them with equalsIgnoreCase.
+     * Hashing the raw strings would give equal shards different hash codes.
+     */
+    @Override
+    public int hashCode() {
+        int result = caseInsensitiveHash( queueName );
+        result = ( 31 * result ) + caseInsensitiveHash( region );
+        result = ( 31 * result ) + Long.hashCode( shardId );
+        result = ( 31 * result ) + type.hashCode();
+        return result;
+    }
+
+    /** Hashes a string after the per-character case folding used by String.equalsIgnoreCase. */
+    private static int caseInsensitiveHash( final String value ) {
+        int hash = 0;
+        for ( int i = 0; i < value.length(); i++ ) {
+            // upper-case first, then lower-case: the same order equalsIgnoreCase applies
+            hash = ( 31 * hash ) + Character.toLowerCase( Character.toUpperCase( value.charAt( i ) ) );
+        }
+        return hash;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+
+        if( this == obj){
+            return true;
+        }
+
+        if( !(obj instanceof Shard)){
+            return false;
+        }
+
+        Shard that = (Shard) obj;
+
+        if( !this.queueName.equalsIgnoreCase(that.queueName)){
+            return false;
+        }
+        if( !this.region.equalsIgnoreCase(that.region)){
+            return false;
+        }
+        if( this.shardId != that.shardId){
+            return false;
+        }
+        if( !this.type.equals(that.type)){
+            return false;
+        }
+
+        return true;
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerialization.java
new file mode 100644
index 0000000..c29c548
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerialization.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.qakka.serialization.sharding;
+
+import org.apache.usergrid.persistence.core.migration.schema.Migration;
+
+
+/**
+ * Persistence operations for per-shard counters, addressed by queue name,
+ * shard type and shard ID. Extends {@link Migration} so that implementations
+ * also declare the schema they need.
+ */
+public interface ShardCounterSerialization extends Migration {
+
+    /**
+     * Add to the counter of one shard.
+     * @param queueName name of the queue the shard belongs to
+     * @param type      shard type
+     * @param shardId   ID of the shard
+     * @param increment amount to add to the counter
+     */
+    void incrementCounter(String queueName, Shard.Type type, long shardId, long increment);
+
+    /**
+     * Read the counter of one shard.
+     * @param name    name of the queue the shard belongs to (same role as queueName above, presumably)
+     * @param type    shard type
+     * @param shardId ID of the shard
+     * @return current counter value
+     */
+    long getCounterValue(String name, Shard.Type type, long shardId);
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java
new file mode 100644
index 0000000..31e31ce
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.sharding;
+
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardSerializationImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Iterates over the shards of one queue / region / shard type whose active column
+ * equals 1, reading them from Cassandra in pages of at most {@link #PAGE_SIZE} rows.
+ *
+ * Every page query asks for shard IDs strictly greater than a lower bound: the
+ * starting ID passed to the constructor for the first page, then the shard ID of
+ * the last row read. When no starting ID is supplied the bound defaults to 0, so a
+ * shard whose ID is 0 is not returned.
+ */
+public class ShardIterator implements Iterator<Shard> {
+
+    private static final Logger logger = LoggerFactory.getLogger( ShardIterator.class );
+
+    /** Maximum number of rows requested per query. */
+    private static final int PAGE_SIZE = 100;
+
+    private final CassandraClient cassandraClient;
+
+    private final String queueName;
+    private final String region;
+    private final Shard.Type shardType;
+
+    /** Lower bound (exclusive) for the first page; 0 when the caller supplied none. */
+    private final long startAfterShardId;
+
+    private Iterator<Shard> currentIterator;
+
+    /** Shard ID of the last row read; 0 means no row has been read yet. */
+    private long nextStart = 0L;
+
+
+    /**
+     * @param cassandraClient source of the Cassandra session used for the queries
+     * @param queueName       queue whose shards are listed
+     * @param region          region whose shards are listed
+     * @param shardtype       selects the shard table to read
+     * @param lastShardId     only shards with an ID greater than this are returned; 0 is used when empty
+     */
+    public ShardIterator(
+            final CassandraClient cassandraClient,
+            final String queueName,
+            final String region,
+            final Shard.Type shardtype,
+            final Optional<Long> lastShardId){
+
+        this.queueName = queueName;
+        this.region = region;
+        this.shardType = shardtype;
+        this.startAfterShardId = lastShardId.orElse( 0L );
+        this.cassandraClient = cassandraClient;
+    }
+
+    /** Fetches the next page when the current one is missing or exhausted. */
+    @Override
+    public boolean hasNext() {
+
+        if(currentIterator == null || !currentIterator.hasNext()){
+            advance();
+        }
+
+        return currentIterator.hasNext();
+
+    }
+
+    @Override
+    public Shard next() {
+
+        if ( !hasNext() ) {
+            throw new NoSuchElementException( "No next shard exists" );
+        }
+
+        return currentIterator.next();
+
+    }
+
+    /** Queries the next page of shards and makes it the current iterator. */
+    private void advance(){
+
+        // Before any row has been read the bound is the caller-supplied start; afterwards
+        // it is the last shard ID seen. (The former "start absent" branch could never run,
+        // because the start always had a value.)
+        final long lowerBound = ( nextStart == 0L ) ? startAfterShardId : nextStart;
+
+        Clause queueNameClause = QueryBuilder.eq( ShardSerializationImpl.COLUMN_QUEUE_NAME, queueName);
+        Clause regionClause = QueryBuilder.eq( ShardSerializationImpl.COLUMN_REGION, region);
+        Clause activeClause = QueryBuilder.eq( ShardSerializationImpl.COLUMN_ACTIVE, 1);
+        Clause shardIdClause = QueryBuilder.gt( ShardSerializationImpl.COLUMN_SHARD_ID, lowerBound);
+
+        Statement query = QueryBuilder.select().all().from(ShardSerializationImpl.getTableName(shardType))
+                .where(queueNameClause)
+                .and(regionClause)
+                .and(activeClause)
+                .and(shardIdClause)
+                .limit(PAGE_SIZE);
+
+        List<Row> rows = cassandraClient.getSession().execute(query).all();
+
+        currentIterator = getIteratorFromRows(rows);
+
+    }
+
+
+    /**
+     * Converts result rows to shards and records the shard ID of the last row
+     * as the lower bound for the next page.
+     */
+    private Iterator<Shard> getIteratorFromRows( List<Row> rows){
+
+        List<Shard> shards = new ArrayList<>(rows.size());
+
+        for ( Row row : rows ) {
+
+            final String rowQueueName = row.getString( ShardSerializationImpl.COLUMN_QUEUE_NAME);
+            final String rowRegion = row.getString( ShardSerializationImpl.COLUMN_REGION);
+            final long rowShardId = row.getLong( ShardSerializationImpl.COLUMN_SHARD_ID);
+            final UUID pointer = row.getUUID( ShardSerializationImpl.COLUMN_POINTER);
+
+            shards.add(new Shard(rowQueueName, rowRegion, shardType, rowShardId, pointer));
+
+            nextStart = rowShardId;
+        }
+
+        return shards.iterator();
+
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerialization.java
new file mode 100644
index 0000000..c0173ab
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerialization.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.sharding;
+
+import org.apache.usergrid.persistence.core.migration.schema.Migration;
+
+
+/**
+ * Persistence operations for {@link Shard} records. Extends {@link Migration}
+ * so that implementations also declare the schema they need.
+ */
+public interface ShardSerialization extends Migration {
+
+    /**
+     * Store a new shard.
+     * @param shard shard to create
+     */
+    void createShard(final Shard shard);
+
+    /**
+     * Load a stored shard.
+     * @param shard identifies the shard to load (presumably by its queue name, region,
+     *              type and shard ID; confirm against the implementation)
+     * @return the loaded shard
+     */
+    Shard loadShard(final Shard shard);
+
+    /**
+     * Remove a stored shard.
+     * @param shard shard to delete
+     */
+    void deleteShard(final Shard shard);
+
+    /**
+     * Persist the pointer of the given shard.
+     * @param shard shard whose pointer should be written
+     */
+    void updateShardPointer(final Shard shard);
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategy.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategy.java
new file mode 100644
index 0000000..013f0b6
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategy.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.sharding;
+
+import java.util.UUID;
+
+
+/**
+ * Strategy for choosing the {@link Shard} that a queue message belongs to.
+ */
+public interface ShardStrategy {
+
+    /**
+     * Select shard that should be used for the specified Queue Message.
+     * @param queueName Name of queue
+     * @param region    Region
+     * @param type      Indicates whether message is inflight or available
+     * @param pointer   Queue Message ID (must be Time-based)
+     * @return the shard selected for the message
+     */
+    Shard selectShard(String queueName, String region, Shard.Type type, UUID pointer);
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/PlaceholderShardStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/PlaceholderShardStrategy.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/PlaceholderShardStrategy.java
new file mode 100644
index 0000000..3ca79e6
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/PlaceholderShardStrategy.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.sharding.impl;
+
+
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardStrategy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+
+// TODO: delete me!
+
+/**
+ * Placeholder {@link ShardStrategy} that keeps exactly one shard (ID 0) per
+ * queue name / region / type combination in an in-memory map.
+ * The map is a plain HashMap and is not synchronized.
+ */
+public class PlaceholderShardStrategy implements ShardStrategy {
+
+    /**
+     * Separates the key components so that different queue/region pairs cannot
+     * produce the same key (e.g. "ab"+"c" versus "a"+"bc").
+     */
+    private static final char KEY_SEPARATOR = '\u0000';
+
+    private final Map<String, Shard> shardMap = new HashMap<>();
+
+
+    /**
+     * Returns the shard cached for the given queue, region and type, creating it
+     * with shard ID 0 and the given pointer on first use.
+     *
+     * @param queueName Name of queue
+     * @param region    Region
+     * @param type      Shard type
+     * @param pointer   Pointer stored in the shard when it is first created; ignored afterwards
+     * @return the cached shard for this combination
+     */
+    @Override
+    public Shard selectShard(String queueName, String region, Shard.Type type, UUID pointer) {
+        final String key = queueName + KEY_SEPARATOR + region + KEY_SEPARATOR + type;
+        // computeIfAbsent only builds a Shard when the key is missing
+        return shardMap.computeIfAbsent( key, k -> new Shard( queueName, region, type, 0L, pointer ) );
+    }
+}