You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/01/28 23:21:53 UTC

[43/96] [abbrv] [partial] Change package namespace to org.apache.usergrid

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
new file mode 100644
index 0000000..5df1b79
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
@@ -0,0 +1,470 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.mq.cassandra.io;
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.usergrid.locking.Lock;
+import org.apache.usergrid.locking.LockManager;
+import org.apache.usergrid.locking.exception.UGLockException;
+import org.apache.usergrid.mq.Message;
+import org.apache.usergrid.mq.QueueQuery;
+import org.apache.usergrid.mq.QueueResults;
+import org.apache.usergrid.persistence.cassandra.CassandraService;
+import org.apache.usergrid.persistence.exceptions.QueueException;
+import org.apache.usergrid.persistence.exceptions.TransactionNotFoundException;
+import org.apache.usergrid.utils.UUIDUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.mutation.Mutator;
+import me.prettyprint.hector.api.query.SliceQuery;
+import static me.prettyprint.hector.api.factory.HFactory.createColumn;
+import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+import static me.prettyprint.hector.api.factory.HFactory.createSliceQuery;
+import static org.apache.usergrid.mq.cassandra.CassandraMQUtils.getConsumerId;
+import static org.apache.usergrid.mq.cassandra.CassandraMQUtils.getQueueClientTransactionKey;
+import static org.apache.usergrid.mq.cassandra.CassandraMQUtils.getQueueId;
+import static org.apache.usergrid.mq.cassandra.QueuesCF.CONSUMER_QUEUE_TIMEOUTS;
+
+
+/**
+ * Reads from the queue and starts a transaction
+ *
+ * @author tnine
+ */
+public class ConsumerTransaction extends NoTransactionSearch
+{
+
+    private static final Logger logger = LoggerFactory.getLogger( ConsumerTransaction.class );
+    private static final int MAX_READ = 10000;
+    private final LockManager lockManager;
+    private final UUID applicationId;
+    protected final CassandraService cass;
+
+    //timeout on reading lock
+    private final int lockTimeout;
+
+
+    /**
+     * Creates a transactional queue reader.
+     *
+     * @param applicationId The application id passed to the lock manager when the read lock is created
+     * @param ko The keyspace handed to the parent search
+     * @param lockManager Creates the lock that is held while reading
+     * @param cass Supplies the timestamps used for the mutations written by this class
+     * @param lockTimeout How long, in seconds, to wait for the read lock
+     */
+    public ConsumerTransaction( UUID applicationId, Keyspace ko, LockManager lockManager, CassandraService cass, int lockTimeout )
+    {
+        super( ko );
+        this.lockTimeout = lockTimeout;
+        this.cass = cass;
+        this.lockManager = lockManager;
+        this.applicationId = applicationId;
+    }
+
+
+    /**
+     * Extends an in-flight transaction. A new timeout column is written for the transaction's message, then the
+     * column holding the previous timeout is removed.
+     *
+     * @param queuePath The queue path
+     * @param transactionId The id of the transaction to extend
+     * @param query The query params; when null a default {@link QueueQuery} is used
+     *
+     * @return The id of the replacement transaction
+     *
+     * @throws TransactionNotFoundException if no timeout column exists for the given transaction id
+     */
+    public UUID renewTransaction( String queuePath, UUID transactionId, QueueQuery query )
+            throws TransactionNotFoundException
+    {
+        final long requestTime = System.currentTimeMillis();
+        final QueueQuery effectiveQuery = query != null ? query : new QueueQuery();
+
+        final UUID queueId = getQueueId( queuePath );
+        final UUID consumerId = getConsumerId( queueId, effectiveQuery );
+        final ByteBuffer rowKey = getQueueClientTransactionKey( queueId, consumerId );
+
+        // look up the current timeout column; without it there is nothing to extend
+        SliceQuery<ByteBuffer, UUID, UUID> lookup = createSliceQuery( ko, be, ue, ue );
+        lookup.setColumnFamily( CONSUMER_QUEUE_TIMEOUTS.getColumnFamily() );
+        lookup.setKey( rowKey );
+        lookup.setColumnNames( transactionId );
+
+        final HColumn<UUID, UUID> existing = lookup.execute().get().getColumnByName( transactionId );
+
+        if ( existing == null )
+        {
+            final String reason = String.format( "No transaction with id %s exists", transactionId );
+            throw new TransactionNotFoundException( reason );
+        }
+
+        final UUID previousTransaction = existing.getName();
+        final UUID messageId = existing.getValue();
+
+        // the replacement expires one timeout period after the time this call started
+        final UUID renewedTransaction = UUIDUtils.newTimeUUID( requestTime + effectiveQuery.getTimeout() );
+
+        logger.debug( "Writing new timeout at '{}' for message '{}'", renewedTransaction, messageId );
+
+        final Mutator<ByteBuffer> insert = createMutator( ko, be );
+        insert.addInsertion( rowKey, CONSUMER_QUEUE_TIMEOUTS.getColumnFamily(),
+                createColumn( renewedTransaction, messageId, cass.createTimestamp(), ue, ue ) );
+        insert.execute();
+
+        // the new timeout is in place, so the old one can go
+        deleteTransaction( queueId, consumerId, previousTransaction );
+
+        return renewedTransaction;
+    }
+
+
+    /**
+     * Removes the given transaction for the consumer resolved from the queue path and query.
+     *
+     * @param queuePath The queue path
+     * @param transactionId The transaction to remove
+     * @param query The query params; when null a default {@link QueueQuery} is used
+     */
+    public void deleteTransaction( String queuePath, UUID transactionId, QueueQuery query )
+    {
+        final QueueQuery effectiveQuery = query == null ? new QueueQuery() : query;
+        final UUID queueId = getQueueId( queuePath );
+
+        deleteTransaction( queueId, getConsumerId( queueId, effectiveQuery ), transactionId );
+    }
+
+
+    /** Delete the specified transaction */
+    private void deleteTransaction( UUID queueId, UUID consumerId, UUID transactionId )
+    {
+
+        Mutator<ByteBuffer> mutator = createMutator( ko, be );
+        ByteBuffer key = getQueueClientTransactionKey( queueId, consumerId );
+
+        mutator.addDeletion( key, CONSUMER_QUEUE_TIMEOUTS.getColumnFamily(), transactionId, ue,
+                cass.createTimestamp() );
+
+        mutator.execute();
+    }
+
+
+    /**
+     * Reads the next batch of messages for the consumer and starts a transaction for each returned message.
+     *
+     * The read runs while holding the lock created for this application, queue and consumer. Ids read from the
+     * queue are merged with the target messages of the transaction pointers returned by
+     * {@link #getConsumerIds}, trimmed to the requested limit and loaded. A new timeout is then written for every
+     * returned message, the consumed transaction pointers are deleted and the client pointer is written.
+     *
+     * The lock is only released when this call actually acquired it.
+     *
+     * @param queuePath The queue path
+     * @param query The query params
+     *
+     * @return The messages read; an empty result when the queue has no bounds yet
+     *
+     * @throws IllegalArgumentException if the requested limit is larger than {@code MAX_READ}
+     * @throws QueueException if the lock cannot be obtained within the lock timeout, or if acquiring or releasing
+     * the lock fails
+     */
+    @Override
+    public QueueResults getResults( String queuePath, QueueQuery query )
+    {
+
+        UUID queueId = getQueueId( queuePath );
+        UUID consumerId = getConsumerId( queueId, query );
+
+        if ( query.getLimit() > MAX_READ )
+        {
+            throw new IllegalArgumentException( String.format(
+                    "You specified a size of %d, you cannot specify a size larger than %d when using transations",
+                    query.getLimit( DEFAULT_READ ), MAX_READ ) );
+        }
+
+        QueueResults results = null;
+
+        Lock lock = lockManager.createLock( applicationId, queueId.toString(), consumerId.toString() );
+
+        // set only once tryLock succeeds, so the finally block never releases a lock this call does not hold
+        boolean locked = false;
+
+        try
+        {
+
+            //only try to get a lock with a timeout, if we can't bail
+            locked = lock.tryLock( lockTimeout, TimeUnit.SECONDS );
+
+            if ( !locked )
+            {
+                throw new QueueException( "Unable to obtain a lock on queue '" + queuePath + "' after '" + lockTimeout + "'seconds" );
+            }
+
+            long startTime = System.currentTimeMillis();
+
+            UUID startTimeUUID = UUIDUtils.newTimeUUID( startTime, 0 );
+
+            QueueBounds bounds = getQueueBounds( queueId );
+
+            //queue has never been written to
+            if ( bounds == null )
+            {
+                return createResults( new ArrayList<Message>( 0 ), queuePath, queueId, consumerId );
+            }
+
+            // with transactional reads, we can't read into the future, set the bounds
+            // to be now
+            bounds = new QueueBounds( bounds.getOldest(), startTimeUUID );
+
+            SearchParam params = getParams( queueId, consumerId, query );
+
+            List<UUID> ids = getQueueRange( queueId, bounds, params );
+
+            // get a list of ids from the consumer.
+            List<TransactionPointer> pointers = getConsumerIds( queueId, consumerId, params, startTimeUUID );
+
+            TransactionPointer pointer = null;
+
+            int lastTransactionIndex = -1;
+
+            for ( int i = 0; i < pointers.size(); i++ )
+            {
+
+                pointer = pointers.get( i );
+
+                // binarySearch returns -(insertion point) - 1 when the key is absent
+                // NOTE(review): the search uses UUID's natural ordering - confirm it matches the order of ids
+                int insertIndex = Collections.binarySearch( ids, pointer.expiration );
+
+                // we're done, this message goes at the end, no point in continuing
+                // since we have our full result set
+                if ( insertIndex <= params.limit * -1 - 1 )
+                {
+                    break;
+                }
+
+                // get the insertion index into the set
+                insertIndex = ( insertIndex + 1 ) * -1;
+
+                ids.add( insertIndex, pointer.targetMessage );
+
+                lastTransactionIndex = i;
+            }
+
+            // now we've merged the results, trim them to size
+            if ( ids.size() > params.limit )
+            {
+                ids = ids.subList( 0, params.limit );
+            }
+
+            // load the messages
+            List<Message> messages = loadMessages( ids, params.reversed );
+
+            // write our future timeouts for all these messages
+            writeTransactions( messages, query.getTimeout() + startTime, queueId, consumerId );
+
+            // remove all read transaction pointers
+            deleteTransactionPointers( pointers, lastTransactionIndex + 1, queueId, consumerId );
+
+            // return the results
+            results = createResults( messages, queuePath, queueId, consumerId );
+
+            UUID lastReadTransactionPointer =
+                    lastTransactionIndex == -1 ? null : pointers.get( lastTransactionIndex ).expiration;
+
+            UUID lastId = messages.size() == 0 ? null : messages.get( messages.size() - 1 ).getUuid();
+
+            // our last read id will either be the last read transaction pointer, or
+            // the last read messages uuid, whichever is greater
+            UUID lastReadId = UUIDUtils.max( lastReadTransactionPointer, lastId );
+
+            writeClientPointer( queueId, consumerId, lastReadId );
+        }
+        catch ( UGLockException e )
+        {
+            logger.debug( "Unable to acquire lock", e );
+            throw new QueueException( "Unable to acquire lock", e );
+        }
+        finally
+        {
+            if ( locked )
+            {
+                try
+                {
+                    lock.unlock();
+                }
+                catch ( UGLockException e )
+                {
+                    logger.debug( "Unable to release lock", e );
+                    throw new QueueException( "Unable to release lock", e );
+                }
+            }
+        }
+
+        return results;
+    }
+
+
+    /**
+     * Get all pending transactions that have timed out
+     *
+     * @param queueId The queue id
+     * @param consumerId The consumer id
+     * @param params The server params
+     * @param startTimeUUID The start time
+     */
+    protected List<TransactionPointer> getConsumerIds( UUID queueId, UUID consumerId, SearchParam params,
+                                                       UUID startTimeUUID )
+    {
+
+        SliceQuery<ByteBuffer, UUID, UUID> q = createSliceQuery( ko, be, ue, ue );
+        q.setColumnFamily( CONSUMER_QUEUE_TIMEOUTS.getColumnFamily() );
+        q.setKey( getQueueClientTransactionKey( queueId, consumerId ) );
+        q.setRange( params.startId, startTimeUUID, false, params.limit + 1 );
+
+        List<HColumn<UUID, UUID>> cassResults = q.execute().get().getColumns();
+
+        List<TransactionPointer> results = new ArrayList<TransactionPointer>( params.limit );
+
+        for ( HColumn<UUID, UUID> column : cassResults )
+        {
+
+            if ( logger.isDebugEnabled() )
+            {
+                logger.debug( "Adding uuid '{}' for original message '{}' to results for queue '{}' and consumer '{}'",
+                        new Object[] { column.getName(), column.getValue(), queueId, consumerId } );
+                logger.debug( "Max timeuuid : '{}', Current timeuuid : '{}', comparison '{}'", new Object[] {
+                        startTimeUUID, column.getName(), UUIDUtils.compare( startTimeUUID, column.getName() )
+                } );
+            }
+
+            results.add( new TransactionPointer( column.getName(), column.getValue() ) );
+        }
+
+        return results;
+    }
+
+
+    /**
+     * Checks whether at least one transaction timeout column is stored for the given queue and consumer.
+     *
+     * @param queueId The queue id
+     * @param consumerId The consumer id
+     *
+     * @return true when the consumer's transaction row contains a column
+     */
+    public boolean hasOutstandingTransactions( UUID queueId, UUID consumerId )
+    {
+        SliceQuery<ByteBuffer, UUID, UUID> probe = createSliceQuery( ko, be, ue, ue );
+        probe.setColumnFamily( CONSUMER_QUEUE_TIMEOUTS.getColumnFamily() );
+        probe.setKey( getQueueClientTransactionKey( queueId, consumerId ) );
+        // a single column is enough to answer the question
+        probe.setRange( null, null, false, 1 );
+
+        final List<HColumn<UUID, UUID>> found = probe.execute().get().getColumns();
+
+        return !found.isEmpty();
+    }
+
+
+    /**
+     * Delete all re-read transaction pointers
+     *
+     * @param pointers The list of transaction pointers
+     * @param maxIndex The index to stop at (exclusive)
+     * @param queueId The queue id
+     * @param consumerId The consumer id
+     */
+    protected void deleteTransactionPointers( List<TransactionPointer> pointers, int maxIndex, UUID queueId,
+                                              UUID consumerId )
+    {
+
+        if ( maxIndex == 0 || pointers.size() == 0 )
+        {
+            return;
+        }
+
+        Mutator<ByteBuffer> mutator = createMutator( ko, be );
+        ByteBuffer key = getQueueClientTransactionKey( queueId, consumerId );
+
+        for ( int i = 0; i < maxIndex && i < pointers.size(); i++ )
+        {
+            UUID pointer = pointers.get( i ).expiration;
+
+            if ( logger.isDebugEnabled() )
+            {
+                logger.debug( "Removing transaction pointer '{}' for queue '{}' and consumer '{}'", new Object[] {
+                        pointer, queueId, consumerId
+                } );
+            }
+
+            mutator.addDeletion( key, CONSUMER_QUEUE_TIMEOUTS.getColumnFamily(), pointer, ue, cass.createTimestamp() );
+        }
+
+        mutator.execute();
+    }
+
+
+    /**
+     * Write the transaction timeouts
+     *
+     * @param messages The messages to load
+     * @param futureTimeout The time these message should expire
+     * @param queueId The queue UUId
+     * @param consumerId The consumer Id
+     */
+    protected void writeTransactions( List<Message> messages, final long futureTimeout, UUID queueId, UUID consumerId )
+    {
+
+        Mutator<ByteBuffer> mutator = createMutator( ko, be );
+
+        ByteBuffer key = getQueueClientTransactionKey( queueId, consumerId );
+
+        int counter = 0;
+
+        long time = cass.createTimestamp();
+
+        for ( Message message : messages )
+        {
+            // note we're not incrementing futureSnapshot on purpose. The uuid
+            // generation should give us a sequenced unique ID for each response, even
+            // if the
+            // time is the same since we increment the counter. If we read more than
+            // 10k messages in a single transaction, our millisecond will roll to the
+            // next due to 10k being the max amount of 1/10 microsecond headroom. Not
+            // possible to avoid this given the way time uuids are encoded.
+            UUID expirationId = UUIDUtils.newTimeUUID( futureTimeout, counter );
+            UUID messageId = message.getUuid();
+
+            logger.debug( "Writing new timeout at '{}' for message '{}'", expirationId, messageId );
+
+            mutator.addInsertion( key, CONSUMER_QUEUE_TIMEOUTS.getColumnFamily(),
+                    createColumn( expirationId, messageId, time, ue, ue ) );
+
+            // add the transactionid to the message
+            message.setTransaction( expirationId );
+            counter++;
+        }
+
+        mutator.execute();
+    }
+
+
+    /** Pairs the id of a transaction timeout with the id of the message it guards. */
+    private static class TransactionPointer
+    {
+        private UUID expiration;
+        private UUID targetMessage;
+
+
+        /**
+         * @param expiration The id of the timeout column
+         * @param targetMessage The id of the message stored in that column
+         */
+        public TransactionPointer( UUID expiration, UUID targetMessage )
+        {
+            this.targetMessage = targetMessage;
+            this.expiration = expiration;
+        }
+
+
+        /** Renders both ids for log output. */
+        @Override
+        public String toString()
+        {
+            return String.format( "TransactionPointer [expiration=%s, targetMessage=%s]", expiration,
+                    targetMessage );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/EndSearch.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/EndSearch.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/EndSearch.java
new file mode 100644
index 0000000..fd5497f
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/EndSearch.java
@@ -0,0 +1,69 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.mq.cassandra.io;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.mq.QueueQuery;
+
+import me.prettyprint.hector.api.Keyspace;
+
+
+/**
+ * Reads from the queue without starting transactions. The search parameters are built from the query's last
+ * message id, and no client pointer is written after a read.
+ *
+ * @author tnine
+ */
+public class EndSearch extends NoTransactionSearch
+{
+
+    /**
+     * @param ko The keyspace passed to the parent search
+     */
+    public EndSearch( Keyspace ko )
+    {
+        super( ko );
+    }
+
+
+    /**
+     * Builds the search parameters from the query's last message id and limit.
+     *
+     * @param queueId The queue id (not used here)
+     * @param consumerId The consumer id (not used here)
+     * @param query The query supplying the last message id and the limit
+     */
+    @Override
+    protected SearchParam getParams( UUID queueId, UUID consumerId, QueueQuery query )
+    {
+        final UUID startId = query.getLastMessageId();
+        final boolean hasStartId = startId != null;
+
+        return new SearchParam( startId, true, hasStartId, query.getLimit( DEFAULT_READ ) );
+    }
+
+
+    /**
+     * Intentionally does nothing: searches from the end do not record a client pointer.
+     */
+    @Override
+    protected void writeClientPointer( UUID queueId, UUID consumerId, UUID lastReturnedId )
+    {
+        // no op for searches from the end
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/FilterSearch.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/FilterSearch.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/FilterSearch.java
new file mode 100644
index 0000000..42332a8
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/FilterSearch.java
@@ -0,0 +1,258 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.mq.cassandra.io;
+
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.usergrid.mq.Message;
+import org.apache.usergrid.mq.QueryProcessor;
+import org.apache.usergrid.mq.QueueQuery;
+import org.apache.usergrid.mq.QueueResults;
+import org.apache.usergrid.mq.QueryProcessor.QuerySlice;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.uuid.UUIDComparator;
+
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.beans.AbstractComposite.ComponentEquality;
+import me.prettyprint.hector.api.beans.DynamicComposite;
+import me.prettyprint.hector.api.beans.HColumn;
+import static me.prettyprint.hector.api.factory.HFactory.createSliceQuery;
+import static org.apache.usergrid.mq.Queue.getQueueId;
+import static org.apache.usergrid.mq.cassandra.CassandraMQUtils.getConsumerId;
+import static org.apache.usergrid.mq.cassandra.QueueManagerImpl.DEFAULT_SEARCH_COUNT;
+import static org.apache.usergrid.mq.cassandra.QueueManagerImpl.QUEUE_SHARD_INTERVAL;
+import static org.apache.usergrid.mq.cassandra.QueuesCF.PROPERTY_INDEX;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
+import static org.apache.usergrid.utils.CompositeUtils.setEqualityFlag;
+import static org.apache.usergrid.utils.ConversionUtils.bytebuffer;
+import static org.apache.usergrid.utils.NumberUtils.roundLong;
+import static org.apache.usergrid.utils.UUIDUtils.getTimestampInMillis;
+
+
+/**
+ * Searches in the queue without transactions
+ *
+ * @author tnine
+ */
+public class FilterSearch extends NoTransactionSearch
+{
+
+    private static final Logger logger = LoggerFactory.getLogger( FilterSearch.class );
+
+
+    /**
+     * @param ko The keyspace passed to the parent search
+     */
+    public FilterSearch( Keyspace ko )
+    {
+        super( ko );
+    }
+
+
+    /**
+     * Runs every slice of the query against the queue's property index and returns the messages whose ids
+     * appear in all of the slice results, trimmed to the query limit.
+     *
+     * A query that produces no slices yields an empty result.
+     *
+     * @param queuePath The queue path
+     * @param query The query supplying the filters, limit, ordering and last message id
+     *
+     * @return The matching messages
+     */
+    @Override
+    public QueueResults getResults( String queuePath, QueueQuery query )
+    {
+
+        QueryProcessor qp = new QueryProcessor( query );
+        List<QuerySlice> slices = qp.getSlices();
+
+        long limit = query.getLimit();
+
+        UUID queueId = getQueueId( queuePath );
+        UUID consumerId = getConsumerId( queueId, query );
+        QueueBounds bounds = getQueueBounds( queueId );
+
+        UUIDComparator comparator = query.isReversed() ? new ReverseUUIDComparator() : new UUIDComparator();
+
+        SortedSet<UUID> merged = null;
+
+        for ( QuerySlice slice : slices )
+        {
+            SortedSet<UUID> results =
+                    searchQueueRange( ko, queueId, bounds, slice, query.getLastMessageId(), query.isReversed(),
+                            comparator );
+
+            if ( merged == null )
+            {
+                merged = results;
+            }
+            else
+            {
+                // only ids matched by every slice survive
+                merged.retainAll( results );
+            }
+        }
+
+        // no slices means nothing was searched; continue with an empty set instead of dereferencing null
+        if ( merged == null )
+        {
+            merged = new TreeSet<UUID>( comparator );
+        }
+
+        // now trim. Not efficient, but when indexing is updated, seeking will
+        // change, so I'm not worried about this.
+        if ( merged.size() > limit )
+        {
+            Iterator<UUID> current = merged.iterator();
+            UUID max = null;
+
+            // advance to the element just past the limit; headSet excludes it
+            for ( int i = 0; i <= limit && current.hasNext(); i++ )
+            {
+                max = current.next();
+            }
+
+            merged = merged.headSet( max );
+        }
+
+        List<Message> messages = loadMessages( merged, query.isReversed() );
+
+        QueueResults results = createResults( messages, queuePath, queueId, consumerId );
+
+        return results;
+    }
+
+
+    /**
+     * Collects the message ids indexed under the slice's property, walking the property index one time shard at
+     * a time and paging through each shard.
+     *
+     * The walk begins at the shard of the starting id and ends at the shard of the finishing id: upwards for a
+     * forward search, downwards for a reversed one. It also stops once {@code DEFAULT_SEARCH_COUNT} ids have been
+     * collected.
+     *
+     * @param ko The keyspace to query
+     * @param queueId The queue id
+     * @param bounds The oldest and newest ids of the queue; when null an empty set is returned
+     * @param slice Supplies the property name and the optional cursor, start and finish values
+     * @param last When not null, used as the starting id in place of the queue bound
+     * @param reversed true to start from the newest id and finish at the oldest
+     * @param comparator The ordering of the returned set
+     *
+     * @return The ids found from the starting id (inclusive) up to the finishing id (exclusive), in comparator
+     * order
+     */
+    public SortedSet<UUID> searchQueueRange( Keyspace ko, UUID queueId, QueueBounds bounds, QuerySlice slice, UUID last,
+                                             boolean reversed, UUIDComparator comparator )
+    {
+
+        TreeSet<UUID> uuid_set = new TreeSet<UUID>( comparator );
+
+        if ( bounds == null )
+        {
+            logger.error( "Necessary queue bounds not found" );
+            return uuid_set;
+        }
+
+        UUID start_uuid = reversed ? bounds.getNewest() : bounds.getOldest();
+
+        UUID finish_uuid = reversed ? bounds.getOldest() : bounds.getNewest();
+
+        if ( last != null )
+        {
+            start_uuid = last;
+        }
+
+
+        if ( finish_uuid == null )
+        {
+            logger.error( "No last message in queue" );
+            return uuid_set;
+        }
+
+        // the starting id is dereferenced below just like the finishing id, so it needs the same guard
+        if ( start_uuid == null )
+        {
+            logger.error( "No first message in queue" );
+            return uuid_set;
+        }
+
+        long start_ts_shard = roundLong( getTimestampInMillis( start_uuid ), QUEUE_SHARD_INTERVAL );
+
+        long finish_ts_shard = roundLong( getTimestampInMillis( finish_uuid ), QUEUE_SHARD_INTERVAL );
+
+        // always begin at the shard of the starting id; for a reversed search that is the newest shard
+        long current_ts_shard = start_ts_shard;
+
+        ByteBuffer start = null;
+        if ( slice.getCursor() != null )
+        {
+            start = slice.getCursor();
+        }
+        else if ( slice.getStart() != null )
+        {
+            DynamicComposite s = new DynamicComposite( slice.getStart().getCode(), slice.getStart().getValue() );
+            if ( !slice.getStart().isInclusive() )
+            {
+                setEqualityFlag( s, ComponentEquality.GREATER_THAN_EQUAL );
+            }
+            start = s.serialize();
+        }
+
+        ByteBuffer finish = null;
+        if ( slice.getFinish() != null )
+        {
+            DynamicComposite f = new DynamicComposite( slice.getFinish().getCode(), slice.getFinish().getValue() );
+            if ( slice.getFinish().isInclusive() )
+            {
+                setEqualityFlag( f, ComponentEquality.GREATER_THAN_EQUAL );
+            }
+            finish = f.serialize();
+        }
+
+        // forward searches climb to the finishing shard, reversed searches descend to it
+        while ( ( reversed ? current_ts_shard >= finish_ts_shard : current_ts_shard <= finish_ts_shard )
+                && ( uuid_set.size() < DEFAULT_SEARCH_COUNT ) )
+        {
+
+            // each shard is paged from the slice start; the paging position of one shard must not leak into
+            // the next, or columns sorting before it would be skipped there
+            ByteBuffer page_start = start;
+
+            while ( true )
+            {
+                List<HColumn<ByteBuffer, ByteBuffer>> results =
+                        createSliceQuery( ko, be, be, be ).setColumnFamily( PROPERTY_INDEX.getColumnFamily() )
+                                .setKey( bytebuffer( key( queueId, current_ts_shard, slice.getPropertyName() ) ) )
+                                .setRange( page_start, finish, false, DEFAULT_SEARCH_COUNT ).execute().get()
+                                .getColumns();
+
+                for ( HColumn<ByteBuffer, ByteBuffer> column : results )
+                {
+                    DynamicComposite c = DynamicComposite.fromByteBuffer( column.getName().duplicate() );
+                    UUID uuid = c.get( 2, ue );
+
+                    uuid_set.add( uuid );
+                }
+
+                // a short page means this shard is exhausted
+                if ( results.size() < DEFAULT_SEARCH_COUNT )
+                {
+                    break;
+                }
+
+                // continue from the last column read; it is returned again, but the set absorbs the duplicate
+                page_start = results.get( results.size() - 1 ).getName().duplicate();
+            }
+
+            if ( reversed )
+            {
+                current_ts_shard -= QUEUE_SHARD_INTERVAL;
+            }
+            else
+            {
+                current_ts_shard += QUEUE_SHARD_INTERVAL;
+            }
+        }
+
+        // trim the results
+        return uuid_set.headSet( finish_uuid ).tailSet( start_uuid );
+    }
+
+
+    /** Orders uuids in the opposite direction of {@link UUIDComparator}. */
+    private static class ReverseUUIDComparator extends UUIDComparator
+    {
+
+        /**
+         * @return The negated result of the parent comparison
+         */
+        @Override
+        public int compare( UUID u1, UUID u2 )
+        {
+            final int natural = super.compare( u1, u2 );
+            return -natural;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/NoTransactionSearch.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/NoTransactionSearch.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/NoTransactionSearch.java
new file mode 100644
index 0000000..5ca2eb4
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/NoTransactionSearch.java
@@ -0,0 +1,131 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.mq.cassandra.io;
+
+
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.usergrid.mq.Message;
+import org.apache.usergrid.mq.QueueQuery;
+import org.apache.usergrid.mq.QueueResults;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import me.prettyprint.hector.api.Keyspace;
+import static org.apache.usergrid.mq.cassandra.CassandraMQUtils.getConsumerId;
+import static org.apache.usergrid.mq.cassandra.CassandraMQUtils.getQueueId;
+
+
+/**
+ * Reads from the queue without starting transactions.
+ *
+ * @author tnine
+ */
+public class NoTransactionSearch extends AbstractSearch
+{
+
+    private static final Logger logger = LoggerFactory.getLogger( NoTransactionSearch.class );
+
+    /** Value passed to {@code query.getLimit(int)} when the search parameters are built. */
+    protected static final int DEFAULT_READ = 1;
+
+
+    /**
+     * @param ko the keyspace handed to the {@link AbstractSearch} constructor
+     */
+    public NoTransactionSearch( Keyspace ko )
+    {
+        super( ko );
+    }
+
+
+    /**
+     * Resolves the queue and consumer ids, reads a range of message ids, loads the messages and finally
+     * passes the last returned id to {@code writeClientPointer}.
+     *
+     * @see org.apache.usergrid.mq.cassandra.io.QueueSearch#getResults(java.lang.String,
+     * org.apache.usergrid.mq.QueueQuery)
+     */
+    @Override
+    public QueueResults getResults( String queuePath, QueueQuery query )
+    {
+        final UUID queueId = getQueueId( queuePath );
+        final UUID consumerId = getConsumerId( queueId, query );
+        final QueueBounds bounds = getQueueBounds( queueId );
+        final SearchParam params = getParams( queueId, consumerId, query );
+
+        // resolve the ids first, then load the messages they refer to
+        final List<Message> messages = loadMessages( getIds( queueId, consumerId, bounds, params ), params.reversed );
+        final QueueResults results = createResults( messages, queuePath, queueId, consumerId );
+
+        writeClientPointer( queueId, consumerId, results.getLast() );
+
+        return results;
+    }
+
+
+    /** Builds the read parameters from the consumer's stored queue position. Subclasses can override this behavior */
+    protected SearchParam getParams( UUID queueId, UUID consumerId, QueueQuery query )
+    {
+        final UUID resumeFrom = getConsumerQueuePosition( queueId, consumerId );
+
+        if ( logger.isDebugEnabled() )
+        {
+            logger.debug( "Last message id is '{}' for queueId '{}' and clientId '{}'",
+                    new Object[] { resumeFrom, queueId, consumerId } );
+        }
+
+        final boolean resuming = resumeFrom != null; // skip the first hit only when resuming
+        return new SearchParam( resumeFrom, false, resuming, query.getLimit( DEFAULT_READ ) );
+    }
+
+
+    /** Returns the message ids to load, as produced by {@code getQueueRange}. Ids should be in order on return */
+    protected List<UUID> getIds( UUID queueId, UUID consumerId, QueueBounds bounds, SearchParam params )
+    {
+        return getQueueRange( queueId, bounds, params );
+    }
+
+
+    /** Immutable bundle of the settings that drive one range read. */
+    protected static class SearchParam
+    {
+        /** The uuid the seek begins at */
+        protected final UUID startId;
+        /** true when seeking runs from high to low */
+        protected final boolean reversed;
+        /** true when the first result is to be dropped, e.g. when paging on from an earlier result */
+        protected final boolean skipFirst;
+        /** How many results to include */
+        protected final int limit;
+
+
+        /**
+         * @param startId the uuid the seek begins at
+         * @param reversed true to seek from high to low
+         * @param skipFirst true to drop the first result
+         * @param count the number of results to include
+         */
+        public SearchParam( UUID startId, boolean reversed, boolean skipFirst, int count )
+        {
+            this.startId = startId;
+            this.reversed = reversed;
+            this.skipFirst = skipFirst;
+            this.limit = count;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/QueueBounds.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/QueueBounds.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/QueueBounds.java
new file mode 100644
index 0000000..6e7cf4b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/QueueBounds.java
@@ -0,0 +1,91 @@
+package org.apache.usergrid.mq.cassandra.io;
+
+
+import java.util.UUID;
+
+
+/**
+ * Immutable value object holding two UUIDs, an {@code oldest} and a {@code newest} bound.
+ * Either bound may be null; {@link #equals(Object)}, {@link #hashCode()} and
+ * {@link #toString()} all tolerate that.
+ */
+public class QueueBounds
+{
+
+    private final UUID oldest;
+    private final UUID newest;
+
+
+    /**
+     * @param oldest the value later returned by {@link #getOldest()}; may be null
+     * @param newest the value later returned by {@link #getNewest()}; may be null
+     */
+    public QueueBounds( UUID oldest, UUID newest )
+    {
+        this.oldest = oldest;
+        this.newest = newest;
+    }
+
+
+    /**
+     * @return the oldest bound exactly as given to the constructor
+     */
+    public UUID getOldest()
+    {
+        return oldest;
+    }
+
+
+    /**
+     * @return the newest bound exactly as given to the constructor
+     */
+    public UUID getNewest()
+    {
+        return newest;
+    }
+
+
+    /** Combines the hashes of {@code newest} and {@code oldest}, in that order; a null bound contributes 0. */
+    @Override
+    public int hashCode()
+    {
+        final int newestHash = ( newest == null ) ? 0 : newest.hashCode();
+        final int oldestHash = ( oldest == null ) ? 0 : oldest.hashCode();
+        return ( 31 * ( 31 + newestHash ) ) + oldestHash;
+    }
+
+
+    /**
+     * Two instances are equal when they have exactly the same class and both bounds match, where two
+     * null bounds count as matching.
+     */
+    @Override
+    public boolean equals( Object obj )
+    {
+        if ( this == obj )
+        {
+            return true;
+        }
+        if ( ( obj == null ) || ( getClass() != obj.getClass() ) )
+        {
+            return false;
+        }
+        final QueueBounds that = ( QueueBounds ) obj;
+        return sameBound( newest, that.newest ) && sameBound( oldest, that.oldest );
+    }
+
+
+    /** Null-safe equality check for a single bound. */
+    private static boolean sameBound( UUID left, UUID right )
+    {
+        return ( left == null ) ? ( right == null ) : left.equals( right );
+    }
+
+
+    /** Renders both bounds, oldest first. */
+    @Override
+    public String toString()
+    {
+        return "QueueBounds [oldest=" + oldest + ", newest=" + newest + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/QueueSearch.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/QueueSearch.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/QueueSearch.java
new file mode 100644
index 0000000..c12afe6
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/QueueSearch.java
@@ -0,0 +1,29 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.mq.cassandra.io;
+
+
+import org.apache.usergrid.mq.QueueQuery;
+import org.apache.usergrid.mq.QueueResults;
+
+
+/** @author tnine */
+public interface QueueSearch
+{
+
+    /** Runs this search for the queue at {@code queuePath} using {@code query} and returns its results */
+    QueueResults getResults( String queuePath, QueueQuery query );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/StartSearch.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/StartSearch.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/StartSearch.java
new file mode 100644
index 0000000..370f841
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/StartSearch.java
@@ -0,0 +1,68 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.mq.cassandra.io;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.mq.QueueQuery;
+
+import me.prettyprint.hector.api.Keyspace;
+
+
+/**
+ * Reads from the queue without starting transactions, starting at the query's last message id and never writing a client pointer.
+ *
+ * @author tnine
+ */
+public class StartSearch extends NoTransactionSearch
+{
+
+    /**
+     * Delegates to the {@link NoTransactionSearch} constructor.
+     * @param ko the keyspace passed through to the superclass
+     */
+    public StartSearch( Keyspace ko )
+    {
+        super( ko );
+    }
+
+
+    /**
+     * Takes the start position from the query's last message id; {@code queueId} and {@code consumerId}
+     * are not consulted. The first result is skipped only when the query supplies such an id.
+     *
+     * @see NoTransactionSearch#getParams(java.util.UUID, java.util.UUID, org.apache.usergrid.mq.QueueQuery)
+     */
+    @Override
+    protected SearchParam getParams( UUID queueId, UUID consumerId, QueueQuery query )
+    {
+        final UUID startFrom = query.getLastMessageId();
+        final boolean skipFirst = startFrom != null;
+
+        return new SearchParam( startFrom, false, skipFirst, query.getLimit( DEFAULT_READ ) );
+    }
+
+
+    /**
+     * Deliberately empty: this search never records a client pointer.
+     */
+    @Override
+    protected void writeClientPointer( UUID queueId, UUID consumerId, UUID lastReturnedId )
+    {
+        //no op for searches from the start
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/AbstractEntity.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/AbstractEntity.java b/stack/core/src/main/java/org/apache/usergrid/persistence/AbstractEntity.java
new file mode 100644
index 0000000..91d9da2
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/AbstractEntity.java
@@ -0,0 +1,342 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence;
+
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.usergrid.persistence.annotations.EntityProperty;
+import org.codehaus.jackson.annotate.JsonAnyGetter;
+import org.codehaus.jackson.annotate.JsonAnySetter;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+
+import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
+
+
+/**
+ * The abstract superclass implementation of the Entity interface.
+ *
+ * @author edanuff
+ */
+@XmlRootElement
+public abstract class AbstractEntity implements Entity {
+
+    protected UUID uuid;
+
+    protected Long created;
+
+    protected Long modified;
+
+    /** Dynamic properties, keyed case-insensitively. */
+    protected Map<String, Object> dynamic_properties = new TreeMap<String, Object>( String.CASE_INSENSITIVE_ORDER );
+
+    /** Also keyed case-insensitively; no method of this class reads or writes it. */
+    protected Map<String, Set<Object>> dynamic_sets = new TreeMap<String, Set<Object>>( String.CASE_INSENSITIVE_ORDER );
+
+    /** Returns this entity's uuid; null until one has been assigned. */
+    @Override
+    @EntityProperty(required = true, mutable = false, basic = true, indexed = false)
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public UUID getUuid() {
+        return uuid;
+    }
+
+    /** Assigns this entity's uuid. */
+    @Override
+    public void setUuid( UUID uuid ) {
+        this.uuid = uuid;
+    }
+
+    /** Returns the entity type the default {@link Schema} associates with this object's class. */
+    @Override
+    @EntityProperty(required = true, mutable = false, basic = true, indexed = false)
+    public String getType() {
+        return Schema.getDefaultSchema().getEntityType( this.getClass() );
+    }
+
+    /** Does nothing: the type reported by {@link #getType()} is derived from the class, not stored. */
+    @Override
+    public void setType( String type ) {
+    }
+
+    /** Returns the creation timestamp; null until {@link #setCreated(Long)} has been called. */
+    @Override
+    @EntityProperty(indexed = true, required = true, mutable = false)
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public Long getCreated() {
+        return created;
+    }
+
+    /** Stores the creation timestamp, substituting {@link System#currentTimeMillis()} for a null argument. */
+    @Override
+    public void setCreated( Long created ) {
+        if ( created == null ) {
+            created = System.currentTimeMillis();
+        }
+        this.created = created;
+    }
+
+    /** Returns the modification timestamp; null until {@link #setModified(Long)} has been called. */
+    @Override
+    @EntityProperty(indexed = true, required = true, mutable = true)
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public Long getModified() {
+        return modified;
+    }
+
+    /** Stores the modification timestamp, substituting {@link System#currentTimeMillis()} for a null argument. */
+    @Override
+    public void setModified( Long modified ) {
+        if ( modified == null ) {
+            modified = System.currentTimeMillis();
+        }
+        this.modified = modified;
+    }
+
+    /** Returns the {@link Schema#PROPERTY_NAME} property, converting a UUID value to its string form. */
+    @Override
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public String getName() {
+        Object value = getProperty( PROPERTY_NAME );
+
+        if ( value instanceof UUID ) {
+            // fixes existing data that uses UUID in USERGRID-2099
+            return value.toString();
+        }
+
+        return ( String ) value;
+    }
+
+    /** Returns the entity's properties as resolved by the default {@link Schema}. */
+    @Override
+    @JsonIgnore
+    public Map<String, Object> getProperties() {
+        return Schema.getDefaultSchema().getEntityProperties( this );
+    }
+
+    /** Looks up a single property through the default {@link Schema}. */
+    @Override
+    public final Object getProperty( String propertyName ) {
+        return Schema.getDefaultSchema().getEntityProperty( this, propertyName );
+    }
+
+    /** Sets a single property through the default {@link Schema}. */
+    @Override
+    public final void setProperty( String propertyName, Object propertyValue ) {
+        Schema.getDefaultSchema().setEntityProperty( this, propertyName, propertyValue );
+    }
+
+    /** Discards all current dynamic properties, then applies {@code properties} via {@link #addProperties(Map)}. */
+    @Override
+    public void setProperties( Map<String, Object> properties ) {
+        dynamic_properties = new TreeMap<String, Object>( String.CASE_INSENSITIVE_ORDER );
+        addProperties( properties );
+    }
+
+    /** Sets every entry of {@code properties} through {@link #setProperty(String, Object)}; null is a no-op. */
+    @Override
+    public void addProperties( Map<String, Object> properties ) {
+        if ( properties == null ) {
+            return;
+        }
+        for ( Entry<String, Object> entry : properties.entrySet() ) {
+            setProperty( entry.getKey(), entry.getValue() );
+        }
+    }
+
+    /** Returns the value stored under {@code key} in the "metadata" dataset, or null. */
+    @Override
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public Object getMetadata( String key ) {
+        return getDataset( "metadata", key );
+    }
+
+    /** Stores {@code value} under {@code key} in the "metadata" dataset. */
+    @Override
+    public void setMetadata( String key, Object value ) {
+        setDataset( "metadata", key, value );
+    }
+
+    /** Copies all entries of {@code new_metadata} into the "metadata" dataset; null is a no-op. */
+    @Override
+    public void mergeMetadata( Map<String, Object> new_metadata ) {
+        mergeDataset( "metadata", new_metadata );
+    }
+
+    /** Removes the whole "metadata" dataset. */
+    @Override
+    public void clearMetadata() {
+        clearDataset( "metadata" );
+    }
+
+    /**
+     * Reads one entry of the map stored as the dynamic property {@code property}.
+     * @param property name of the dynamic property expected to hold a map
+     * @param key key to look up inside that map
+     * @return the mapped value, or null when the property is absent or is not a map
+     */
+    public <T> T getDataset( String property, String key ) {
+        Object md = dynamic_properties.get( property );
+        if ( !( md instanceof Map<?, ?> ) ) {
+            // instanceof is false for null, so this also covers a missing property
+            return null;
+        }
+        @SuppressWarnings("unchecked") Map<String, T> metadata = ( Map<String, T> ) md;
+        return metadata.get( key );
+    }
+
+    /**
+     * Stores {@code value} under {@code key} in the map held by the dynamic property {@code property},
+     * replacing that property with a fresh map first if it does not currently hold one.
+     * A null {@code key} is ignored.
+     */
+    public <T> void setDataset( String property, String key, T value ) {
+        if ( key == null ) {
+            return;
+        }
+        Object md = dynamic_properties.get( property );
+        if ( !( md instanceof Map<?, ?> ) ) {
+            md = new HashMap<String, T>();
+            dynamic_properties.put( property, md );
+        }
+        @SuppressWarnings("unchecked") Map<String, T> metadata = ( Map<String, T> ) md;
+        metadata.put( key, value );
+    }
+
+    /**
+     * Copies every entry of {@code new_metadata} into the map held by the dynamic property
+     * {@code property}, replacing that property with a fresh map first if it does not hold one.
+     * A null {@code new_metadata} is ignored, mirroring {@link #addProperties(Map)}.
+     */
+    public <T> void mergeDataset( String property, Map<String, T> new_metadata ) {
+        if ( new_metadata == null ) {
+            return;
+        }
+        Object md = dynamic_properties.get( property );
+        if ( !( md instanceof Map<?, ?> ) ) {
+            md = new HashMap<String, T>();
+            dynamic_properties.put( property, md );
+        }
+        @SuppressWarnings("unchecked") Map<String, T> metadata = ( Map<String, T> ) md;
+        metadata.putAll( new_metadata );
+    }
+
+    /** Removes the dynamic property named {@code property}, and with it the whole dataset. */
+    public void clearDataset( String property ) {
+        dynamic_properties.remove( property );
+    }
+
+    /** Returns the entities stored under {@code key} in the "collections" dataset, or null. */
+    @Override
+    public List<Entity> getCollections( String key ) {
+        return getDataset( "collections", key );
+    }
+
+    /** Stores {@code results} under {@code key} in the "collections" dataset. */
+    @Override
+    public void setCollections( String key, List<Entity> results ) {
+        setDataset( "collections", key, results );
+    }
+
+    /** Returns the entities stored under {@code key} in the "connections" dataset, or null. */
+    @Override
+    public List<Entity> getConnections( String key ) {
+        return getDataset( "connections", key );
+    }
+
+    /** Stores {@code results} under {@code key} in the "connections" dataset. */
+    @Override
+    public void setConnections( String key, List<Entity> results ) {
+        setDataset( "connections", key, results );
+    }
+
+    /** Renders the entity as {@code Entity(<properties>)}. */
+    @Override
+    public String toString() {
+        return "Entity(" + getProperties() + ")";
+    }
+
+    /** Writes directly into the dynamic property map, bypassing the {@link Schema}. */
+    @Override
+    @JsonAnySetter
+    public void setDynamicProperty( String key, Object value ) {
+        dynamic_properties.put( key, value );
+    }
+
+    /** Returns the live dynamic property map, not a copy. */
+    @Override
+    @JsonAnyGetter
+    public Map<String, Object> getDynamicProperties() {
+        return dynamic_properties;
+    }
+
+    /**
+     * Orders entities by the timestamp embedded in their uuids, falling back to plain
+     * {@link UUID#compareTo(UUID)} when a uuid carries no timestamp. A null argument sorts first.
+     */
+    @Override
+    public final int compareTo( Entity o ) {
+        if ( o == null ) {
+            return 1;
+        }
+        try {
+            long t1 = getUuid().timestamp();
+            long t2 = o.getUuid().timestamp();
+            return ( t1 < t2 ) ? -1 : ( t1 == t2 ) ? 0 : 1;
+        }
+        catch ( UnsupportedOperationException ignored ) {
+            // UUID.timestamp() rejects uuids that are not version 1; fall through to the generic comparison
+        }
+        return getUuid().compareTo( o.getUuid() );
+    }
+
+    /** Creates a new entity from this one's uuid and type via {@link EntityFactory} and copies the properties over. */
+    @Override
+    public Entity toTypedEntity() {
+        Entity entity = EntityFactory.newEntity( getUuid(), getType() );
+        entity.setProperties( getProperties() );
+        return entity;
+    }
+
+    /** Hash derived from the uuid alone (0 for a null uuid), matching {@link #equals(Object)}. */
+    @Override
+    public int hashCode() {
+        return 31 + ( ( uuid == null ) ? 0 : uuid.hashCode() );
+    }
+
+    /** Equal only to an object of exactly the same class whose uuid is equal (or likewise null). */
+    @Override
+    public boolean equals( Object obj ) {
+        if ( this == obj ) {
+            return true;
+        }
+        if ( ( obj == null ) || ( getClass() != obj.getClass() ) ) {
+            return false;
+        }
+        AbstractEntity other = ( AbstractEntity ) obj;
+        return ( uuid == null ) ? ( other.uuid == null ) : uuid.equals( other.uuid );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/AggregateCounter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/AggregateCounter.java b/stack/core/src/main/java/org/apache/usergrid/persistence/AggregateCounter.java
new file mode 100644
index 0000000..b82c551
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/AggregateCounter.java
@@ -0,0 +1,48 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence;
+
+
+public class AggregateCounter {
+    private long timestamp;
+    private long value;
+
+    /** Creates a counter sample holding the given timestamp and value. */
+    public AggregateCounter( long timestamp, long value ) {
+        this.timestamp = timestamp;
+        this.value = value;
+    }
+
+    /** Returns the timestamp last set on this sample. */
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    /** Replaces the timestamp of this sample. */
+    public void setTimestamp( long timestamp ) {
+        this.timestamp = timestamp;
+    }
+
+    /** Returns the value last set on this sample. */
+    public long getValue() {
+        return value;
+    }
+
+    /** Replaces the value of this sample. */
+    public void setValue( long value ) {
+        this.value = value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/AggregateCounterSet.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/AggregateCounterSet.java b/stack/core/src/main/java/org/apache/usergrid/persistence/AggregateCounterSet.java
new file mode 100644
index 0000000..c14d043
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/AggregateCounterSet.java
@@ -0,0 +1,116 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence;
+
+
+import java.util.List;
+import java.util.UUID;
+
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+
+
+public class AggregateCounterSet {
+    private String name;
+    private UUID user;
+    private UUID group;
+    private UUID queue;
+    private String category;
+    private List<AggregateCounter> values;
+
+    /** Creates a set carrying a user and a group id; {@code queue} stays null. */
+    public AggregateCounterSet( String name, UUID user, UUID group, String category, List<AggregateCounter> values ) {
+        this.name = name;
+        this.user = user;
+        this.group = group;
+        this.category = category;
+        this.values = values;
+    }
+
+    /** Creates a set carrying a queue id; {@code user} and {@code group} stay null. */
+    public AggregateCounterSet( String name, UUID queue, String category, List<AggregateCounter> values ) {
+        this.name = name;
+        this.queue = queue; // assigned directly: a constructor should not call the overridable setQueue
+        this.category = category;
+        this.values = values;
+    }
+
+    /** Returns the user id; left out of the JSON output when null. */
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public UUID getUser() {
+        return user;
+    }
+
+    /** Replaces the user id. */
+    public void setUser( UUID user ) {
+        this.user = user;
+    }
+
+    /** Returns the group id; left out of the JSON output when null. */
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public UUID getGroup() {
+        return group;
+    }
+
+    /** Replaces the group id. */
+    public void setGroup( UUID group ) {
+        this.group = group;
+    }
+
+    /** Returns the category; left out of the JSON output when null. */
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public String getCategory() {
+        return category;
+    }
+
+    /** Replaces the category. */
+    public void setCategory( String category ) {
+        this.category = category;
+    }
+
+    /** Returns the counter name; left out of the JSON output when null. */
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public String getName() {
+        return name;
+    }
+
+    /** Replaces the counter name. */
+    public void setName( String name ) {
+        this.name = name;
+    }
+
+    /** Returns the list of counters as stored (not a copy); left out of the JSON output when null. */
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public List<AggregateCounter> getValues() {
+        return values;
+    }
+
+    /** Replaces the list of counters; the given list is stored as is. */
+    public void setValues( List<AggregateCounter> values ) {
+        this.values = values;
+    }
+
+    /** Returns the queue id; left out of the JSON output when null. */
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public UUID getQueue() {
+        return queue;
+    }
+
+    /** Replaces the queue id. */
+    public void setQueue( UUID queue ) {
+        this.queue = queue;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/AssociatedEntityRef.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/AssociatedEntityRef.java b/stack/core/src/main/java/org/apache/usergrid/persistence/AssociatedEntityRef.java
new file mode 100644
index 0000000..aceaf81
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/AssociatedEntityRef.java
@@ -0,0 +1,21 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence;
+
+
+public interface AssociatedEntityRef extends EntityRef {
+    // Marker interface: adds no members of its own on top of EntityRef.
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/CollectionRef.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/CollectionRef.java b/stack/core/src/main/java/org/apache/usergrid/persistence/CollectionRef.java
new file mode 100644
index 0000000..4a560b7
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/CollectionRef.java
@@ -0,0 +1,26 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence;
+
+
+public interface CollectionRef extends AssociatedEntityRef {
+    /** Presumably the entity that owns the collection -- TODO confirm against the implementations. */
+    EntityRef getOwnerEntity();
+    /** Presumably the name of the referenced collection -- TODO confirm against the implementations. */
+    String getCollectionName();
+    /** Presumably the item referenced inside the collection -- TODO confirm against the implementations. */
+    EntityRef getItemRef();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/ConnectedEntityRef.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/ConnectedEntityRef.java b/stack/core/src/main/java/org/apache/usergrid/persistence/ConnectedEntityRef.java
new file mode 100644
index 0000000..c551ac7
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/ConnectedEntityRef.java
@@ -0,0 +1,22 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence;
+
+
+public interface ConnectedEntityRef extends EntityRef {
+    /** Presumably the type label of the connection leading to this entity -- TODO confirm against the implementations. */
+    String getConnectionType();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/ConnectionRef.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/ConnectionRef.java b/stack/core/src/main/java/org/apache/usergrid/persistence/ConnectionRef.java
new file mode 100644
index 0000000..9a5340c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/ConnectionRef.java
@@ -0,0 +1,34 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence;
+
+
+import java.util.List;
+
+
+/**
+ * Connection tuple: exposes the connecting entity, the connected entity and a list of paired connections.
+ *
+ * @author edanuff
+ */
+public interface ConnectionRef extends ConnectedEntityRef, AssociatedEntityRef {
+
+    public EntityRef getConnectingEntity();
+
+    public List<ConnectedEntityRef> getPairedConnections();
+
+    public ConnectedEntityRef getConnectedEntity();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/CounterQuery.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/CounterQuery.java b/stack/core/src/main/java/org/apache/usergrid/persistence/CounterQuery.java
new file mode 100644
index 0000000..ea4e017
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/CounterQuery.java
@@ -0,0 +1,313 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.usergrid.persistence.Query.CounterFilterPredicate;
+import org.apache.usergrid.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.usergrid.utils.ClassUtils.cast;
+import static org.apache.usergrid.utils.ListUtils.firstBoolean;
+import static org.apache.usergrid.utils.ListUtils.firstInteger;
+import static org.apache.usergrid.utils.ListUtils.firstLong;
+import static org.apache.usergrid.utils.ListUtils.isEmpty;
+import static org.apache.usergrid.utils.MapUtils.toMapList;
+
+
+public class CounterQuery {
+
+    public static final Logger logger = LoggerFactory.getLogger( CounterQuery.class );
+
+    public static final int DEFAULT_MAX_RESULTS = 10;
+
+    private int limit = 0;
+    boolean limitSet = false;
+
+    private Long startTime;
+    private Long finishTime;
+    private boolean pad;
+    private CounterResolution resolution = CounterResolution.ALL;
+    private List<String> categories;
+    private List<CounterFilterPredicate> counterFilters;
+
+
+    public CounterQuery() {
+    }
+
+
+    public CounterQuery( CounterQuery q ) {
+        if ( q != null ) {
+            limit = q.limit;
+            limitSet = q.limitSet;
+            startTime = q.startTime;
+            finishTime = q.finishTime;
+            resolution = q.resolution;
+            pad = q.pad;
+            categories = q.categories != null ? new ArrayList<String>( q.categories ) : null;
+            counterFilters =
+                    q.counterFilters != null ? new ArrayList<CounterFilterPredicate>( q.counterFilters ) : null;
+        }
+    }
+
+
+    public static CounterQuery newQueryIfNull( CounterQuery query ) {
+        if ( query == null ) {
+            query = new CounterQuery();
+        }
+        return query;
+    }
+
+
+    public static CounterQuery fromJsonString( String json ) {
+        Object o = JsonUtils.parse( json );
+        if ( o instanceof Map ) {
+            @SuppressWarnings({ "unchecked", "rawtypes" }) Map<String, List<String>> params =
+                    cast( toMapList( ( Map ) o ) );
+            return fromQueryParams( params );
+        }
+        return null;
+    }
+
+
+    public static CounterQuery fromQueryParams( Map<String, List<String>> params ) {
+        // Builds a query from the recognised keys: limit, start_time, end_time, resolution, category, counter, pad.
+        // A null map means there is nothing to parse; this used to fail with a NullPointerException.
+        if ( params == null ) {
+            return null;
+        }
+
+        // Created lazily below, so a map without any recognised key still yields null.
+        CounterQuery q = null;
+        CounterResolution resolution = null;
+        List<CounterFilterPredicate> counterFilters = null;
+
+        Integer limit = firstInteger( params.get( "limit" ) );
+        Long startTime = firstLong( params.get( "start_time" ) );
+        Long finishTime = firstLong( params.get( "end_time" ) );
+
+        List<String> l = params.get( "resolution" );
+        if ( !isEmpty( l ) ) {
+            resolution = CounterResolution.fromString( l.get( 0 ) );
+        }
+
+        List<String> categories = params.get( "category" );
+
+        l = params.get( "counter" );
+        if ( !isEmpty( l ) ) {
+            counterFilters = CounterFilterPredicate.fromList( l );
+        }
+
+        Boolean pad = firstBoolean( params.get( "pad" ) );
+
+        // Each option that is present instantiates the query on first use.
+        if ( limit != null ) {
+            q = newQueryIfNull( q );
+            q.setLimit( limit );
+        }
+
+        if ( startTime != null ) {
+            q = newQueryIfNull( q );
+            q.setStartTime( startTime );
+        }
+
+        if ( finishTime != null ) {
+            q = newQueryIfNull( q );
+            q.setFinishTime( finishTime );
+        }
+
+        if ( resolution != null ) {
+            q = newQueryIfNull( q );
+            q.setResolution( resolution );
+        }
+
+        if ( categories != null ) {
+            q = newQueryIfNull( q );
+            q.setCategories( categories );
+        }
+
+        if ( counterFilters != null ) {
+            q = newQueryIfNull( q );
+            q.setCounterFilters( counterFilters );
+        }
+
+        if ( pad != null ) {
+            q = newQueryIfNull( q );
+            q.setPad( pad );
+        }
+
+        return q;
+    }
+
+
+    public int getLimit() {
+        return getLimit( DEFAULT_MAX_RESULTS );
+    }
+
+
+    public int getLimit( int defaultMax ) {
+        // An explicitly stored positive limit always wins.
+        if ( limit > 0 ) {
+            return limit;
+        }
+        // Otherwise use the caller's default, provided it is positive itself.
+        if ( defaultMax > 0 ) {
+            return defaultMax;
+        }
+        return DEFAULT_MAX_RESULTS;
+    }
+
+
+    public void setLimit( int limit ) {
+        limitSet = true;
+        this.limit = limit;
+    }
+
+
+    public CounterQuery withLimit( int limit ) {
+        limitSet = true;
+        this.limit = limit;
+        return this;
+    }
+
+
+    public boolean isLimitSet() {
+        return limitSet;
+    }
+
+
+    public Long getStartTime() {
+        return startTime;
+    }
+
+
+    public void setStartTime( Long startTime ) {
+        this.startTime = startTime;
+    }
+
+
+    public CounterQuery withStartTime( Long startTime ) {
+        this.startTime = startTime;
+        return this;
+    }
+
+
+    public Long getFinishTime() {
+        return finishTime;
+    }
+
+
+    public void setFinishTime( Long finishTime ) {
+        this.finishTime = finishTime;
+    }
+
+
+    public CounterQuery withFinishTime( Long finishTime ) {
+        this.finishTime = finishTime;
+        return this;
+    }
+
+
+    public boolean isPad() {
+        return pad;
+    }
+
+
+    public void setPad( boolean pad ) {
+        this.pad = pad;
+    }
+
+
+    public CounterQuery withPad( boolean pad ) {
+        this.pad = pad;
+        return this;
+    }
+
+
+    public void setResolution( CounterResolution resolution ) {
+        this.resolution = resolution;
+    }
+
+
+    public CounterResolution getResolution() {
+        return resolution;
+    }
+
+
+    public CounterQuery withResolution( CounterResolution resolution ) {
+        this.resolution = resolution;
+        return this;
+    }
+
+
+    public List<String> getCategories() {
+        return categories;
+    }
+
+
+    public CounterQuery addCategory( String category ) {
+        if ( categories == null ) {
+            categories = new ArrayList<String>();
+        }
+        categories.add( category );
+        return this;
+    }
+
+
+    public void setCategories( List<String> categories ) {
+        this.categories = categories;
+    }
+
+
+    public CounterQuery withCategories( List<String> categories ) {
+        this.categories = categories;
+        return this;
+    }
+
+
+    public List<CounterFilterPredicate> getCounterFilters() {
+        return counterFilters;
+    }
+
+
+    public CounterQuery addCounterFilter( String counter ) {
+        CounterFilterPredicate p = CounterFilterPredicate.fromString( counter );
+        if ( p == null ) {
+            return this;
+        }
+        if ( counterFilters == null ) {
+            counterFilters = new ArrayList<CounterFilterPredicate>();
+        }
+        counterFilters.add( p );
+        return this;
+    }
+
+
+    public void setCounterFilters( List<CounterFilterPredicate> counterFilters ) {
+        this.counterFilters = counterFilters;
+    }
+
+
+    public CounterQuery withCounterFilters( List<CounterFilterPredicate> counterFilters ) {
+        this.counterFilters = counterFilters;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/CounterResolution.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/CounterResolution.java b/stack/core/src/main/java/org/apache/usergrid/persistence/CounterResolution.java
new file mode 100644
index 0000000..b944a3b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/CounterResolution.java
@@ -0,0 +1,84 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence;
+
+
+/** Time-bucket sizes for counters; each constant stores its bucket length in milliseconds ({@code ALL} is 0). */
+public enum CounterResolution {
+    ALL( 0 ), MINUTE( 1 ), FIVE_MINUTES( 5 ), HALF_HOUR( 30 ), HOUR( 60 ), SIX_HOUR( 60 * 6 ), HALF_DAY( 60 * 12 ),
+    DAY( 60 * 24 ), WEEK( 60 * 24 * 7 ), MONTH( 60 * 24 * ( 365 / 12 ) );
+
+    private final long interval;
+
+
+    CounterResolution( long minutes ) {
+        interval = minutes * 60 * 1000;
+    }
+
+
+    public long interval() {
+        return interval;
+    }
+
+
+    public long round( long timestamp ) {
+        return interval == 0 ? 1 : ( timestamp / interval ) * interval; // ALL has no interval: always 1
+    }
+
+
+    public long next( long timestamp ) {
+        return round( timestamp ) + interval;
+    }
+
+
+    public static CounterResolution fromOrdinal( int i ) {
+        if ( ( i < 0 ) || ( i >= CounterResolution.values().length ) ) {
+            throw new IndexOutOfBoundsException( "Invalid ordinal" );
+        }
+        return CounterResolution.values()[i];
+    }
+
+
+    /** Returns the coarsest resolution whose interval does not exceed {@code m} minutes, or ALL if none does. */
+    public static CounterResolution fromMinutes( int m ) {
+        long millis = m * 60L * 1000L; // widened: int math overflowed above 35791 minutes, so MONTH never matched
+        for ( int i = CounterResolution.values().length - 1; i >= 0; i-- ) {
+            if ( CounterResolution.values()[i].interval <= millis ) {
+                return CounterResolution.values()[i];
+            }
+        }
+        return ALL;
+    }
+
+
+    /** Parses a constant name (any case) or a number of minutes; null or unparseable input yields ALL. */
+    public static CounterResolution fromString( String s ) {
+        if ( s == null ) {
+            return ALL;
+        }
+        try {
+            return CounterResolution.valueOf( s.toUpperCase( java.util.Locale.ROOT ) ); // locale-independent casing
+        }
+        catch ( IllegalArgumentException ignored ) { // not a constant name: fall through and try minutes
+        }
+        try {
+            return fromMinutes( Integer.parseInt( s ) );
+        }
+        catch ( NumberFormatException ignored ) { // not a number either: fall through to ALL
+        }
+        return ALL;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/CredentialsInfo.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/CredentialsInfo.java b/stack/core/src/main/java/org/apache/usergrid/persistence/CredentialsInfo.java
new file mode 100644
index 0000000..1065a76
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/CredentialsInfo.java
@@ -0,0 +1,176 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence;
+
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.codehaus.jackson.annotate.JsonAnyGetter;
+import org.codehaus.jackson.annotate.JsonAnySetter;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+
+
+@XmlRootElement
+public class CredentialsInfo implements Comparable<CredentialsInfo> {
+
+    boolean recoverable;
+    boolean encrypted;
+    String cipher;
+    String key;
+    String secret;
+    String hashType;
+    Long created;
+
+    /**
+     * A list of crypto algorithms to apply to unencrypted input for comparison. Note that cipher and hashtype should be
+     * deprecated
+     */
+    private String[] cryptoChain;
+
+    protected Map<String, Object> properties = new TreeMap<String, Object>( String.CASE_INSENSITIVE_ORDER );
+
+
+    public CredentialsInfo() {
+        created = System.currentTimeMillis();
+    }
+
+
+    public boolean getRecoverable() {
+        return recoverable;
+    }
+
+
+    public void setRecoverable( boolean recoverable ) {
+        this.recoverable = recoverable;
+    }
+
+
+    public boolean getEncrypted() {
+        return encrypted;
+    }
+
+
+    public void setEncrypted( boolean encrypted ) {
+        this.encrypted = encrypted;
+    }
+
+
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public String getCipher() {
+        return cipher;
+    }
+
+
+    public void setCipher( String cipher ) {
+        this.cipher = cipher;
+    }
+
+
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public String getKey() {
+        return key;
+    }
+
+
+    public void setKey( String key ) {
+        this.key = key;
+    }
+
+
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public String getSecret() {
+        return secret;
+    }
+
+
+    public void setSecret( String secret ) {
+        this.secret = secret;
+    }
+
+
+    public static String getCredentialsSecret( CredentialsInfo credentials ) {
+        if ( credentials == null ) {
+            return null;
+        }
+        return credentials.getSecret();
+    }
+
+
+    @JsonAnyGetter
+    public Map<String, Object> getProperties() {
+        return properties;
+    }
+
+
+    @JsonAnySetter
+    public void setProperty( String key, Object value ) {
+        properties.put( key, value );
+    }
+
+
+    public Object getProperty( String key ) {
+        return properties.get( key );
+    }
+
+
+    /** @return the hashType */
+    public String getHashType() {
+        return hashType;
+    }
+
+
+    /**
+     * Used for handling legacy passwords encrypted in md5 or similar.
+     *
+     * @param hashType the hashType to set
+     */
+    public void setHashType( String hashType ) {
+        this.hashType = hashType;
+    }
+
+
+    /** @return the cryptoChain */
+    public String[] getCryptoChain() {
+        return cryptoChain;
+    }
+
+
+    /** @param cryptoChain the cryptoChain to set */
+    public void setCryptoChain( String[] cryptoChain ) {
+        this.cryptoChain = cryptoChain;
+    }
+
+
+    public Long getCreated() {
+        return created;
+    }
+
+
+    @Override
+    public int compareTo( CredentialsInfo o ) {
+        if ( created == null ) { // guard added: a null timestamp used to reach o.created.compareTo( null ) and throw
+            return o.created == null ? 0 : -1;
+        }
+        if ( o.created == null ) {
+            return 1;
+        }
+        return o.created.compareTo( created ); // operands reversed on purpose: the later timestamp sorts first
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/DynamicEntity.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/DynamicEntity.java b/stack/core/src/main/java/org/apache/usergrid/persistence/DynamicEntity.java
new file mode 100644
index 0000000..ea6df03
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/DynamicEntity.java
@@ -0,0 +1,111 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence;
+
+
+import java.util.Map;
+import java.util.UUID;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.usergrid.persistence.annotations.EntityProperty;
+import org.apache.usergrid.utils.UUIDUtils;
+
+
+/**
+ * Dynamic entities can represent any entity type whether specified in the Schema or not.
+ *
+ * @author edanuff
+ */
+@XmlRootElement
+public class DynamicEntity extends AbstractEntity {
+
+    @EntityProperty(indexed = true, fulltextIndexed = false, required = false, mutable = true, aliasProperty = true,
+            basic = true)
+    protected String name;
+
+    protected String type;
+
+
+    /**
+     * Creates an entity; this constructor itself sets neither a UUID nor a type.
+     */
+    public DynamicEntity() {
+        // setId(UUIDUtils.newTimeUUID());
+    }
+
+
+    /**
+     * @param id UUID passed to {@code setUuid}
+     */
+    public DynamicEntity( UUID id ) {
+        setUuid( id );
+    }
+
+
+    /**
+     * @param type entity type to set; the UUID is generated with {@code UUIDUtils.newTimeUUID()}
+     */
+    public DynamicEntity( String type ) {
+        setUuid( UUIDUtils.newTimeUUID() );
+        setType( type );
+    }
+
+
+    /**
+     * @param type entity type passed to {@code setType}
+     * @param id UUID passed to {@code setUuid}
+     */
+    public DynamicEntity( String type, UUID id ) {
+        setUuid( id );
+        setType( type );
+    }
+
+
+    /**
+     * Applies {@code id} via {@code setUuid}, then {@code type} via {@code setType},
+     * and finally hands {@code propertyMap} to {@code setProperties}.
+     */
+    public DynamicEntity( String type, UUID id, Map<String, Object> propertyMap ) {
+        setUuid( id );
+        setType( type );
+        setProperties( propertyMap );
+    }
+
+
+    @Override
+    @EntityProperty(required = true, mutable = false, basic = true, indexed = false)
+    public String getType() {
+        return type;
+    }
+
+
+    @Override
+    public void setType( String type ) {
+        this.type = type;
+    }
+
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+
+    public void setName( String name ) {
+        this.name = name;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/Entity.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Entity.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Entity.java
new file mode 100644
index 0000000..42df9e0
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Entity.java
@@ -0,0 +1,103 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence;
+
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.usergrid.persistence.annotations.EntityProperty;
+import org.codehaus.jackson.annotate.JsonAnyGetter;
+import org.codehaus.jackson.annotate.JsonAnySetter;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonPropertyOrder;
+
+import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_TYPE;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_URI;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_UUID;
+
+
+/** Base object type of the service: a comparable {@link EntityRef} with properties, metadata and related entities. */
+@XmlRootElement
+@JsonPropertyOrder({ PROPERTY_UUID, PROPERTY_TYPE, PROPERTY_URI, PROPERTY_NAME })
+public interface Entity extends EntityRef, Comparable<Entity> {
+    // Interface members are implicitly public and abstract, so the modifiers are left out throughout.
+    @Override
+    @EntityProperty(required = true, mutable = false, basic = true, indexed = false)
+    UUID getUuid();
+
+    void setUuid( UUID id );
+
+    @Override
+    @EntityProperty(required = true, mutable = false, basic = true, indexed = true)
+    String getType();
+
+    void setType( String type );
+
+    String getName();
+    // created / modified values
+    @EntityProperty(indexed = true, required = true, mutable = false)
+    Long getCreated();
+
+    void setCreated( Long created );
+
+    @EntityProperty(indexed = true, required = true, mutable = true)
+    Long getModified();
+
+    void setModified( Long modified );
+    // property map access; the map getter is excluded from JSON via @JsonIgnore
+    @JsonIgnore
+    Map<String, Object> getProperties();
+
+    void setProperties( Map<String, Object> properties );
+
+    void addProperties( Map<String, Object> properties );
+
+    Object getProperty( String propertyName );
+
+    void setProperty( String propertyName, Object propertyValue );
+
+    @Override
+    int compareTo( Entity o );
+
+    Entity toTypedEntity();
+    // metadata
+    Object getMetadata( String key );
+
+    void setMetadata( String key, Object value );
+
+    void mergeMetadata( Map<String, Object> metadata );
+
+    void clearMetadata();
+    // collections and connections, looked up by key / stored by name
+    List<Entity> getCollections( String key );
+
+    void setCollections( String name, List<Entity> results );
+
+    List<Entity> getConnections( String key );
+
+    void setConnections( String name, List<Entity> results );
+    // Jackson "any" setter/getter pair for dynamic properties
+    @JsonAnySetter
+    void setDynamicProperty( String key, Object value );
+
+    @JsonAnyGetter
+    Map<String, Object> getDynamicProperties();
+}