You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/01/28 23:21:30 UTC

[20/96] [abbrv] [partial] Change package namespace to org.apache.usergrid

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/usergrid/mq/cassandra/QueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/usergrid/mq/cassandra/QueueManagerImpl.java b/stack/core/src/main/java/org/usergrid/mq/cassandra/QueueManagerImpl.java
deleted file mode 100644
index fd18e55..0000000
--- a/stack/core/src/main/java/org/usergrid/mq/cassandra/QueueManagerImpl.java
+++ /dev/null
@@ -1,1391 +0,0 @@
-/*******************************************************************************
- * Copyright 2012 Apigee Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.usergrid.mq.cassandra;
-
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.usergrid.locking.LockManager;
-import org.usergrid.mq.CounterQuery;
-import org.usergrid.mq.Message;
-import org.usergrid.mq.Query;
-import org.usergrid.mq.Query.CounterFilterPredicate;
-import org.usergrid.mq.QueryProcessor;
-import org.usergrid.mq.QueryProcessor.QuerySlice;
-import org.usergrid.mq.Queue;
-import org.usergrid.mq.QueueManager;
-import org.usergrid.mq.QueueQuery;
-import org.usergrid.mq.QueueResults;
-import org.usergrid.mq.QueueSet;
-import org.usergrid.mq.QueueSet.QueueInfo;
-import org.usergrid.mq.cassandra.QueueIndexUpdate.QueueIndexEntry;
-import org.usergrid.mq.cassandra.io.ConsumerTransaction;
-import org.usergrid.mq.cassandra.io.EndSearch;
-import org.usergrid.mq.cassandra.io.FilterSearch;
-import org.usergrid.mq.cassandra.io.NoTransactionSearch;
-import org.usergrid.mq.cassandra.io.QueueBounds;
-import org.usergrid.mq.cassandra.io.QueueSearch;
-import org.usergrid.mq.cassandra.io.StartSearch;
-import org.usergrid.persistence.AggregateCounter;
-import org.usergrid.persistence.AggregateCounterSet;
-import org.usergrid.persistence.CounterResolution;
-import org.usergrid.persistence.Results;
-import org.usergrid.persistence.cassandra.CassandraPersistenceUtils;
-import org.usergrid.persistence.cassandra.CassandraService;
-import org.usergrid.persistence.cassandra.CounterUtils;
-import org.usergrid.persistence.cassandra.CounterUtils.AggregateCounterSelection;
-import org.usergrid.persistence.exceptions.TransactionNotFoundException;
-import org.usergrid.utils.UUIDUtils;
-
-import com.fasterxml.uuid.UUIDComparator;
-
-import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
-import me.prettyprint.cassandra.serializers.BytesArraySerializer;
-import me.prettyprint.cassandra.serializers.DynamicCompositeSerializer;
-import me.prettyprint.cassandra.serializers.LongSerializer;
-import me.prettyprint.cassandra.serializers.StringSerializer;
-import me.prettyprint.cassandra.serializers.UUIDSerializer;
-import me.prettyprint.hector.api.Keyspace;
-import me.prettyprint.hector.api.beans.AbstractComposite.ComponentEquality;
-import me.prettyprint.hector.api.beans.ColumnSlice;
-import me.prettyprint.hector.api.beans.CounterRow;
-import me.prettyprint.hector.api.beans.CounterRows;
-import me.prettyprint.hector.api.beans.CounterSlice;
-import me.prettyprint.hector.api.beans.DynamicComposite;
-import me.prettyprint.hector.api.beans.HColumn;
-import me.prettyprint.hector.api.beans.HCounterColumn;
-import me.prettyprint.hector.api.factory.HFactory;
-import me.prettyprint.hector.api.mutation.Mutator;
-import me.prettyprint.hector.api.query.MultigetSliceCounterQuery;
-import me.prettyprint.hector.api.query.QueryResult;
-import me.prettyprint.hector.api.query.SliceCounterQuery;
-import me.prettyprint.hector.api.query.SliceQuery;
-
-import static me.prettyprint.hector.api.factory.HFactory.createColumn;
-import static me.prettyprint.hector.api.factory.HFactory.createCounterSliceQuery;
-import static me.prettyprint.hector.api.factory.HFactory.createMutator;
-import static me.prettyprint.hector.api.factory.HFactory.createSliceQuery;
-import static org.usergrid.mq.Queue.QUEUE_CREATED;
-import static org.usergrid.mq.Queue.QUEUE_MODIFIED;
-import static org.usergrid.mq.Queue.QUEUE_NEWEST;
-import static org.usergrid.mq.Queue.QUEUE_OLDEST;
-import static org.usergrid.mq.Queue.getQueueId;
-import static org.usergrid.mq.Queue.normalizeQueuePath;
-import static org.usergrid.mq.QueuePosition.CONSUMER;
-import static org.usergrid.mq.QueuePosition.END;
-import static org.usergrid.mq.QueuePosition.LAST;
-import static org.usergrid.mq.QueuePosition.START;
-import static org.usergrid.mq.cassandra.CassandraMQUtils.addMessageToMutator;
-import static org.usergrid.mq.cassandra.CassandraMQUtils.addQueueToMutator;
-import static org.usergrid.mq.cassandra.CassandraMQUtils.deserializeMessage;
-import static org.usergrid.mq.cassandra.CassandraMQUtils.deserializeQueue;
-import static org.usergrid.mq.cassandra.CassandraMQUtils.getQueueShardRowKey;
-import static org.usergrid.mq.cassandra.QueueIndexUpdate.indexValueCode;
-import static org.usergrid.mq.cassandra.QueueIndexUpdate.toIndexableValue;
-import static org.usergrid.mq.cassandra.QueueIndexUpdate.validIndexableValue;
-import static org.usergrid.mq.cassandra.QueueIndexUpdate.validIndexableValueOrJson;
-import static org.usergrid.mq.cassandra.QueuesCF.COUNTERS;
-import static org.usergrid.mq.cassandra.QueuesCF.MESSAGE_PROPERTIES;
-import static org.usergrid.mq.cassandra.QueuesCF.PROPERTY_INDEX;
-import static org.usergrid.mq.cassandra.QueuesCF.PROPERTY_INDEX_ENTRIES;
-import static org.usergrid.mq.cassandra.QueuesCF.QUEUE_DICTIONARIES;
-import static org.usergrid.mq.cassandra.QueuesCF.QUEUE_INBOX;
-import static org.usergrid.mq.cassandra.QueuesCF.QUEUE_PROPERTIES;
-import static org.usergrid.mq.cassandra.QueuesCF.QUEUE_SUBSCRIBERS;
-import static org.usergrid.mq.cassandra.QueuesCF.QUEUE_SUBSCRIPTIONS;
-import static org.usergrid.persistence.Schema.DICTIONARY_COUNTERS;
-import static org.usergrid.persistence.cassandra.ApplicationCF.APPLICATION_AGGREGATE_COUNTERS;
-import static org.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute;
-import static org.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
-import static org.usergrid.persistence.cassandra.CassandraService.RETRY_COUNT;
-import static org.usergrid.utils.CompositeUtils.setEqualityFlag;
-import static org.usergrid.utils.CompositeUtils.setGreaterThanEqualityFlag;
-import static org.usergrid.utils.ConversionUtils.bytebuffer;
-import static org.usergrid.utils.IndexUtils.getKeyValueList;
-import static org.usergrid.utils.MapUtils.emptyMapWithKeys;
-import static org.usergrid.utils.NumberUtils.roundLong;
-import static org.usergrid.utils.UUIDUtils.getTimestampInMicros;
-import static org.usergrid.utils.UUIDUtils.newTimeUUID;
-
-
/**
 * Cassandra-backed implementation of {@link QueueManager}. Messages, queue
 * metadata, subscriptions and counters are stored in Hector column families;
 * all collaborators are supplied through {@link #init}.
 */
public class QueueManagerImpl implements QueueManager {

    public static final Logger logger = LoggerFactory.getLogger( QueueManagerImpl.class );

    // Dictionary names used for subscriber/message property-index bookkeeping.
    public static final String DICTIONARY_SUBSCRIBER_INDEXES = "subscriber_indexes";
    public static final String DICTIONARY_MESSAGE_INDEXES = "message_indexes";

    // Width of a time-partitioned queue-inbox shard — numerically 24h in ms;
    // NOTE(review): message timestamps appear to be in micros, confirm units.
    public static final int QUEUE_SHARD_INTERVAL = 1000 * 60 * 60 * 24;
    public static final int INDEX_ENTRY_LIST_COUNT = 1000;

    // Column-slice sizes: DEFAULT_SEARCH_COUNT for normal reads, ALL_COUNT as
    // an effectively-unbounded slice limit.
    public static final int DEFAULT_SEARCH_COUNT = 10000;
    public static final int ALL_COUNT = 100000000;

    private UUID applicationId;
    private CassandraService cass;
    private CounterUtils counterUtils;
    private LockManager lockManager;
    private int lockTimeout;

    // Shared Hector serializers (stateless, safe to share across threads).
    public static final StringSerializer se = new StringSerializer();
    public static final ByteBufferSerializer be = new ByteBufferSerializer();
    public static final UUIDSerializer ue = new UUIDSerializer();
    public static final BytesArraySerializer bae = new BytesArraySerializer();
    public static final DynamicCompositeSerializer dce = new DynamicCompositeSerializer();
    public static final LongSerializer le = new LongSerializer();


    /** No-arg constructor for container instantiation; call {@link #init} before use. */
    public QueueManagerImpl() {
    }


    /**
     * Wires in all collaborators and returns {@code this} for fluent construction.
     *
     * @param cass          Cassandra access layer
     * @param counterUtils  counter mutation helper
     * @param lockManager   lock provider used by transactional consumption
     * @param applicationId application whose keyspace all operations target
     * @param lockTimeout   timeout passed to consumer transactions
     */
    public QueueManagerImpl init( CassandraService cass, CounterUtils counterUtils, LockManager lockManager,
                                  UUID applicationId, int lockTimeout ) {
        this.cass = cass;
        this.counterUtils = counterUtils;
        this.applicationId = applicationId;
        this.lockManager = lockManager;
        this.lockTimeout = lockTimeout;
        return this;
    }
-
-
-    @Override
-    public Message getMessage( UUID messageId ) {
-        SliceQuery<UUID, String, ByteBuffer> q =
-                createSliceQuery( cass.getApplicationKeyspace( applicationId ), ue, se, be );
-        q.setColumnFamily( MESSAGE_PROPERTIES.getColumnFamily() );
-        q.setKey( messageId );
-        q.setRange( null, null, false, ALL_COUNT );
-        QueryResult<ColumnSlice<String, ByteBuffer>> r = q.execute();
-        ColumnSlice<String, ByteBuffer> slice = r.get();
-        List<HColumn<String, ByteBuffer>> results = slice.getColumns();
-        return deserializeMessage( results );
-    }
-
-
    /**
     * Adds every mutation needed to post one message to one queue into
     * {@code batch}: the serialized message, the time-sharded inbox column,
     * oldest/newest markers, the root-queue registration, counters, the
     * property-index update, and created/modified timestamps. The caller owns
     * batch execution — nothing is written to Cassandra here.
     *
     * @param indexUpdate may be null; a fresh MessageIndexUpdate is built from the message
     * @return the (synced) message
     */
    public Message batchPostToQueue( Mutator<ByteBuffer> batch, String queuePath, Message message,
                                     MessageIndexUpdate indexUpdate, long timestamp ) {

        queuePath = normalizeQueuePath( queuePath );
        UUID queueId = getQueueId( queuePath );

        // Ensure message identity/timestamp are populated before serialization.
        message.sync();

        addMessageToMutator( batch, message, timestamp );

        // Inbox rows are sharded by rounding the message timestamp down to the
        // shard interval.
        long shard_ts = roundLong( message.getTimestamp(), QUEUE_SHARD_INTERVAL );

        logger.debug( "Adding message with id '{}' to queue '{}'", message.getUuid(), queueId );

        batch.addInsertion( getQueueShardRowKey( queueId, shard_ts ), QUEUE_INBOX.getColumnFamily(),
                createColumn( message.getUuid(), ByteBuffer.allocate( 0 ), timestamp, ue, be ) );

        // QUEUE_OLDEST uses an inverted clock (MAX_VALUE - ts) so the earliest
        // write wins conflict resolution; QUEUE_NEWEST uses the plain clock so
        // the latest write wins.
        long oldest_ts = Long.MAX_VALUE - getTimestampInMicros( message.getUuid() );
        batch.addInsertion( bytebuffer( queueId ), QUEUE_PROPERTIES.getColumnFamily(),
                createColumn( QUEUE_OLDEST, message.getUuid(), oldest_ts, se, ue ) );

        long newest_ts = getTimestampInMicros( message.getUuid() );
        batch.addInsertion( bytebuffer( queueId ), QUEUE_PROPERTIES.getColumnFamily(),
                createColumn( QUEUE_NEWEST, message.getUuid(), newest_ts, se, ue ) );

        // Register this queue under the root queue "/" so it is discoverable.
        batch.addInsertion( bytebuffer( getQueueId( "/" ) ), QUEUE_SUBSCRIBERS.getColumnFamily(),
                createColumn( queuePath, queueId, timestamp, se, ue ) );

        counterUtils.batchIncrementQueueCounter( batch, getQueueId( "/" ), queuePath, 1L, timestamp, applicationId );

        if ( indexUpdate == null ) {
            indexUpdate = new MessageIndexUpdate( message );
        }
        indexUpdate.addToMutation( batch, queueId, shard_ts, timestamp );

        counterUtils.addMessageCounterMutations( batch, applicationId, queueId, message, timestamp );

        // Created: inverted clock so the first write sticks; modified: normal
        // clock so the latest write sticks. Values are stored in millis.
        batch.addInsertion( bytebuffer( queueId ), QUEUE_PROPERTIES.getColumnFamily(),
                createColumn( QUEUE_CREATED, timestamp / 1000, Long.MAX_VALUE - timestamp, se, le ) );

        batch.addInsertion( bytebuffer( queueId ), QUEUE_PROPERTIES.getColumnFamily(),
                createColumn( QUEUE_MODIFIED, timestamp / 1000, timestamp, se, le ) );

        return message;
    }
-
-
    /**
     * Posts a message to a queue and then fans it out to every subscriber
     * queue, paging through subscribers 1000 at a time. Each page is executed
     * as its own batch; the last subscriber path of a page becomes the cursor
     * for the next page.
     */
    @Override
    public Message postToQueue( String queuePath, Message message ) {
        long timestamp = cass.createTimestamp();
        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );

        queuePath = normalizeQueuePath( queuePath );

        MessageIndexUpdate indexUpdate = new MessageIndexUpdate( message );

        batchPostToQueue( batch, queuePath, message, indexUpdate, timestamp );

        batchExecute( batch, RETRY_COUNT );

        // Fan out to subscribers page by page. firstSubscriberQueuePath is the
        // paging cursor: getSubscribers skips the cursor row itself.
        String firstSubscriberQueuePath = null;
        while ( true ) {

            QueueSet subscribers = getSubscribers( queuePath, firstSubscriberQueuePath, 1000 );

            if ( subscribers.getQueues().isEmpty() ) {
                break;
            }

            // Fresh batch per page to keep mutation sizes bounded.
            batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
            for ( QueueInfo q : subscribers.getQueues() ) {
                batchPostToQueue( batch, q.getPath(), message, indexUpdate, timestamp );

                firstSubscriberQueuePath = q.getPath();
            }
            batchExecute( batch, RETRY_COUNT );

            if ( !subscribers.hasMore() ) {
                break;
            }
        }

        return message;
    }
-
-
-    @Override
-    public List<Message> postToQueue( String queuePath, List<Message> messages ) {
-
-        // Can't do this as one big batch operation because it will
-        // time out
-
-        for ( Message message : messages ) {
-            postToQueue( queuePath, message );
-        }
-
-        return messages;
-    }
-
-
-    static TreeSet<UUID> add( TreeSet<UUID> a, UUID uuid, boolean reversed, int limit ) {
-
-        if ( a == null ) {
-            a = new TreeSet<UUID>( new UUIDComparator() );
-        }
-
-        if ( uuid == null ) {
-            return a;
-        }
-
-        // if we have less than the limit, just add it
-        if ( a.size() < limit ) {
-            a.add( uuid );
-        }
-        else if ( reversed ) {
-            // if reversed, we want to add more recent messages
-            // and eject the oldest
-            if ( UUIDComparator.staticCompare( uuid, a.first() ) > 0 ) {
-                a.pollFirst();
-                a.add( uuid );
-            }
-        }
-        else {
-            // add older messages and eject the newset
-            if ( UUIDComparator.staticCompare( uuid, a.last() ) < 0 ) {
-                a.pollLast();
-                a.add( uuid );
-            }
-        }
-
-        return a;
-    }
-
-
-    static TreeSet<UUID> add( TreeSet<UUID> a, TreeSet<UUID> b, boolean reversed, int limit ) {
-
-        if ( b == null ) {
-            return a;
-        }
-
-        for ( UUID uuid : b ) {
-            a = add( a, uuid, reversed, limit );
-        }
-
-        return a;
-    }
-
-
-    static TreeSet<UUID> mergeOr( TreeSet<UUID> a, TreeSet<UUID> b, boolean reversed, int limit ) {
-        TreeSet<UUID> mergeSet = new TreeSet<UUID>( new UUIDComparator() );
-
-        if ( ( a == null ) && ( b == null ) ) {
-            return mergeSet;
-        }
-        else if ( a == null ) {
-            return b;
-        }
-        else if ( b == null ) {
-            return a;
-        }
-
-        add( mergeSet, a, reversed, limit );
-        add( mergeSet, b, reversed, limit );
-
-        return mergeSet;
-    }
-
-
-    static TreeSet<UUID> mergeAnd( TreeSet<UUID> a, TreeSet<UUID> b, boolean reversed, int limit ) {
-        TreeSet<UUID> mergeSet = new TreeSet<UUID>( new UUIDComparator() );
-
-        if ( a == null ) {
-            return mergeSet;
-        }
-        if ( b == null ) {
-            return mergeSet;
-        }
-
-        for ( UUID uuid : b ) {
-            if ( a.contains( b ) ) {
-                add( mergeSet, uuid, reversed, limit );
-            }
-        }
-
-        return mergeSet;
-    }
-
-
-    @Override
-    public QueueResults getFromQueue( String queuePath, QueueQuery query ) {
-
-        if ( query == null ) {
-            query = new QueueQuery();
-        }
-
-        Keyspace ko = cass.getApplicationKeyspace( applicationId );
-
-        QueueSearch search = null;
-
-        if ( query.hasFilterPredicates() ) {
-            search = new FilterSearch( ko );
-        }
-
-        else if ( query.getPosition() == LAST || query.getPosition() == CONSUMER ) {
-            if ( query.getTimeout() > 0 ) {
-                search = new ConsumerTransaction( applicationId, ko, lockManager, cass, lockTimeout );
-            }
-            else {
-                search = new NoTransactionSearch( ko );
-            }
-        }
-        else if ( query.getPosition() == START ) {
-
-            search = new StartSearch( ko );
-        }
-        else if ( query.getPosition() == END ) {
-            search = new EndSearch( ko );
-        }
-        else {
-            throw new IllegalArgumentException( "You must specify a valid position or query" );
-        }
-
-        return search.getResults( queuePath, query );
-    }
-
-
-    public void batchSubscribeToQueue( Mutator<ByteBuffer> batch, String publisherQueuePath, UUID publisherQueueId,
-                                       String subscriberQueuePath, UUID subscriberQueueId, long timestamp ) {
-
-        batch.addInsertion( bytebuffer( publisherQueueId ), QUEUE_SUBSCRIBERS.getColumnFamily(),
-                createColumn( subscriberQueuePath, subscriberQueueId, timestamp, se, ue ) );
-
-        batch.addInsertion( bytebuffer( subscriberQueueId ), QUEUE_SUBSCRIPTIONS.getColumnFamily(),
-                createColumn( publisherQueuePath, publisherQueueId, timestamp, se, ue ) );
-    }
-
-
-    @Override
-    public QueueSet subscribeToQueue( String publisherQueuePath, String subscriberQueuePath ) {
-
-        publisherQueuePath = normalizeQueuePath( publisherQueuePath );
-        UUID publisherQueueId = getQueueId( publisherQueuePath );
-
-        subscriberQueuePath = normalizeQueuePath( subscriberQueuePath );
-        UUID subscriberQueueId = getQueueId( subscriberQueuePath );
-
-        UUID timestampUuid = newTimeUUID();
-        long timestamp = getTimestampInMicros( timestampUuid );
-
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
-
-        batchSubscribeToQueue( batch, publisherQueuePath, publisherQueueId, subscriberQueuePath, subscriberQueueId,
-                timestamp );
-
-        try {
-            Queue queue = getQueue( subscriberQueuePath, subscriberQueueId );
-            if ( queue != null ) {
-                batchUpdateQueuePropertiesIndexes( batch, publisherQueueId, subscriberQueuePath, subscriberQueueId,
-                        queue.getProperties(), timestampUuid );
-            }
-        }
-        catch ( Exception e ) {
-            logger.error( "Unable to update index", e );
-        }
-
-        batchExecute( batch, RETRY_COUNT );
-
-        return new QueueSet().addQueue( subscriberQueuePath, subscriberQueueId );
-    }
-
-
-    public void batchUnsubscribeFromQueue( Mutator<ByteBuffer> batch, String publisherQueuePath, UUID publisherQueueId,
-                                           String subscriberQueuePath, UUID subscriberQueueId, long timestamp ) {
-
-        batch.addDeletion( bytebuffer( publisherQueueId ), QUEUE_SUBSCRIBERS.getColumnFamily(), subscriberQueuePath, se,
-                timestamp );
-
-        batch.addDeletion( bytebuffer( subscriberQueueId ), QUEUE_SUBSCRIPTIONS.getColumnFamily(), publisherQueuePath,
-                se, timestamp );
-    }
-
-
-    @Override
-    public QueueSet unsubscribeFromQueue( String publisherQueuePath, String subscriberQueuePath ) {
-
-        publisherQueuePath = normalizeQueuePath( publisherQueuePath );
-        UUID publisherQueueId = getQueueId( publisherQueuePath );
-
-        subscriberQueuePath = normalizeQueuePath( subscriberQueuePath );
-        UUID subscriberQueueId = getQueueId( subscriberQueuePath );
-
-        UUID timestampUuid = newTimeUUID();
-        long timestamp = getTimestampInMicros( timestampUuid );
-
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
-
-        batchUnsubscribeFromQueue( batch, publisherQueuePath, publisherQueueId, subscriberQueuePath, subscriberQueueId,
-                timestamp );
-
-        try {
-            Queue queue = getQueue( subscriberQueuePath, subscriberQueueId );
-
-            batchUpdateQueuePropertiesIndexes( batch, publisherQueueId, subscriberQueuePath, subscriberQueueId,
-                    emptyMapWithKeys( queue.getProperties() ), timestampUuid );
-        }
-        catch ( Exception e ) {
-            logger.error( "Unable to update index", e );
-        }
-
-        batchExecute( batch, RETRY_COUNT );
-
-        return new QueueSet().addQueue( subscriberQueuePath, subscriberQueueId );
-    }
-
-
-    @Override
-    public QueueSet getSubscribers( String publisherQueuePath, String firstSubscriberQueuePath, int limit ) {
-
-        UUID publisherQueueId = getQueueId( publisherQueuePath );
-
-        Keyspace ko = cass.getApplicationKeyspace( applicationId );
-
-        if ( firstSubscriberQueuePath != null ) {
-            limit += 1;
-        }
-
-        List<HColumn<String, UUID>> columns = createSliceQuery( ko, ue, se, ue ).setKey( publisherQueueId )
-                .setColumnFamily( QUEUE_SUBSCRIBERS.getColumnFamily() )
-                .setRange( normalizeQueuePath( firstSubscriberQueuePath ), null, false, limit + 1 ).execute().get()
-                .getColumns();
-
-        QueueSet queues = new QueueSet();
-
-        int count = Math.min( limit, columns.size() );
-        if ( columns != null ) {
-            for ( int i = firstSubscriberQueuePath != null ? 1 : 0; i < count; i++ ) {
-                HColumn<String, UUID> column = columns.get( i );
-                queues.addQueue( column.getName(), column.getValue() );
-            }
-        }
-        if ( columns.size() > limit ) {
-            queues.setMore( true );
-        }
-        return queues;
-    }
-
-
-    @Override
-    public QueueSet getSubscriptions( String subscriberQueuePath, String firstSubscriptionQueuePath, int limit ) {
-
-        UUID subscriberQueueId = getQueueId( subscriberQueuePath );
-
-        Keyspace ko = cass.getApplicationKeyspace( applicationId );
-
-        if ( firstSubscriptionQueuePath != null ) {
-            limit += 1;
-        }
-
-        List<HColumn<String, UUID>> columns = createSliceQuery( ko, ue, se, ue ).setKey( subscriberQueueId )
-                .setColumnFamily( QUEUE_SUBSCRIPTIONS.getColumnFamily() )
-                .setRange( normalizeQueuePath( firstSubscriptionQueuePath ), null, false, limit + 1 ).execute().get()
-                .getColumns();
-
-        QueueSet queues = new QueueSet();
-
-        int count = Math.min( limit, columns.size() );
-        if ( columns != null ) {
-            for ( int i = firstSubscriptionQueuePath != null ? 1 : 0; i < count; i++ ) {
-                HColumn<String, UUID> column = columns.get( i );
-                queues.addQueue( column.getName(), column.getValue() );
-            }
-        }
-        if ( columns.size() > limit ) {
-            queues.setMore( true );
-        }
-        return queues;
-    }
-
-
-    @Override
-    public QueueSet addSubscribersToQueue( String publisherQueuePath, List<String> subscriberQueuePaths ) {
-
-        publisherQueuePath = normalizeQueuePath( publisherQueuePath );
-        UUID publisherQueueId = getQueueId( publisherQueuePath );
-
-        UUID timestampUuid = newTimeUUID();
-        long timestamp = getTimestampInMicros( timestampUuid );
-
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
-
-        QueueSet queues = new QueueSet();
-
-        for ( String subscriberQueuePath : subscriberQueuePaths ) {
-
-            subscriberQueuePath = normalizeQueuePath( subscriberQueuePath );
-            UUID subscriberQueueId = getQueueId( subscriberQueuePath );
-
-            batchSubscribeToQueue( batch, publisherQueuePath, publisherQueueId, subscriberQueuePath, subscriberQueueId,
-                    timestamp );
-
-            try {
-                Queue queue = getQueue( subscriberQueuePath, subscriberQueueId );
-
-                batchUpdateQueuePropertiesIndexes( batch, publisherQueueId, subscriberQueuePath, subscriberQueueId,
-                        queue.getProperties(), timestampUuid );
-            }
-            catch ( Exception e ) {
-                logger.error( "Unable to update index", e );
-            }
-
-            queues.addQueue( subscriberQueuePath, subscriberQueueId );
-        }
-
-        batchExecute( batch, RETRY_COUNT );
-
-        return queues;
-    }
-
-
-    @Override
-    public QueueSet removeSubscribersFromQueue( String publisherQueuePath, List<String> subscriberQueuePaths ) {
-
-        publisherQueuePath = normalizeQueuePath( publisherQueuePath );
-        UUID publisherQueueId = getQueueId( publisherQueuePath );
-
-        UUID timestampUuid = newTimeUUID();
-        long timestamp = getTimestampInMicros( timestampUuid );
-
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
-
-        QueueSet queues = new QueueSet();
-
-        for ( String subscriberQueuePath : subscriberQueuePaths ) {
-
-            subscriberQueuePath = normalizeQueuePath( subscriberQueuePath );
-            UUID subscriberQueueId = getQueueId( subscriberQueuePath );
-
-            batchUnsubscribeFromQueue( batch, publisherQueuePath, publisherQueueId, subscriberQueuePath,
-                    subscriberQueueId, timestamp );
-
-            try {
-                Queue queue = getQueue( subscriberQueuePath, subscriberQueueId );
-
-                batchUpdateQueuePropertiesIndexes( batch, publisherQueueId, subscriberQueuePath, subscriberQueueId,
-                        emptyMapWithKeys( queue.getProperties() ), timestampUuid );
-            }
-            catch ( Exception e ) {
-                logger.error( "Unable to update index", e );
-            }
-
-            queues.addQueue( subscriberQueuePath, subscriberQueueId );
-        }
-
-        batchExecute( batch, RETRY_COUNT );
-
-        return queues;
-    }
-
-
-    @Override
-    public QueueSet subscribeToQueues( String subscriberQueuePath, List<String> publisherQueuePaths ) {
-
-        subscriberQueuePath = normalizeQueuePath( subscriberQueuePath );
-        UUID subscriberQueueId = getQueueId( subscriberQueuePath );
-
-        UUID timestampUuid = newTimeUUID();
-        long timestamp = getTimestampInMicros( timestampUuid );
-
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
-
-        QueueSet queues = new QueueSet();
-
-        for ( String publisherQueuePath : publisherQueuePaths ) {
-
-            publisherQueuePath = normalizeQueuePath( publisherQueuePath );
-            UUID publisherQueueId = getQueueId( publisherQueuePath );
-
-            batchSubscribeToQueue( batch, publisherQueuePath, publisherQueueId, subscriberQueuePath, subscriberQueueId,
-                    timestamp );
-
-            try {
-                Queue queue = getQueue( subscriberQueuePath, subscriberQueueId );
-
-                batchUpdateQueuePropertiesIndexes( batch, publisherQueueId, subscriberQueuePath, subscriberQueueId,
-                        queue.getProperties(), timestampUuid );
-            }
-            catch ( Exception e ) {
-                logger.error( "Unable to update index", e );
-            }
-
-            queues.addQueue( publisherQueuePath, publisherQueueId );
-        }
-
-        batchExecute( batch, RETRY_COUNT );
-
-        return queues;
-    }
-
-
-    @Override
-    public QueueSet unsubscribeFromQueues( String subscriberQueuePath, List<String> publisherQueuePaths ) {
-
-        subscriberQueuePath = normalizeQueuePath( subscriberQueuePath );
-        UUID subscriberQueueId = getQueueId( subscriberQueuePath );
-
-        UUID timestampUuid = newTimeUUID();
-        long timestamp = getTimestampInMicros( timestampUuid );
-
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
-
-        QueueSet queues = new QueueSet();
-
-        for ( String publisherQueuePath : publisherQueuePaths ) {
-
-            publisherQueuePath = normalizeQueuePath( publisherQueuePath );
-            UUID publisherQueueId = getQueueId( publisherQueuePath );
-
-            batchUnsubscribeFromQueue( batch, publisherQueuePath, publisherQueueId, subscriberQueuePath,
-                    subscriberQueueId, timestamp );
-
-            try {
-                Queue queue = getQueue( subscriberQueuePath, subscriberQueueId );
-
-                batchUpdateQueuePropertiesIndexes( batch, publisherQueueId, subscriberQueuePath, subscriberQueueId,
-                        emptyMapWithKeys( queue.getProperties() ), timestampUuid );
-            }
-            catch ( Exception e ) {
-                logger.error( "Unable to update index", e );
-            }
-
-            queues.addQueue( publisherQueuePath, publisherQueueId );
-        }
-
-        batchExecute( batch, RETRY_COUNT );
-
-        return queues;
-    }
-
-
-    @Override
-    public void incrementAggregateQueueCounters( String queuePath, String category, String counterName, long value ) {
-        long timestamp = cass.createTimestamp();
-        Mutator<ByteBuffer> m = createMutator( cass.getApplicationKeyspace( applicationId ), be );
-        counterUtils.batchIncrementAggregateCounters( m, applicationId, null, null, getQueueId( queuePath ), category,
-                counterName, value, timestamp );
-        batchExecute( m, CassandraService.RETRY_COUNT );
-    }
-
-
    /**
     * Reads the aggregate counter series for one queue/category/counter at
     * the given resolution between {@code start} and {@code finish} (both
     * rounded down to the resolution). When {@code pad} is set and the
     * resolution is not ALL, missing intervals are filled with zero-valued
     * counters so the returned series is contiguous over the whole range.
     */
    public AggregateCounterSet getAggregateCounters( UUID queueId, String category, String counterName,
                                                     CounterResolution resolution, long start, long finish,
                                                     boolean pad ) {

        start = resolution.round( start );
        finish = resolution.round( finish );
        // Next interval we expect to see while walking stored columns (for padding).
        long expected_time = start;
        Keyspace ko = cass.getApplicationKeyspace( applicationId );
        SliceCounterQuery<String, Long> q = createCounterSliceQuery( ko, se, le );
        q.setColumnFamily( APPLICATION_AGGREGATE_COUNTERS.getColumnFamily() );
        q.setRange( start, finish, false, ALL_COUNT );
        QueryResult<CounterSlice<Long>> r = q.setKey(
                counterUtils.getAggregateCounterRow( counterName, null, null, queueId, category, resolution ) )
                                             .execute();
        List<AggregateCounter> counters = new ArrayList<AggregateCounter>();
        for ( HCounterColumn<Long> column : r.get().getColumns() ) {
            AggregateCounter count = new AggregateCounter( column.getName(), column.getValue() );
            if ( pad && !( resolution == CounterResolution.ALL ) ) {
                // Fill any gap between the expected interval and this stored
                // column with zero counters.
                while ( count.getTimestamp() != expected_time ) {
                    counters.add( new AggregateCounter( expected_time, 0 ) );
                    expected_time = resolution.next( expected_time );
                }
                expected_time = resolution.next( expected_time );
            }
            counters.add( count );
        }
        if ( pad && !( resolution == CounterResolution.ALL ) ) {
            // Pad the tail of the range after the last stored column.
            while ( expected_time <= finish ) {
                counters.add( new AggregateCounter( expected_time, 0 ) );
                expected_time = resolution.next( expected_time );
            }
        }
        return new AggregateCounterSet( counterName, queueId, category, counters );
    }
-
-
-    public List<AggregateCounterSet> getAggregateCounters( UUID queueId, CounterQuery query ) throws Exception {
-
-        CounterResolution resolution = query.getResolution();
-        if ( resolution == null ) {
-            resolution = CounterResolution.ALL;
-        }
-        long start = query.getStartTime() != null ? query.getStartTime() : 0;
-        long finish = query.getFinishTime() != null ? query.getFinishTime() : 0;
-        boolean pad = query.isPad();
-        if ( start <= 0 ) {
-            start = 0;
-        }
-        if ( ( finish <= 0 ) || ( finish < start ) ) {
-            finish = System.currentTimeMillis();
-        }
-        start = resolution.round( start );
-        finish = resolution.round( finish );
-        long expected_time = start;
-
-        if ( pad && ( resolution != CounterResolution.ALL ) ) {
-            long max_counters = ( finish - start ) / resolution.interval();
-            if ( max_counters > 1000 ) {
-                finish = resolution.round( start + ( resolution.interval() * 1000 ) );
-            }
-        }
-
-        List<CounterFilterPredicate> filters = query.getCounterFilters();
-        if ( filters == null ) {
-            return null;
-        }
-        Map<String, org.usergrid.persistence.cassandra.CounterUtils.AggregateCounterSelection> selections =
-                new HashMap<String, AggregateCounterSelection>();
-        Keyspace ko = cass.getApplicationKeyspace( applicationId );
-
-        for ( CounterFilterPredicate filter : filters ) {
-            AggregateCounterSelection selection =
-                    new AggregateCounterSelection( filter.getName(), null, null, queueId, filter.getCategory() );
-            selections.put( selection.getRow( resolution ), selection );
-        }
-
-        MultigetSliceCounterQuery<String, Long> q = HFactory.createMultigetSliceCounterQuery( ko, se, le );
-        q.setColumnFamily( APPLICATION_AGGREGATE_COUNTERS.getColumnFamily() );
-        q.setRange( start, finish, false, ALL_COUNT );
-        QueryResult<CounterRows<String, Long>> rows = q.setKeys( selections.keySet() ).execute();
-
-        List<AggregateCounterSet> countSets = new ArrayList<AggregateCounterSet>();
-        for ( CounterRow<String, Long> r : rows.get() ) {
-            expected_time = start;
-            List<AggregateCounter> counters = new ArrayList<AggregateCounter>();
-            for ( HCounterColumn<Long> column : r.getColumnSlice().getColumns() ) {
-                AggregateCounter count = new AggregateCounter( column.getName(), column.getValue() );
-                if ( pad && ( resolution != CounterResolution.ALL ) ) {
-                    while ( count.getTimestamp() != expected_time ) {
-                        counters.add( new AggregateCounter( expected_time, 0 ) );
-                        expected_time = resolution.next( expected_time );
-                    }
-                    expected_time = resolution.next( expected_time );
-                }
-                counters.add( count );
-            }
-            if ( pad && ( resolution != CounterResolution.ALL ) ) {
-                while ( expected_time <= finish ) {
-                    counters.add( new AggregateCounter( expected_time, 0 ) );
-                    expected_time = resolution.next( expected_time );
-                }
-            }
-            AggregateCounterSelection selection = selections.get( r.getKey() );
-            countSets.add( new AggregateCounterSet( selection.getName(), queueId, selection.getCategory(), counters ) );
-        }
-
-        Collections.sort( countSets, new Comparator<AggregateCounterSet>() {
-            @Override
-            public int compare( AggregateCounterSet o1, AggregateCounterSet o2 ) {
-                String s1 = o1.getName();
-                String s2 = o2.getName();
-                return s1.compareTo( s2 );
-            }
-        } );
-        return countSets;
-    }
-
-
-    @Override
-    public Results getAggregateQueueCounters( String queuePath, String category, String counterName,
-                                              CounterResolution resolution, long start, long finish, boolean pad ) {
-        return Results.fromCounters(
-                getAggregateCounters( getQueueId( queuePath ), category, counterName, resolution, start, finish,
-                        pad ) );
-    }
-
-
-    @Override
-    public Results getAggregateQueueCounters( String queuePath, CounterQuery query ) throws Exception {
-        return Results.fromCounters( getAggregateCounters( getQueueId( queuePath ), query ) );
-    }
-
-
-    @Override
-    public void incrementQueueCounters( String queuePath, Map<String, Long> counts ) {
-        long timestamp = cass.createTimestamp();
-        Mutator<ByteBuffer> m = createMutator( cass.getApplicationKeyspace( applicationId ), be );
-        counterUtils.batchIncrementQueueCounters( m, getQueueId( queuePath ), counts, timestamp, applicationId );
-        batchExecute( m, CassandraService.RETRY_COUNT );
-    }
-
-
-    @Override
-    public void incrementQueueCounter( String queuePath, String name, long value ) {
-        long timestamp = cass.createTimestamp();
-        Mutator<ByteBuffer> m = createMutator( cass.getApplicationKeyspace( applicationId ), be );
-        counterUtils.batchIncrementQueueCounter( m, getQueueId( queuePath ), name, value, timestamp, applicationId );
-        batchExecute( m, CassandraService.RETRY_COUNT );
-    }
-
-
-    public Map<String, Long> getQueueCounters( UUID queueId ) throws Exception {
-
-        Map<String, Long> counters = new HashMap<String, Long>();
-        Keyspace ko = cass.getApplicationKeyspace( applicationId );
-        SliceCounterQuery<UUID, String> q = createCounterSliceQuery( ko, ue, se );
-        q.setColumnFamily( COUNTERS.getColumnFamily() );
-        q.setRange( null, null, false, ALL_COUNT );
-        QueryResult<CounterSlice<String>> r = q.setKey( queueId ).execute();
-        for ( HCounterColumn<String> column : r.get().getColumns() ) {
-            counters.put( column.getName(), column.getValue() );
-        }
-        return counters;
-    }
-
-
-    @Override
-    public Map<String, Long> getQueueCounters( String queuePath ) throws Exception {
-        return getQueueCounters( getQueueId( queuePath ) );
-    }
-
-
-    @Override
-    public Set<String> getQueueCounterNames( String queuePath ) throws Exception {
-        Set<String> names = new HashSet<String>();
-        Keyspace ko = cass.getApplicationKeyspace( applicationId );
-        SliceQuery<String, String, ByteBuffer> q = createSliceQuery( ko, se, se, be );
-        q.setColumnFamily( QueuesCF.QUEUE_DICTIONARIES.toString() );
-        q.setKey( CassandraPersistenceUtils.key( getQueueId( queuePath ), DICTIONARY_COUNTERS ).toString() );
-        q.setRange( null, null, false, ALL_COUNT );
-
-        List<HColumn<String, ByteBuffer>> columns = q.execute().get().getColumns();
-        for ( HColumn<String, ByteBuffer> column : columns ) {
-            names.add( column.getName() );
-        }
-        return names;
-    }
-
-
-    public Queue getQueue( String queuePath, UUID queueId ) {
-        SliceQuery<UUID, String, ByteBuffer> q =
-                createSliceQuery( cass.getApplicationKeyspace( applicationId ), ue, se, be );
-        q.setColumnFamily( QUEUE_PROPERTIES.getColumnFamily() );
-        q.setKey( queueId );
-        q.setRange( null, null, false, ALL_COUNT );
-        QueryResult<ColumnSlice<String, ByteBuffer>> r = q.execute();
-        ColumnSlice<String, ByteBuffer> slice = r.get();
-        List<HColumn<String, ByteBuffer>> results = slice.getColumns();
-        return deserializeQueue( results );
-    }
-
-
-    @Override
-    public Queue getQueue( String queuePath ) {
-        return getQueue( queuePath, getQueueId( queuePath ) );
-    }
-
-
-    @Override
-    public Queue updateQueue( String queuePath, Queue queue ) {
-        queue.setPath( queuePath );
-
-        UUID timestampUuid = newTimeUUID();
-        long timestamp = getTimestampInMicros( timestampUuid );
-
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
-
-        addQueueToMutator( batch, queue, timestamp );
-
-        try {
-            batchUpdateQueuePropertiesIndexes( batch, queuePath, queue.getUuid(), queue.getProperties(),
-                    timestampUuid );
-        }
-        catch ( Exception e ) {
-            logger.error( "Unable to update queue", e );
-        }
-
-        batch.addInsertion( bytebuffer( queue.getUuid() ), QUEUE_PROPERTIES.getColumnFamily(),
-                createColumn( QUEUE_CREATED, timestamp / 1000, Long.MAX_VALUE - timestamp, se, le ) );
-
-        batch.addInsertion( bytebuffer( queue.getUuid() ), QUEUE_PROPERTIES.getColumnFamily(),
-                createColumn( QUEUE_MODIFIED, timestamp / 1000, timestamp, se, le ) );
-
-        batchExecute( batch, RETRY_COUNT );
-
-        return queue;
-    }
-
-
-    @Override
-    public Queue updateQueue( String queuePath, Map<String, Object> properties ) {
-        return updateQueue( queuePath, new Queue( properties ) );
-    }
-
-
-    public void batchUpdateQueuePropertiesIndexes( Mutator<ByteBuffer> batch, String subscriberQueuePath,
-                                                   UUID subscriberQueueId, Map<String, Object> properties,
-                                                   UUID timestampUuid ) throws Exception {
-
-        QueueSet subscriptions = getSubscriptions( subscriberQueuePath, null, ALL_COUNT );
-
-        if ( subscriptions != null ) {
-
-            for ( Map.Entry<String, Object> property : properties.entrySet() ) {
-
-                if ( !Queue.QUEUE_PROPERTIES.containsKey( property.getKey() ) ) {
-
-                    QueueIndexUpdate indexUpdate =
-                            batchStartQueueIndexUpdate( batch, subscriberQueuePath, subscriberQueueId,
-                                    property.getKey(), property.getValue(), timestampUuid );
-
-                    for ( QueueInfo subscription : subscriptions.getQueues() ) {
-                        batchUpdateQueueIndex( indexUpdate, subscription.getUuid() );
-                    }
-                }
-            }
-        }
-    }
-
-
-    public void batchUpdateQueuePropertiesIndexes( Mutator<ByteBuffer> batch, UUID publisherQueueId,
-                                                   String subscriberQueuePath, UUID subscriberQueueId,
-                                                   Map<String, Object> properties, UUID timestampUuid )
-            throws Exception {
-
-        for ( Map.Entry<String, Object> property : properties.entrySet() ) {
-
-            if ( !Queue.QUEUE_PROPERTIES.containsKey( property.getKey() ) ) {
-
-                QueueIndexUpdate indexUpdate =
-                        batchStartQueueIndexUpdate( batch, subscriberQueuePath, subscriberQueueId, property.getKey(),
-                                property.getValue(), timestampUuid );
-
-                batchUpdateQueueIndex( indexUpdate, publisherQueueId );
-            }
-        }
-    }
-
-
-    public QueueIndexUpdate batchUpdateQueueIndex( QueueIndexUpdate indexUpdate, UUID subcriptionQueueId )
-            throws Exception {
-
-        logger.info( "batchUpdateQueueIndex" );
-
-        Mutator<ByteBuffer> batch = indexUpdate.getBatch();
-
-        // queue_id,prop_name
-        Object index_key = key( subcriptionQueueId, indexUpdate.getEntryName() );
-
-        // subscription_queue_id,subscriber_queue_id,prop_name
-
-        for ( QueueIndexEntry entry : indexUpdate.getPrevEntries() ) {
-
-            if ( entry.getValue() != null ) {
-
-                index_key = key( subcriptionQueueId, entry.getPath() );
-
-                batch.addDeletion( bytebuffer( index_key ), PROPERTY_INDEX.getColumnFamily(), entry.getIndexComposite(),
-                        dce, indexUpdate.getTimestamp() );
-            }
-            else {
-                logger.error( "Unexpected condition - deserialized property value is null" );
-            }
-        }
-
-        if ( indexUpdate.getNewEntries().size() > 0 ) {
-
-            for ( QueueIndexEntry indexEntry : indexUpdate.getNewEntries() ) {
-
-                index_key = key( subcriptionQueueId, indexEntry.getPath() );
-
-                batch.addInsertion( bytebuffer( index_key ), PROPERTY_INDEX.getColumnFamily(),
-                        createColumn( indexEntry.getIndexComposite(), ByteBuffer.allocate( 0 ),
-                                indexUpdate.getTimestamp(), dce, be ) );
-            }
-        }
-
-        for ( String index : indexUpdate.getIndexesSet() ) {
-            batch.addInsertion( bytebuffer( key( subcriptionQueueId, DICTIONARY_SUBSCRIBER_INDEXES ) ),
-                    QUEUE_DICTIONARIES.getColumnFamily(),
-                    createColumn( index, ByteBuffer.allocate( 0 ), indexUpdate.getTimestamp(), se, be ) );
-        }
-
-        return indexUpdate;
-    }
-
-
-    public QueueIndexUpdate batchStartQueueIndexUpdate( Mutator<ByteBuffer> batch, String queuePath, UUID queueId,
-                                                        String entryName, Object entryValue, UUID timestampUuid )
-            throws Exception {
-
-        long timestamp = getTimestampInMicros( timestampUuid );
-
-        QueueIndexUpdate indexUpdate =
-                new QueueIndexUpdate( batch, queuePath, queueId, entryName, entryValue, timestampUuid );
-
-        List<HColumn<ByteBuffer, ByteBuffer>> entries = null;
-
-        entries = createSliceQuery( cass.getApplicationKeyspace( applicationId ), ue, be, be )
-                .setColumnFamily( PROPERTY_INDEX_ENTRIES.getColumnFamily() ).setKey( queueId )
-                .setRange( DynamicComposite.toByteBuffer( entryName ),
-                        setGreaterThanEqualityFlag( new DynamicComposite( entryName ) ).serialize(), false,
-                        INDEX_ENTRY_LIST_COUNT ).execute().get().getColumns();
-
-        if ( logger.isInfoEnabled() ) {
-            logger.info( "Found {} previous index entries for {} of entity {}", new Object[] {
-                    entries.size(), entryName, queueId
-            } );
-        }
-
-        // Delete all matching entries from entry list
-        for ( HColumn<ByteBuffer, ByteBuffer> entry : entries ) {
-            UUID prev_timestamp = null;
-            Object prev_value = null;
-            String prev_obj_path = null;
-
-            // new format:
-            // composite(entryName,
-            // value_code,prev_value,prev_timestamp,prev_obj_path) = null
-            DynamicComposite composite = DynamicComposite.fromByteBuffer( entry.getName().duplicate() );
-            prev_value = composite.get( 2 );
-            prev_timestamp = ( UUID ) composite.get( 3 );
-            if ( composite.size() > 4 ) {
-                prev_obj_path = ( String ) composite.get( 4 );
-            }
-
-            if ( prev_value != null ) {
-
-                String entryPath = entryName;
-                if ( ( prev_obj_path != null ) && ( prev_obj_path.length() > 0 ) ) {
-                    entryPath = entryName + "." + prev_obj_path;
-                }
-
-                indexUpdate.addPrevEntry( entryPath, prev_value, prev_timestamp );
-
-                // composite(property_value,subscriber_id,entry_timestamp)
-                batch.addDeletion( bytebuffer( queueId ), PROPERTY_INDEX_ENTRIES.getColumnFamily(),
-                        entry.getName().duplicate(), be, timestamp );
-            }
-            else {
-                logger.error( "Unexpected condition - deserialized property value is null" );
-            }
-        }
-
-        if ( validIndexableValueOrJson( entryValue ) ) {
-
-            List<Map.Entry<String, Object>> list = getKeyValueList( entryName, entryValue, false );
-
-            for ( Map.Entry<String, Object> indexEntry : list ) {
-
-                if ( validIndexableValue( indexEntry.getValue() ) ) {
-                    indexUpdate.addNewEntry( indexEntry.getKey(), toIndexableValue( indexEntry.getValue() ) );
-                }
-            }
-
-            for ( Map.Entry<String, Object> indexEntry : list ) {
-
-                String name = indexEntry.getKey();
-                if ( name.startsWith( entryName + "." ) ) {
-                    name = name.substring( entryName.length() + 1 );
-                }
-                else if ( name.startsWith( entryName ) ) {
-                    name = name.substring( entryName.length() );
-                }
-
-                batch.addInsertion( bytebuffer( queueId ), PROPERTY_INDEX_ENTRIES.getColumnFamily(), createColumn(
-                        DynamicComposite
-                                .toByteBuffer( entryName, indexValueCode( entryValue ), toIndexableValue( entryValue ),
-                                        indexUpdate.getTimestampUuid(), name ), ByteBuffer.allocate( 0 ), timestamp, be,
-                        be ) );
-
-                indexUpdate.addIndex( indexEntry.getKey() );
-            }
-
-            indexUpdate.addIndex( entryName );
-        }
-
-        return indexUpdate;
-    }
-
-
-    public QueueSet searchQueueIndex( UUID publisherQueueId, QuerySlice slice, int count ) throws Exception {
-
-        ByteBuffer start = null;
-        if ( slice.getCursor() != null ) {
-            start = slice.getCursor();
-        }
-        else if ( slice.getStart() != null ) {
-            DynamicComposite s = new DynamicComposite( slice.getStart().getCode(), slice.getStart().getValue() );
-            if ( !slice.getStart().isInclusive() ) {
-                setEqualityFlag( s, ComponentEquality.GREATER_THAN_EQUAL );
-            }
-            start = s.serialize();
-        }
-
-        ByteBuffer finish = null;
-        if ( slice.getFinish() != null ) {
-            DynamicComposite f = new DynamicComposite( slice.getFinish().getCode(), slice.getFinish().getValue() );
-            if ( slice.getFinish().isInclusive() ) {
-                setEqualityFlag( f, ComponentEquality.GREATER_THAN_EQUAL );
-            }
-            finish = f.serialize();
-        }
-
-        if ( slice.isReversed() && ( start != null ) && ( finish != null ) ) {
-            ByteBuffer temp = start;
-            start = finish;
-            finish = temp;
-        }
-
-        List<HColumn<ByteBuffer, ByteBuffer>> results =
-                createSliceQuery( cass.getApplicationKeyspace( applicationId ), be, be, be )
-                        .setColumnFamily( PROPERTY_INDEX.getColumnFamily() )
-                        .setKey( bytebuffer( key( publisherQueueId, slice.getPropertyName() ) ) )
-                        .setRange( start, finish, slice.isReversed(), count ).execute().get().getColumns();
-
-        QueueSet queues = new QueueSet();
-        for ( HColumn<ByteBuffer, ByteBuffer> column : results ) {
-            DynamicComposite c = DynamicComposite.fromByteBuffer( column.getName() );
-            queues.addQueue( c.get( 3, se ), c.get( 2, ue ) );
-        }
-        return queues;
-    }
-
-
-    @Override
-    public QueueSet searchSubscribers( String publisherQueuePath, Query query ) {
-
-        if ( query == null ) {
-            query = new Query();
-        }
-
-        publisherQueuePath = normalizeQueuePath( publisherQueuePath );
-        UUID publisherQueueId = getQueueId( publisherQueuePath );
-
-        if ( !query.hasFilterPredicates() && !query.hasSortPredicates() ) {
-
-            return getSubscribers( publisherQueuePath, null, query.getLimit() );
-        }
-
-        QueueSet results = null;
-        String composite_cursor = null;
-
-        QueryProcessor qp = new QueryProcessor( query );
-        List<QuerySlice> slices = qp.getSlices();
-        int search_count = query.getLimit() + 1;
-        if ( slices.size() > 1 ) {
-            search_count = DEFAULT_SEARCH_COUNT;
-        }
-        for ( QuerySlice slice : slices ) {
-
-            QueueSet r = null;
-            try {
-                r = searchQueueIndex( publisherQueueId, slice, search_count );
-            }
-            catch ( Exception e ) {
-                logger.error( "Error during search", e );
-            }
-
-            if ( r == null ) {
-                continue;
-            }
-
-            if ( r.size() > query.getLimit() ) {
-                r.setCursorToLastResult();
-            }
-
-            if ( r.getCursor() != null ) {
-                if ( composite_cursor != null ) {
-                    composite_cursor += "|";
-                }
-                else {
-                    composite_cursor = "";
-                }
-                int hashCode = slice.hashCode();
-                logger.info( "Cursor hash code: {} ", hashCode );
-                composite_cursor += hashCode + ":" + r.getCursor();
-            }
-
-            if ( results != null ) {
-                results.and( r );
-            }
-            else {
-                results = r;
-            }
-        }
-
-        return results;
-    }
-
-
-    @Override
-    public QueueSet getQueues( String firstQueuePath, int limit ) {
-        return getSubscribers( "/", firstQueuePath, limit );
-    }
-
-
-    @Override
-    public QueueSet getChildQueues( String publisherQueuePath, String firstQueuePath, int count ) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-
-    @Override
-    public UUID getNewConsumerId() {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-
-    /*
-     * (non-Javadoc)
-     *
-     * @see org.usergrid.mq.QueueManager#renewTransaction(java.lang.String,
-     * java.lang.String, org.usergrid.mq.QueueQuery)
-     */
-    @Override
-    public UUID renewTransaction( String queuePath, UUID transactionId, QueueQuery query )
-            throws TransactionNotFoundException {
-        Keyspace ko = cass.getApplicationKeyspace( applicationId );
-        return new ConsumerTransaction( applicationId, ko, lockManager, cass, lockTimeout )
-                .renewTransaction( queuePath, transactionId, query );
-    }
-
-
-    /*
-     * (non-Javadoc)
-     *
-     * @see org.usergrid.mq.QueueManager#deleteTransaction(java.lang.String,
-     * java.lang.String, org.usergrid.mq.QueueQuery)
-     */
-    @Override
-    public void deleteTransaction( String queuePath, UUID transactionId, QueueQuery query ) {
-        this.commitTransaction( queuePath, transactionId, query );
-    }
-
-
-    @Override
-    public void commitTransaction( String queuePath, UUID transactionId, QueueQuery query ) {
-        Keyspace ko = cass.getApplicationKeyspace( applicationId );
-        new ConsumerTransaction( applicationId, ko, lockManager, cass, lockTimeout )
-                .deleteTransaction( queuePath, transactionId, query );
-    }
-
-
-    @Override
-    public boolean hasOutstandingTransactions( String queuePath, UUID consumerId ) {
-        UUID queueId = CassandraMQUtils.getQueueId( queuePath );
-
-        //no consumer id set, use the same one as the overall queue
-        if ( consumerId == null ) {
-            consumerId = queueId;
-        }
-
-        Keyspace ko = cass.getApplicationKeyspace( applicationId );
-
-        return new ConsumerTransaction( applicationId, ko, lockManager, cass , lockTimeout)
-                .hasOutstandingTransactions( queueId, consumerId );
-    }
-
-
-    @Override
-    public boolean hasMessagesInQueue( String queuePath, UUID consumerId ) {
-
-        Keyspace ko = cass.getApplicationKeyspace( applicationId );
-        UUID queueId = CassandraMQUtils.getQueueId( queuePath );
-
-        if ( consumerId == null ) {
-            consumerId = queueId;
-        }
-
-        NoTransactionSearch search = new NoTransactionSearch( ko );
-
-        QueueBounds bounds = search.getQueueBounds( queueId );
-
-        //Queue doesn't exist
-        if ( bounds == null ) {
-            return false;
-        }
-
-        UUID consumerPosition = search.getConsumerQueuePosition( queueId, consumerId );
-
-        //queue exists, but the consumer does not, meaning it's never read from the Q
-        if ( consumerPosition == null ) {
-            return true;
-        }
-
-        //check our consumer position against the newest message.  If it's equal or larger,
-        // we're read to the end of the queue
-        //note that this does not take transactions into consideration, just the client pointer relative to the largest
-        //message in the queue
-        return UUIDUtils.compare( consumerPosition, bounds.getNewest() ) < 0;
-    }
-
-
-    @Override
-    public boolean hasPendingReads( String queuePath, UUID consumerId ) {
-        return hasOutstandingTransactions( queuePath, consumerId ) || hasMessagesInQueue( queuePath, consumerId );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/usergrid/mq/cassandra/QueuesCF.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/usergrid/mq/cassandra/QueuesCF.java b/stack/core/src/main/java/org/usergrid/mq/cassandra/QueuesCF.java
deleted file mode 100644
index 96e25e3..0000000
--- a/stack/core/src/main/java/org/usergrid/mq/cassandra/QueuesCF.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*******************************************************************************
- * Copyright 2012 Apigee Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.usergrid.mq.cassandra;
-
-
-import java.util.List;
-
-import org.usergrid.persistence.cassandra.CFEnum;
-
-import me.prettyprint.hector.api.ddl.ColumnDefinition;
-
-import static me.prettyprint.hector.api.ddl.ComparatorType.COUNTERTYPE;
-import static org.usergrid.persistence.cassandra.CassandraPersistenceUtils.getIndexMetadata;
-
-// Auto-generated by ApplicationCFGenerator
-
-
-public enum QueuesCF implements CFEnum {
-
-    MESSAGE_PROPERTIES( "Entity_Properties", "BytesType", false ),
-
-    QUEUE_PROPERTIES( "Queue_Properties", "BytesType" ),
-
-    QUEUE_INBOX( "Queue_Inbox", "UUIDType" ),
-
-    QUEUE_DICTIONARIES( "Queue_Dictionaries", "BytesType" ),
-
-    QUEUE_SUBSCRIBERS( "Queue_Subscribers", "BytesType" ),
-
-    QUEUE_SUBSCRIPTIONS( "Queue_Subscriptions", "BytesType" ),
-
-    /**
-     * Time based UUID list of future timeouts for messages. The UUID value is a pointer to the original message in the
-     * topic
-     */
-    CONSUMER_QUEUE_TIMEOUTS( "MQ_Consumers_Timeout", "UUIDType" ),
-
-    CONSUMERS( "MQ_Consumers", "BytesType" ),
-
-    CONSUMER_QUEUE_MESSAGES_PROPERTIES( "Consumer_Queue_Messages_Properties", "BytesType" ),
-
-    COUNTERS( "MQ_Counters", "BytesType", COUNTERTYPE.getClassName() ),
-
-    PROPERTY_INDEX( "MQ_Property_Index",
-            "DynamicCompositeType(a=>AsciiType,b=>BytesType,i=>IntegerType,x=>LexicalUUIDType,l=>LongType," +
-                    "t=>TimeUUIDType,s=>UTF8Type,u=>UUIDType,A=>AsciiType(reversed=true),B=>BytesType(reversed=true)," +
-                    "I=>IntegerType(reversed=true),X=>LexicalUUIDType(reversed=true),L=>LongType(reversed=true)," +
-                    "T=>TimeUUIDType(reversed=true),S=>UTF8Type(reversed=true),U=>UUIDType(reversed=true))" ),
-
-    PROPERTY_INDEX_ENTRIES( "MQ_Property_Index_Entries",
-            "DynamicCompositeType(a=>AsciiType,b=>BytesType,i=>IntegerType,x=>LexicalUUIDType,l=>LongType," +
-                    "t=>TimeUUIDType,s=>UTF8Type,u=>UUIDType,A=>AsciiType(reversed=true),B=>BytesType(reversed=true)," +
-                    "I=>IntegerType(reversed=true),X=>LexicalUUIDType(reversed=true),L=>LongType(reversed=true)," +
-                    "T=>TimeUUIDType(reversed=true),S=>UTF8Type(reversed=true),U=>UUIDType(reversed=true))" ),;
-
-    public final static String STATIC_MESSAGES_KEYSPACE = "Usergrid_Messages";
-    public final static String APPLICATION_MESSAGES_KEYSPACE_SUFFIX = "_messages";
-
-    private final String cf;
-    private final String comparator;
-    private final String validator;
-    private final String indexes;
-    private final boolean create;
-
-
-    QueuesCF( String cf, String comparator ) {
-        this.cf = cf;
-        this.comparator = comparator;
-        validator = null;
-        indexes = null;
-        create = true;
-    }
-
-
-    QueuesCF( String cf, String comparator, boolean create ) {
-        this.cf = cf;
-        this.comparator = comparator;
-        validator = null;
-        indexes = null;
-        this.create = create;
-    }
-
-
-    QueuesCF( String cf, String comparator, String validator ) {
-        this.cf = cf;
-        this.comparator = comparator;
-        this.validator = validator;
-        indexes = null;
-        create = true;
-    }
-
-
-    QueuesCF( String cf, String comparator, String validator, String indexes ) {
-        this.cf = cf;
-        this.comparator = comparator;
-        this.validator = validator;
-        this.indexes = indexes;
-        create = true;
-    }
-
-
-    @Override
-    public String toString() {
-        return cf;
-    }
-
-
-    @Override
-    public String getColumnFamily() {
-        return cf;
-    }
-
-
-    @Override
-    public String getComparator() {
-        return comparator;
-    }
-
-
-    @Override
-    public String getValidator() {
-        return validator;
-    }
-
-
-    @Override
-    public boolean isComposite() {
-        return comparator.startsWith( "DynamicCompositeType" );
-    }
-
-
-    @Override
-    public List<ColumnDefinition> getMetadata() {
-        return getIndexMetadata( indexes );
-    }
-
-
-    @Override
-    public boolean create() {
-        return create;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/usergrid/mq/cassandra/io/AbstractSearch.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/usergrid/mq/cassandra/io/AbstractSearch.java b/stack/core/src/main/java/org/usergrid/mq/cassandra/io/AbstractSearch.java
deleted file mode 100644
index 089d025..0000000
--- a/stack/core/src/main/java/org/usergrid/mq/cassandra/io/AbstractSearch.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/*******************************************************************************
- * Copyright 2012 Apigee Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.usergrid.mq.cassandra.io;
-
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.usergrid.mq.Message;
-import org.usergrid.mq.QueueResults;
-import org.usergrid.mq.cassandra.io.NoTransactionSearch.SearchParam;
-import org.usergrid.persistence.exceptions.QueueException;
-import org.usergrid.utils.UUIDUtils;
-
-import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
-import me.prettyprint.cassandra.serializers.UUIDSerializer;
-import me.prettyprint.hector.api.Keyspace;
-import me.prettyprint.hector.api.beans.ColumnSlice;
-import me.prettyprint.hector.api.beans.HColumn;
-import me.prettyprint.hector.api.beans.Row;
-import me.prettyprint.hector.api.beans.Rows;
-import me.prettyprint.hector.api.factory.HFactory;
-import me.prettyprint.hector.api.mutation.Mutator;
-import me.prettyprint.hector.api.query.SliceQuery;
-
-import static me.prettyprint.hector.api.factory.HFactory.createColumn;
-import static me.prettyprint.hector.api.factory.HFactory.createMultigetSliceQuery;
-import static me.prettyprint.hector.api.factory.HFactory.createMutator;
-import static me.prettyprint.hector.api.factory.HFactory.createSliceQuery;
-import static org.usergrid.mq.Queue.QUEUE_NEWEST;
-import static org.usergrid.mq.Queue.QUEUE_OLDEST;
-import static org.usergrid.mq.cassandra.CassandraMQUtils.deserializeMessage;
-import static org.usergrid.mq.cassandra.CassandraMQUtils.getQueueShardRowKey;
-import static org.usergrid.mq.cassandra.QueueManagerImpl.ALL_COUNT;
-import static org.usergrid.mq.cassandra.QueueManagerImpl.QUEUE_SHARD_INTERVAL;
-import static org.usergrid.mq.cassandra.QueueManagerImpl.se;
-import static org.usergrid.mq.cassandra.QueuesCF.CONSUMERS;
-import static org.usergrid.mq.cassandra.QueuesCF.MESSAGE_PROPERTIES;
-import static org.usergrid.mq.cassandra.QueuesCF.QUEUE_INBOX;
-import static org.usergrid.mq.cassandra.QueuesCF.QUEUE_PROPERTIES;
-import static org.usergrid.utils.NumberUtils.roundLong;
-import static org.usergrid.utils.UUIDUtils.MAX_TIME_UUID;
-import static org.usergrid.utils.UUIDUtils.MIN_TIME_UUID;
-import static org.usergrid.utils.UUIDUtils.getTimestampInMillis;
-
-
-/** @author tnine */
-public abstract class AbstractSearch implements QueueSearch
-{
-
-    private static final Logger logger = LoggerFactory.getLogger( AbstractSearch.class );
-
-    protected static final UUIDSerializer ue = new UUIDSerializer();
-
-    protected static final ByteBufferSerializer be = new ByteBufferSerializer();
-
-    protected Keyspace ko;
-
-
-    /**
-     *
-     */
-    public AbstractSearch( Keyspace ko )
-    {
-        this.ko = ko;
-    }
-
-
-    /**
-     * Get the position in the queue for the given appId, consumer and queu
-     *
-     * @param queueId The queueId
-     * @param consumerId The consumerId
-     */
-    public UUID getConsumerQueuePosition( UUID queueId, UUID consumerId )
-    {
-        HColumn<UUID, UUID> result =
-                HFactory.createColumnQuery( ko, ue, ue, ue ).setKey( consumerId ).setName( queueId )
-                        .setColumnFamily( CONSUMERS.getColumnFamily() ).execute().get();
-        if ( result != null )
-        {
-            return result.getValue();
-        }
-
-        return null;
-    }
-
-
-    /** Load the messages into an array list */
-    protected List<Message> loadMessages( Collection<UUID> messageIds, boolean reversed )
-    {
-
-        Rows<UUID, String, ByteBuffer> messageResults =
-                createMultigetSliceQuery( ko, ue, se, be ).setColumnFamily( MESSAGE_PROPERTIES.getColumnFamily() )
-                        .setKeys( messageIds ).setRange( null, null, false, ALL_COUNT ).execute().get();
-
-        List<Message> messages = new ArrayList<Message>( messageIds.size() );
-
-        for ( Row<UUID, String, ByteBuffer> row : messageResults )
-        {
-            Message message = deserializeMessage( row.getColumnSlice().getColumns() );
-
-            if ( message != null )
-            {
-                messages.add( message );
-            }
-        }
-
-        Collections.sort( messages, new RequestedOrderComparator( messageIds ) );
-
-        return messages;
-    }
-
-
-    /** Create the results to return from the given messages */
-    protected QueueResults createResults( List<Message> messages, String queuePath, UUID queueId, UUID consumerId )
-    {
-
-        UUID lastId = null;
-
-        if ( messages != null && messages.size() > 0 )
-        {
-            lastId = messages.get( messages.size() - 1 ).getUuid();
-        }
-
-        return new QueueResults( queuePath, queueId, messages, lastId, consumerId );
-    }
-
-
-    /**
-     * Get a list of UUIDs that can be read for the client. This comes directly from the queue inbox, and DOES NOT take
-     * into account client messages
-     *
-     * @param queueId The queue id to read
-     * @param bounds The bounds to use when reading
-     */
-    protected List<UUID> getQueueRange( UUID queueId, QueueBounds bounds, SearchParam params )
-    {
-
-        if ( bounds == null )
-        {
-            logger.error( "Necessary queue bounds not found" );
-            throw new QueueException( "Neccessary queue bounds not found" );
-        }
-
-        UUID finish_uuid = params.reversed ? bounds.getOldest() : bounds.getNewest();
-
-        List<UUID> results = new ArrayList<UUID>( params.limit );
-
-        UUID start = params.startId;
-
-        if ( start == null )
-        {
-            start = params.reversed ? bounds.getNewest() : bounds.getOldest();
-        }
-
-        if ( start == null )
-        {
-            logger.error( "No first message in queue" );
-            return results;
-        }
-
-        if ( finish_uuid == null )
-        {
-            logger.error( "No last message in queue" );
-            return results;
-        }
-
-        long start_ts_shard = roundLong( getTimestampInMillis( start ), QUEUE_SHARD_INTERVAL );
-
-        long finish_ts_shard = roundLong( getTimestampInMillis( finish_uuid ), QUEUE_SHARD_INTERVAL );
-
-        long current_ts_shard = start_ts_shard;
-        if ( params.reversed )
-        {
-            current_ts_shard = finish_ts_shard;
-        }
-
-        while ( ( current_ts_shard >= start_ts_shard ) && ( current_ts_shard <= finish_ts_shard ) )
-        {
-
-            UUID slice_start = MIN_TIME_UUID;
-            UUID slice_end = MAX_TIME_UUID;
-
-            if ( current_ts_shard == start_ts_shard )
-            {
-                slice_start = start;
-            }
-
-            if ( current_ts_shard == finish_ts_shard )
-            {
-                slice_end = finish_uuid;
-            }
-
-            SliceQuery<ByteBuffer, UUID, ByteBuffer> q = createSliceQuery( ko, be, ue, be );
-            q.setColumnFamily( QUEUE_INBOX.getColumnFamily() );
-            q.setKey( getQueueShardRowKey( queueId, current_ts_shard ) );
-            q.setRange( slice_start, slice_end, params.reversed, params.limit + 1 );
-
-            List<HColumn<UUID, ByteBuffer>> cassResults = q.execute().get().getColumns();
-
-            for ( int i = 0; i < cassResults.size(); i++ )
-            {
-                HColumn<UUID, ByteBuffer> column = cassResults.get( i );
-
-                // skip the first one, we've already read it
-                if ( i == 0 && params.skipFirst && params.startId.equals( column.getName() ) )
-                {
-                    continue;
-                }
-
-                UUID id = column.getName();
-
-                results.add( id );
-
-                logger.debug( "Added id '{}' to result set for queue id '{}'", id, queueId );
-
-                if ( results.size() >= params.limit )
-                {
-                    return results;
-                }
-            }
-
-            if ( params.reversed )
-            {
-                current_ts_shard -= QUEUE_SHARD_INTERVAL;
-            }
-            else
-            {
-                current_ts_shard += QUEUE_SHARD_INTERVAL;
-            }
-        }
-
-        return results;
-    }
-
-
-    /**
-     * Get the bounds for the queue
-     *
-     * @return The bounds for the queue
-     */
-    public QueueBounds getQueueBounds( UUID queueId )
-    {
-        try
-        {
-            ColumnSlice<String, UUID> result = HFactory.createSliceQuery( ko, ue, se, ue ).setKey( queueId )
-                                                       .setColumnNames( QUEUE_NEWEST, QUEUE_OLDEST )
-                                                       .setColumnFamily( QUEUE_PROPERTIES.getColumnFamily() ).execute()
-                                                       .get();
-            if ( result != null && result.getColumnByName( QUEUE_OLDEST ) != null
-                    && result.getColumnByName( QUEUE_NEWEST ) != null )
-            {
-                return new QueueBounds( result.getColumnByName( QUEUE_OLDEST ).getValue(),
-                        result.getColumnByName( QUEUE_NEWEST ).getValue() );
-            }
-        }
-        catch ( Exception e )
-        {
-            logger.error( "Error getting oldest queue message ID", e );
-        }
-        return null;
-    }
-
-
-    /**
-     * Write the updated client pointer
-     *
-     * @param lastReturnedId This is a null safe parameter. If it's null, this won't be written since it means we didn't
-     * read any messages
-     */
-    protected void writeClientPointer( UUID queueId, UUID consumerId, UUID lastReturnedId )
-    {
-        // nothing to do
-        if ( lastReturnedId == null )
-        {
-            return;
-        }
-
-        // we want to set the timestamp to the value from the time uuid. If this is
-        // not the max time uuid to ever be written
-        // for this consumer, we want this to be discarded to avoid internode race
-        // conditions with clock drift.
-        long colTimestamp = UUIDUtils.getTimestampInMicros( lastReturnedId );
-
-        Mutator<UUID> mutator = createMutator( ko, ue );
-
-        if ( logger.isDebugEnabled() )
-        {
-            logger.debug( "Writing last client id pointer of '{}' for queue '{}' and consumer '{}' with timestamp '{}",
-                    new Object[] {
-                            lastReturnedId, queueId, consumerId, colTimestamp
-                    } );
-        }
-
-        mutator.addInsertion( consumerId, CONSUMERS.getColumnFamily(),
-                createColumn( queueId, lastReturnedId, colTimestamp, ue, ue ) );
-
-        mutator.execute();
-    }
-
-
-    private class RequestedOrderComparator implements Comparator<Message>
-    {
-
-        private Map<UUID, Integer> indexCache = new HashMap<UUID, Integer>();
-
-
-        private RequestedOrderComparator( Collection<UUID> ids )
-        {
-            int i = 0;
-
-            for ( UUID id : ids )
-            {
-                indexCache.put( id, i );
-                i++;
-            }
-        }
-
-
-        /*
-         * (non-Javadoc)
-         *
-         * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
-         */
-        @Override
-        public int compare( Message o1, Message o2 )
-        {
-            int o1Idx = indexCache.get( o1.getUuid() );
-
-            int o2Idx = indexCache.get( o2.getUuid() );
-
-            return o1Idx - o2Idx;
-        }
-    }
-}