Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/01/30 16:21:05 UTC

[29/34] git commit: update to master

update to master


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9fec2baa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9fec2baa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9fec2baa

Branch: refs/pull/39/merge
Commit: 9fec2baafe85ac97d3b16f4497166b1e79b87fe6
Parents: 3ccbfd8 33e0018
Author: Lewis John McGibbney <le...@apache.org>
Authored: Thu Jan 30 14:20:21 2014 +0000
Committer: Lewis John McGibbney <le...@apache.org>
Committed: Thu Jan 30 14:20:21 2014 +0000

----------------------------------------------------------------------
 portal/Gruntfile.js                             | 40 +++++++--
 portal/js/libs/usergrid.sdk.js                  |  4 +-
 portal/tests/protractor/applications.spec.js    |  6 +-
 portal/tests/protractor/forgotPassword.spec.js  |  5 ++
 portal/tests/protractor/profile.spec.js         | 14 ++--
 portal/tests/protractor/users.spec.js           |  5 +-
 portal/tests/protractor/util.js                 | 34 ++++++--
 portal/tests/protractorConf.js                  | 11 +--
 .../persistence/cassandra/CassandraService.java |  7 +-
 .../persistence/cassandra/QueryProcessor.java   |  2 +-
 .../cassandra/RelationManagerImpl.java          | 22 ++---
 .../cassandra/index/ConnectedIndexScanner.java  | 77 +++++++++++++----
 .../cassandra/index/IndexBucketScanner.java     | 33 ++++++--
 .../persistence/query/ir/SearchVisitor.java     |  7 +-
 .../query/ir/result/IntersectionIterator.java   |  4 +-
 .../query/ir/result/MergeIterator.java          | 28 +++++--
 .../query/ir/result/SliceIterator.java          | 12 +--
 .../query/ir/result/UnionIterator.java          | 17 +++-
 .../query/AllInConnectionNoTypeIT.java          |  1 +
 .../query/IntersectionUnionPagingIT.java        | 25 ++++--
 .../ir/result/IntersectionIteratorTest.java     | 12 +--
 .../query/ir/result/UnionIteratorTest.java      | 88 ++++++++++++++++++++
 stack/tools/pom.xml                             |  4 +
 .../apache/usergrid/tools/EntityCleanup.java    |  4 +-
 .../usergrid/tools/UniqueIndexCleanup.java      |  4 +-
 .../apache/usergrid/tools/bean/ExportOrg.java   | 11 +++
 26 files changed, 363 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
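
The functional thread running through this merge is cursor handling when paging index scans: CassandraService.getIdList() gains a keepFirst parameter, derives "final boolean skipFirst = start != null && !keepFirst", and threads that flag into IndexBucketScanner (and, per the diffstat above, ConnectedIndexScanner). As a hedged illustration only -- a self-contained sketch with list-based stand-ins and hypothetical names (SkipFirstSketch, page), not the actual scanner classes -- the idea is that a scan resumed from a cursor re-reads the cursor column and must normally drop it:

    import java.util.Arrays;
    import java.util.List;

    public class SkipFirstSketch {

        // Page 'count' items starting at the cursor 'start' (assumed present when non-null).
        static <T> List<T> page( List<T> source, T start, int count, boolean keepFirst ) {
            // mirrors the merged code: final boolean skipFirst = start != null && !keepFirst;
            final boolean skipFirst = start != null && !keepFirst;
            int from = ( start == null ) ? 0 : source.indexOf( start );
            if ( skipFirst ) {
                from++; // the cursor element was the last item of the previous page
            }
            return source.subList( from, Math.min( from + count, source.size() ) );
        }

        public static void main( String[] args ) {
            List<String> ids = Arrays.asList( "a", "b", "c", "d", "e" );
            System.out.println( page( ids, null, 2, false ) ); // [a, b]  first page
            System.out.println( page( ids, "b", 2, false ) );  // [c, d]  resumed page, cursor dropped
            System.out.println( page( ids, "b", 2, true ) );   // [b, c]  keepFirst retains the cursor
        }
    }

Without the skip, every resumed page would repeat its predecessor's last id, which is presumably what the new UnionIteratorTest and IntersectionUnionPagingIT cases in this merge guard against.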


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9fec2baa/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
index 1b2b9e3,0000000..d87ebd6
mode 100644,000000..100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
@@@ -1,1134 -1,0 +1,1137 @@@
 +/*******************************************************************************
 + * Copyright 2012 Apigee Corporation
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + ******************************************************************************/
 +package org.apache.usergrid.persistence.cassandra;
 +
 +
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.HashMap;
 +import java.util.LinkedHashMap;
 +import java.util.LinkedHashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Properties;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.ScheduledExecutorService;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.usergrid.locking.LockManager;
 +import org.apache.usergrid.persistence.IndexBucketLocator;
 +import org.apache.usergrid.persistence.IndexBucketLocator.IndexType;
 +import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
 +import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import me.prettyprint.cassandra.connection.HConnectionManager;
 +import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
 +import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
 +import me.prettyprint.cassandra.serializers.BytesArraySerializer;
 +import me.prettyprint.cassandra.serializers.DynamicCompositeSerializer;
 +import me.prettyprint.cassandra.serializers.LongSerializer;
 +import me.prettyprint.cassandra.serializers.StringSerializer;
 +import me.prettyprint.cassandra.serializers.UUIDSerializer;
 +import me.prettyprint.cassandra.service.CassandraHostConfigurator;
 +import me.prettyprint.cassandra.service.ThriftKsDef;
 +import me.prettyprint.hector.api.Cluster;
 +import me.prettyprint.hector.api.ConsistencyLevelPolicy;
 +import me.prettyprint.hector.api.HConsistencyLevel;
 +import me.prettyprint.hector.api.Keyspace;
 +import me.prettyprint.hector.api.Serializer;
 +import me.prettyprint.hector.api.beans.ColumnSlice;
 +import me.prettyprint.hector.api.beans.DynamicComposite;
 +import me.prettyprint.hector.api.beans.HColumn;
 +import me.prettyprint.hector.api.beans.OrderedRows;
 +import me.prettyprint.hector.api.beans.Row;
 +import me.prettyprint.hector.api.beans.Rows;
 +import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
 +import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
 +import me.prettyprint.hector.api.factory.HFactory;
 +import me.prettyprint.hector.api.mutation.Mutator;
 +import me.prettyprint.hector.api.query.ColumnQuery;
 +import me.prettyprint.hector.api.query.CountQuery;
 +import me.prettyprint.hector.api.query.MultigetSliceQuery;
 +import me.prettyprint.hector.api.query.QueryResult;
 +import me.prettyprint.hector.api.query.RangeSlicesQuery;
 +import me.prettyprint.hector.api.query.SliceQuery;
 +import static me.prettyprint.cassandra.service.FailoverPolicy.ON_FAIL_TRY_ALL_AVAILABLE;
 +import static me.prettyprint.hector.api.factory.HFactory.createColumn;
 +import static me.prettyprint.hector.api.factory.HFactory.createMultigetSliceQuery;
 +import static me.prettyprint.hector.api.factory.HFactory.createMutator;
 +import static me.prettyprint.hector.api.factory.HFactory.createRangeSlicesQuery;
 +import static me.prettyprint.hector.api.factory.HFactory.createSliceQuery;
 +import static me.prettyprint.hector.api.factory.HFactory.createVirtualKeyspace;
 +import static org.apache.commons.collections.MapUtils.getIntValue;
 +import static org.apache.commons.collections.MapUtils.getString;
 +import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_ID_SETS;
 +import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute;
 +import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.buildSetIdListMutator;
 +import static org.apache.usergrid.utils.ConversionUtils.bytebuffer;
 +import static org.apache.usergrid.utils.ConversionUtils.bytebuffers;
 +import static org.apache.usergrid.utils.JsonUtils.mapToFormattedJsonString;
 +import static org.apache.usergrid.utils.MapUtils.asMap;
 +import static org.apache.usergrid.utils.MapUtils.filter;
 +
 +
 +public class CassandraService {
 +
 +    public static String SYSTEM_KEYSPACE = "Usergrid";
 +
 +    public static String STATIC_APPLICATION_KEYSPACE = "Usergrid_Applications";
 +
 +    public static final boolean USE_VIRTUAL_KEYSPACES = true;
 +
 +    public static final String APPLICATIONS_CF = "Applications";
 +    public static final String PROPERTIES_CF = "Properties";
 +    public static final String TOKENS_CF = "Tokens";
 +    public static final String PRINCIPAL_TOKEN_CF = "PrincipalTokens";
 +
 +    public static final int DEFAULT_COUNT = 1000;
 +    public static final int ALL_COUNT = 100000;
 +    public static final int INDEX_ENTRY_LIST_COUNT = 1000;
 +    public static final int DEFAULT_SEARCH_COUNT = 10000;
 +
 +    public static final int RETRY_COUNT = 5;
 +
 +    public static final String DEFAULT_APPLICATION = "default-app";
 +    public static final String DEFAULT_ORGANIZATION = "usergrid";
 +    public static final String MANAGEMENT_APPLICATION = "management";
 +
 +    public static final UUID MANAGEMENT_APPLICATION_ID = new UUID( 0, 1 );
 +    public static final UUID DEFAULT_APPLICATION_ID = new UUID( 0, 16 );
 +
 +    private static final Logger logger = LoggerFactory.getLogger( CassandraService.class );
 +
 +    private static final Logger db_logger =
 +            LoggerFactory.getLogger( CassandraService.class.getPackage().getName() + ".DB" );
 +
 +    Cluster cluster;
 +    CassandraHostConfigurator chc;
 +    Properties properties;
 +    LockManager lockManager;
 +
 +    ConsistencyLevelPolicy consistencyLevelPolicy;
 +
 +    private Keyspace systemKeyspace;
 +
 +    private Map<String, String> accessMap;
 +
 +    public static final StringSerializer se = new StringSerializer();
 +    public static final ByteBufferSerializer be = new ByteBufferSerializer();
 +    public static final UUIDSerializer ue = new UUIDSerializer();
 +    public static final BytesArraySerializer bae = new BytesArraySerializer();
 +    public static final DynamicCompositeSerializer dce = new DynamicCompositeSerializer();
 +    public static final LongSerializer le = new LongSerializer();
 +
 +    public static final UUID NULL_ID = new UUID( 0, 0 );
 +
 +
 +    public CassandraService( Properties properties, Cluster cluster,
 +                             CassandraHostConfigurator cassandraHostConfigurator, LockManager lockManager ) {
 +        this.properties = properties;
 +        this.cluster = cluster;
 +        chc = cassandraHostConfigurator;
 +        this.lockManager = lockManager;
 +        db_logger.info( "{}", cluster.getKnownPoolHosts( false ) );
 +    }
 +
 +
 +    public void init() throws Exception {
 +        if ( consistencyLevelPolicy == null ) {
 +            consistencyLevelPolicy = new ConfigurableConsistencyLevel();
 +            ( ( ConfigurableConsistencyLevel ) consistencyLevelPolicy )
 +                    .setDefaultReadConsistencyLevel( HConsistencyLevel.ONE );
 +        }
 +        accessMap = new HashMap<String, String>( 2 );
 +        accessMap.put( "username", properties.getProperty( "cassandra.username" ) );
 +        accessMap.put( "password", properties.getProperty( "cassandra.password" ) );
 +        systemKeyspace =
 +                HFactory.createKeyspace( SYSTEM_KEYSPACE, cluster, consistencyLevelPolicy, ON_FAIL_TRY_ALL_AVAILABLE,
 +                        accessMap );
 +    }
 +
 +
 +    public Cluster getCluster() {
 +        return cluster;
 +    }
 +
 +
 +    public void setCluster( Cluster cluster ) {
 +        this.cluster = cluster;
 +    }
 +
 +
 +    public CassandraHostConfigurator getCassandraHostConfigurator() {
 +        return chc;
 +    }
 +
 +
 +    public void setCassandraHostConfigurator( CassandraHostConfigurator chc ) {
 +        this.chc = chc;
 +    }
 +
 +
 +    public Properties getProperties() {
 +        return properties;
 +    }
 +
 +
 +    public void setProperties( Properties properties ) {
 +        this.properties = properties;
 +    }
 +
 +
 +    public Map<String, String> getPropertiesMap() {
 +        if ( properties != null ) {
 +            return asMap( properties );
 +        }
 +        return null;
 +    }
 +
 +
 +    public LockManager getLockManager() {
 +        return lockManager;
 +    }
 +
 +
 +    public void setLockManager( LockManager lockManager ) {
 +        this.lockManager = lockManager;
 +    }
 +
 +
 +    public ConsistencyLevelPolicy getConsistencyLevelPolicy() {
 +        return consistencyLevelPolicy;
 +    }
 +
 +
 +    public void setConsistencyLevelPolicy( ConsistencyLevelPolicy consistencyLevelPolicy ) {
 +        this.consistencyLevelPolicy = consistencyLevelPolicy;
 +    }
 +
 +
 +    /** @return keyspace for application UUID */
 +    public static String keyspaceForApplication( UUID applicationId ) {
 +        if ( USE_VIRTUAL_KEYSPACES ) {
 +            return STATIC_APPLICATION_KEYSPACE;
 +        }
 +        else {
 +            return "Application_" + applicationId.toString().replace( '-', '_' );
 +        }
 +    }
 +
 +
 +    public static UUID prefixForApplication( UUID applicationId ) {
 +        if ( USE_VIRTUAL_KEYSPACES ) {
 +            return applicationId;
 +        }
 +        else {
 +            return null;
 +        }
 +    }
 +
 +
 +    public Keyspace getKeyspace( String keyspace, UUID prefix ) {
 +        Keyspace ko = null;
 +        if ( USE_VIRTUAL_KEYSPACES && ( prefix != null ) ) {
 +            ko = createVirtualKeyspace( keyspace, prefix, ue, cluster, consistencyLevelPolicy,
 +                    ON_FAIL_TRY_ALL_AVAILABLE, accessMap );
 +        }
 +        else {
 +            ko = HFactory.createKeyspace( keyspace, cluster, consistencyLevelPolicy, ON_FAIL_TRY_ALL_AVAILABLE,
 +                    accessMap );
 +        }
 +        return ko;
 +    }
 +
 +
 +    public Keyspace getApplicationKeyspace( UUID applicationId ) {
 +        assert applicationId != null;
 +        Keyspace ko = getKeyspace( keyspaceForApplication( applicationId ), prefixForApplication( applicationId ) );
 +        return ko;
 +    }
 +
 +
 +    /** The Usergrid_Applications keyspace directly */
 +    public Keyspace getUsergridApplicationKeyspace() {
 +        return getKeyspace( STATIC_APPLICATION_KEYSPACE, null );
 +    }
 +
 +
 +    public Keyspace getSystemKeyspace() {
 +        return systemKeyspace;
 +    }
 +
 +
 +    public boolean checkKeyspacesExist() {
 +        boolean exists = false;
 +        try {
 +            exists = cluster.describeKeyspace( SYSTEM_KEYSPACE ) != null
 +                    && cluster.describeKeyspace( STATIC_APPLICATION_KEYSPACE ) != null;
 +        }
 +        catch ( Exception ex ) {
 +            logger.error( "could not describe keyspaces", ex );
 +        }
 +        return exists;
 +    }
 +
 +
 +    /**
 +     * Lazily creates a column family in the keyspace. If it doesn't exist, it will be created, and the call will
 +     * sleep until all nodes have acknowledged the schema change.
 +     */
 +    public void createColumnFamily( String keyspace, ColumnFamilyDefinition cfDef ) {
 +
 +        if ( !keySpaceExists( keyspace ) ) {
 +            createKeySpace( keyspace );
 +        }
 +
 +
 +        //add the cf
 +
 +        if ( !cfExists( keyspace, cfDef.getName() ) ) {
 +
 +            //default read repair chance to 0.1
 +            cfDef.setReadRepairChance( 0.1d );
 +
 +            cluster.addColumnFamily( cfDef, true );
 +            logger.info( "Created column family {} in keyspace {}", cfDef.getName(), keyspace );
 +        }
 +    }
 +
 +
 +    /** Create the column families in the list */
 +    public void createColumnFamilies( String keyspace, List<ColumnFamilyDefinition> cfDefs ) {
 +        for ( ColumnFamilyDefinition cfDef : cfDefs ) {
 +            createColumnFamily( keyspace, cfDef );
 +        }
 +    }
 +
 +
 +    /** Check if the keyspace exists */
 +    public boolean keySpaceExists( String keyspace ) {
 +        KeyspaceDefinition ksDef = cluster.describeKeyspace( keyspace );
 +
 +        return ksDef != null;
 +    }
 +
 +
 +    /** Create the keyspace */
 +    private void createKeySpace( String keyspace ) {
 +        logger.info( "Creating keyspace: {}", keyspace );
 +
 +        String strategy_class =
 +                getString( properties, "cassandra.keyspace.strategy", "org.apache.cassandra.locator.SimpleStrategy" );
 +        logger.info( "Using strategy: {}", strategy_class );
 +
 +        int replication_factor = getIntValue( properties, "cassandra.keyspace.replication", 1 );
 +        logger.info( "Using replication (may be overridden by strategy options): {}", replication_factor );
 +
 +        // try {
 +        ThriftKsDef ks_def = ( ThriftKsDef ) HFactory
 +                .createKeyspaceDefinition( keyspace, strategy_class, replication_factor,
 +                        new ArrayList<ColumnFamilyDefinition>() );
 +
 +        @SuppressWarnings({ "unchecked", "rawtypes" }) Map<String, String> strategy_options =
 +                filter( ( Map ) properties, "cassandra.keyspace.strategy.options.", true );
 +        if ( strategy_options.size() > 0 ) {
 +            logger.info( "Strategy options: {}", mapToFormattedJsonString( strategy_options ) );
 +            ks_def.setStrategyOptions( strategy_options );
 +        }
 +
 +        cluster.addKeyspace( ks_def );
 +
 +        waitForCreation( keyspace );
 +
 +        logger.info( "Created keyspace {}", keyspace );
 +    }
 +
 +
 +    /** Wait until all nodes agree on the same schema version */
 +    private void waitForCreation( String keyspace ) {
 +
 +        while ( true ) {
 +            Map<String, List<String>> versions = cluster.describeSchemaVersions();
 +            // only 1 version, return
 +            if ( versions != null && versions.size() == 1 ) {
 +                return;
 +            }
 +            // sleep and try again
 +            try {
 +                Thread.sleep( 100 );
 +            }
 +            catch ( InterruptedException e ) {
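 +                // interrupted while polling for schema agreement; loop and check again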
 +            }
 +        }
 +    }
 +
 +
 +    /** Return true if the column family exists */
 +    public boolean cfExists( String keyspace, String cfName ) {
 +        KeyspaceDefinition ksDef = cluster.describeKeyspace( keyspace );
 +
 +        if ( ksDef == null ) {
 +            return false;
 +        }
 +
 +        for ( ColumnFamilyDefinition cf : ksDef.getCfDefs() ) {
 +            if ( cfName.equals( cf.getName() ) ) {
 +                return true;
 +            }
 +        }
 +
 +        return false;
 +    }
 +
 +
 +    /**
 +     * Gets the columns.
 +     *
 +     * @param keyspace the keyspace
 +     * @param columnFamily the column family
 +     * @param key the key
 +     *
 +     * @return columns
 +     *
 +     * @throws Exception the exception
 +     */
 +    public <N, V> List<HColumn<N, V>> getAllColumns( Keyspace ko, Object columnFamily, Object key,
 +                                                     Serializer<N> nameSerializer, Serializer<V> valueSerializer )
 +            throws Exception {
 +
 +        if ( db_logger.isInfoEnabled() ) {
 +            db_logger.info( "getColumns cf={} key={}", columnFamily, key );
 +        }
 +
 +        SliceQuery<ByteBuffer, N, V> q = createSliceQuery( ko, be, nameSerializer, valueSerializer );
 +        q.setColumnFamily( columnFamily.toString() );
 +        q.setKey( bytebuffer( key ) );
 +        q.setRange( null, null, false, ALL_COUNT );
 +        QueryResult<ColumnSlice<N, V>> r = q.execute();
 +        ColumnSlice<N, V> slice = r.get();
 +        List<HColumn<N, V>> results = slice.getColumns();
 +
 +        if ( db_logger.isInfoEnabled() ) {
 +            if ( results == null ) {
 +                db_logger.info( "getColumns returned null" );
 +            }
 +            else {
 +                db_logger.info( "getColumns returned {} columns", results.size() );
 +            }
 +        }
 +
 +        return results;
 +    }
 +
 +
 +    public List<HColumn<String, ByteBuffer>> getAllColumns( Keyspace ko, Object columnFamily, Object key )
 +            throws Exception {
 +        return getAllColumns( ko, columnFamily, key, se, be );
 +    }
 +
 +
 +    public Set<String> getAllColumnNames( Keyspace ko, Object columnFamily, Object key ) throws Exception {
 +        List<HColumn<String, ByteBuffer>> columns = getAllColumns( ko, columnFamily, key );
 +        Set<String> set = new LinkedHashSet<String>();
 +        for ( HColumn<String, ByteBuffer> column : columns ) {
 +            set.add( column.getName() );
 +        }
 +        return set;
 +    }
 +
 +
 +    /**
 +     * Gets the columns.
 +     *
 +     * @param keyspace the keyspace
 +     * @param columnFamily the column family
 +     * @param key the key
 +     * @param start the start
 +     * @param finish the finish
 +     * @param count the count
 +     * @param reversed the reversed
 +     *
 +     * @return columns
 +     *
 +     * @throws Exception the exception
 +     */
 +    public List<HColumn<ByteBuffer, ByteBuffer>> getColumns( Keyspace ko, Object columnFamily, Object key, Object start,
 +                                                             Object finish, int count, boolean reversed )
 +            throws Exception {
 +
 +        if ( db_logger.isDebugEnabled() ) {
 +            db_logger.debug( "getColumns cf=" + columnFamily + " key=" + key + " start=" + start + " finish=" + finish
 +                    + " count=" + count + " reversed=" + reversed );
 +        }
 +
 +        SliceQuery<ByteBuffer, ByteBuffer, ByteBuffer> q = createSliceQuery( ko, be, be, be );
 +        q.setColumnFamily( columnFamily.toString() );
 +        q.setKey( bytebuffer( key ) );
 +
 +        ByteBuffer start_bytes = null;
 +        if ( start instanceof DynamicComposite ) {
 +            start_bytes = ( ( DynamicComposite ) start ).serialize();
 +        }
 +        else if ( start instanceof List ) {
 +            start_bytes = DynamicComposite.toByteBuffer( ( List<?> ) start );
 +        }
 +        else {
 +            start_bytes = bytebuffer( start );
 +        }
 +
 +        ByteBuffer finish_bytes = null;
 +        if ( finish instanceof DynamicComposite ) {
 +            finish_bytes = ( ( DynamicComposite ) finish ).serialize();
 +        }
 +        else if ( finish instanceof List ) {
 +            finish_bytes = DynamicComposite.toByteBuffer( ( List<?> ) finish );
 +        }
 +        else {
 +            finish_bytes = bytebuffer( finish );
 +        }
 +
 +    /*
 +     * if (reversed) { q.setRange(finish_bytes, start_bytes, reversed, count); }
 +     * else { q.setRange(start_bytes, finish_bytes, reversed, count); }
 +     */
 +        q.setRange( start_bytes, finish_bytes, reversed, count );
 +        QueryResult<ColumnSlice<ByteBuffer, ByteBuffer>> r = q.execute();
 +        ColumnSlice<ByteBuffer, ByteBuffer> slice = r.get();
 +        List<HColumn<ByteBuffer, ByteBuffer>> results = slice.getColumns();
 +
 +        if ( db_logger.isDebugEnabled() ) {
 +            if ( results == null ) {
 +                db_logger.debug( "getColumns returned null" );
 +            }
 +            else {
 +                db_logger.debug( "getColumns returned " + results.size() + " columns" );
 +            }
 +        }
 +
 +        return results;
 +    }
 +
 +
 +    public Map<ByteBuffer, List<HColumn<ByteBuffer, ByteBuffer>>> multiGetColumns( Keyspace ko, Object columnFamily,
 +                                                                                   List<?> keys, Object start,
 +                                                                                   Object finish, int count,
 +                                                                                   boolean reversed ) throws Exception {
 +
 +        if ( db_logger.isDebugEnabled() ) {
 +            db_logger.debug( "multiGetColumns cf=" + columnFamily + " keys=" + keys + " start=" + start + " finish="
 +                    + finish + " count=" + count + " reversed=" + reversed );
 +        }
 +
 +        MultigetSliceQuery<ByteBuffer, ByteBuffer, ByteBuffer> q = createMultigetSliceQuery( ko, be, be, be );
 +        q.setColumnFamily( columnFamily.toString() );
 +        q.setKeys( bytebuffers( keys ) );
 +
 +        ByteBuffer start_bytes = null;
 +        if ( start instanceof DynamicComposite ) {
 +            start_bytes = ( ( DynamicComposite ) start ).serialize();
 +        }
 +        else if ( start instanceof List ) {
 +            start_bytes = DynamicComposite.toByteBuffer( ( List<?> ) start );
 +        }
 +        else {
 +            start_bytes = bytebuffer( start );
 +        }
 +
 +        ByteBuffer finish_bytes = null;
 +        if ( finish instanceof DynamicComposite ) {
 +            finish_bytes = ( ( DynamicComposite ) finish ).serialize();
 +        }
 +        else if ( finish instanceof List ) {
 +            finish_bytes = DynamicComposite.toByteBuffer( ( List<?> ) finish );
 +        }
 +        else {
 +            finish_bytes = bytebuffer( finish );
 +        }
 +
 +        q.setRange( start_bytes, finish_bytes, reversed, count );
 +        QueryResult<Rows<ByteBuffer, ByteBuffer, ByteBuffer>> r = q.execute();
 +        Rows<ByteBuffer, ByteBuffer, ByteBuffer> rows = r.get();
 +
 +        Map<ByteBuffer, List<HColumn<ByteBuffer, ByteBuffer>>> results =
 +                new LinkedHashMap<ByteBuffer, List<HColumn<ByteBuffer, ByteBuffer>>>();
 +        for ( Row<ByteBuffer, ByteBuffer, ByteBuffer> row : rows ) {
 +            results.put( row.getKey(), row.getColumnSlice().getColumns() );
 +        }
 +
 +        return results;
 +    }
 +
 +
 +    /**
 +     * Gets the columns.
 +     *
 +     * @param keyspace the keyspace
 +     * @param columnFamily the column family
 +     * @param keys the keys
 +     *
 +     * @return map of keys to columns
 +     *
 +     * @throws Exception the exception
 +     */
 +    public <K, N, V> Rows<K, N, V> getRows( Keyspace ko, Object columnFamily, Collection<K> keys,
 +                                            Serializer<K> keySerializer, Serializer<N> nameSerializer,
 +                                            Serializer<V> valueSerializer ) throws Exception {
 +
 +        if ( db_logger.isDebugEnabled() ) {
 +            db_logger.debug( "getColumns cf=" + columnFamily + " keys=" + keys );
 +        }
 +
 +        MultigetSliceQuery<K, N, V> q = createMultigetSliceQuery( ko, keySerializer, nameSerializer, valueSerializer );
 +        q.setColumnFamily( columnFamily.toString() );
 +        q.setKeys( keys );
 +        q.setRange( null, null, false, ALL_COUNT );
 +        QueryResult<Rows<K, N, V>> r = q.execute();
 +        Rows<K, N, V> results = r.get();
 +
 +        if ( db_logger.isInfoEnabled() ) {
 +            if ( results == null ) {
 +                db_logger.info( "getColumns returned null" );
 +            }
 +            else {
 +                db_logger.info( "getColumns returned " + results.getCount() + " columns" );
 +            }
 +        }
 +
 +        return results;
 +    }
 +
 +
 +    /**
 +     * Gets the columns.
 +     *
 +     * @param keyspace the keyspace
 +     * @param columnFamily the column family
 +     * @param key the key
 +     * @param columnNames the column names
 +     *
 +     * @return columns
 +     *
 +     * @throws Exception the exception
 +     */
 +    @SuppressWarnings("unchecked")
 +    public <N, V> List<HColumn<N, V>> getColumns( Keyspace ko, Object columnFamily, Object key, Set<String> columnNames,
 +                                                  Serializer<N> nameSerializer, Serializer<V> valueSerializer )
 +            throws Exception {
 +
 +        if ( db_logger.isDebugEnabled() ) {
 +            db_logger.debug( "getColumns cf=" + columnFamily + " key=" + key + " names=" + columnNames );
 +        }
 +
 +        SliceQuery<ByteBuffer, N, V> q = createSliceQuery( ko, be, nameSerializer, valueSerializer );
 +        q.setColumnFamily( columnFamily.toString() );
 +        q.setKey( bytebuffer( key ) );
 +        // q.setColumnNames(columnNames.toArray(new String[0]));
 +        q.setColumnNames( ( N[] ) nameSerializer.fromBytesSet( se.toBytesSet( new ArrayList<String>( columnNames ) ) )
 +                                                .toArray() );
 +
 +        QueryResult<ColumnSlice<N, V>> r = q.execute();
 +        ColumnSlice<N, V> slice = r.get();
 +        List<HColumn<N, V>> results = slice.getColumns();
 +
 +        if ( db_logger.isInfoEnabled() ) {
 +            if ( results == null ) {
 +                db_logger.info( "getColumns returned null" );
 +            }
 +            else {
 +                db_logger.info( "getColumns returned " + results.size() + " columns" );
 +            }
 +        }
 +
 +        return results;
 +    }
 +
 +
 +    /**
 +     * Gets the columns.
 +     *
 +     * @param keyspace the keyspace
 +     * @param columnFamily the column family
 +     * @param keys the keys
 +     * @param columnNames the column names
 +     *
 +     * @return map of keys to columns
 +     *
 +     * @throws Exception the exception
 +     */
 +    @SuppressWarnings("unchecked")
 +    public <K, N, V> Rows<K, N, V> getRows( Keyspace ko, Object columnFamily, Collection<K> keys,
 +                                            Collection<String> columnNames, Serializer<K> keySerializer,
 +                                            Serializer<N> nameSerializer, Serializer<V> valueSerializer )
 +            throws Exception {
 +
 +        if ( db_logger.isDebugEnabled() ) {
 +            db_logger.debug( "getColumns cf=" + columnFamily + " keys=" + keys + " names=" + columnNames );
 +        }
 +
 +        MultigetSliceQuery<K, N, V> q = createMultigetSliceQuery( ko, keySerializer, nameSerializer, valueSerializer );
 +        q.setColumnFamily( columnFamily.toString() );
 +        q.setKeys( keys );
 +        q.setColumnNames( ( N[] ) nameSerializer.fromBytesSet( se.toBytesSet( new ArrayList<String>( columnNames ) ) )
 +                                                .toArray() );
 +        QueryResult<Rows<K, N, V>> r = q.execute();
 +        Rows<K, N, V> results = r.get();
 +
 +        if ( db_logger.isInfoEnabled() ) {
 +            if ( results == null ) {
 +                db_logger.info( "getColumns returned null" );
 +            }
 +            else {
 +                db_logger.info( "getColumns returned " + results.getCount() + " columns" );
 +            }
 +        }
 +
 +        return results;
 +    }
 +
 +
 +    /**
 +     * Gets the column.
 +     *
 +     * @param keyspace the keyspace
 +     * @param columnFamily the column family
 +     * @param key the key
 +     * @param column the column
 +     *
 +     * @return column
 +     *
 +     * @throws Exception the exception
 +     */
 +    public <N, V> HColumn<N, V> getColumn( Keyspace ko, Object columnFamily, Object key, N column,
 +                                           Serializer<N> nameSerializer, Serializer<V> valueSerializer )
 +            throws Exception {
 +
 +        if ( db_logger.isDebugEnabled() ) {
 +            db_logger.debug( "getColumn cf=" + columnFamily + " key=" + key + " column=" + column );
 +        }
 +
 +    /*
 +     * ByteBuffer column_bytes = null; if (column instanceof List) {
 +     * column_bytes = Composite.serializeToByteBuffer((List<?>) column); } else
 +     * { column_bytes = bytebuffer(column); }
 +     */
 +
 +        ColumnQuery<ByteBuffer, N, V> q = HFactory.createColumnQuery( ko, be, nameSerializer, valueSerializer );
 +        QueryResult<HColumn<N, V>> r =
 +                q.setKey( bytebuffer( key ) ).setName( column ).setColumnFamily( columnFamily.toString() ).execute();
 +        HColumn<N, V> result = r.get();
 +
 +        if ( db_logger.isInfoEnabled() ) {
 +            if ( result == null ) {
 +                db_logger.info( "getColumn returned null" );
 +            }
 +        }
 +
 +        return result;
 +    }
 +
 +
 +    public <N, V> ColumnSlice<N, V> getColumns( Keyspace ko, Object columnFamily, Object key, N[] columns,
 +                                                Serializer<N> nameSerializer, Serializer<V> valueSerializer )
 +            throws Exception {
 +
 +        if ( db_logger.isDebugEnabled() ) {
 +            db_logger.debug( "getColumn cf=" + columnFamily + " key=" + key + " column=" + columns );
 +        }
 +
 +    /*
 +     * ByteBuffer column_bytes = null; if (column instanceof List) {
 +     * column_bytes = Composite.serializeToByteBuffer((List<?>) column); } else
 +     * { column_bytes = bytebuffer(column); }
 +     */
 +
 +        SliceQuery<ByteBuffer, N, V> q = HFactory.createSliceQuery( ko, be, nameSerializer, valueSerializer );
 +        QueryResult<ColumnSlice<N, V>> r =
 +                q.setKey( bytebuffer( key ) ).setColumnNames( columns ).setColumnFamily( columnFamily.toString() )
 +                 .execute();
 +        ColumnSlice<N, V> result = r.get();
 +
 +        if ( db_logger.isDebugEnabled() ) {
 +            if ( result == null ) {
 +                db_logger.debug( "getColumn returned null" );
 +            }
 +        }
 +
 +        return result;
 +    }
 +
 +
 +    public HColumn<String, ByteBuffer> getColumn( Keyspace ko, Object columnFamily, Object key, String column )
 +            throws Exception {
 +        return getColumn( ko, columnFamily, key, column, se, be );
 +    }
 +
 +
 +    public void setColumn( Keyspace ko, Object columnFamily, Object key, Object columnName, Object columnValue )
 +            throws Exception {
 +        this.setColumn( ko, columnFamily, key, columnName, columnValue, 0 );
 +    }
 +
 +
 +    public void setColumn( Keyspace ko, Object columnFamily, Object key, Object columnName, Object columnValue,
 +                           int ttl ) throws Exception {
 +
 +        if ( db_logger.isDebugEnabled() ) {
 +            db_logger.debug( "setColumn cf=" + columnFamily + " key=" + key + " name=" + columnName + " value="
 +                    + columnValue );
 +        }
 +
 +        ByteBuffer name_bytes = null;
 +        if ( columnName instanceof List ) {
 +            name_bytes = DynamicComposite.toByteBuffer( ( List<?> ) columnName );
 +        }
 +        else {
 +            name_bytes = bytebuffer( columnName );
 +        }
 +
 +        ByteBuffer value_bytes = null;
 +        if ( columnValue instanceof List ) {
 +            value_bytes = DynamicComposite.toByteBuffer( ( List<?> ) columnValue );
 +        }
 +        else {
 +            value_bytes = bytebuffer( columnValue );
 +        }
 +
 +        HColumn<ByteBuffer, ByteBuffer> col = createColumn( name_bytes, value_bytes, be, be );
 +        if ( ttl != 0 ) {
 +            col.setTtl( ttl );
 +        }
 +        Mutator<ByteBuffer> m = createMutator( ko, be );
 +        m.insert( bytebuffer( key ), columnFamily.toString(), col );
 +    }
 +
 +
 +    /**
 +     * Sets the columns.
 +     *
 +     * @param keyspace the keyspace
 +     * @param columnFamily the column family
 +     * @param key the key
 +     * @param map the map
 +     *
 +     * @throws Exception the exception
 +     */
 +    public void setColumns( Keyspace ko, Object columnFamily, byte[] key, Map<?, ?> map ) throws Exception {
 +        this.setColumns( ko, columnFamily, key, map, 0 );
 +    }
 +
 +
 +    public void setColumns( Keyspace ko, Object columnFamily, byte[] key, Map<?, ?> map, int ttl ) throws Exception {
 +
 +        if ( db_logger.isDebugEnabled() ) {
 +            db_logger.debug( "setColumns cf=" + columnFamily + " key=" + key + " map=" + map + ( ttl != 0 ?
 +                                                                                                 " ttl=" + ttl : "" ) );
 +        }
 +
 +        Mutator<ByteBuffer> m = createMutator( ko, be );
 +        long timestamp = createTimestamp();
 +
 +        for ( Object name : map.keySet() ) {
 +            Object value = map.get( name );
 +            if ( value != null ) {
 +
 +                ByteBuffer name_bytes = null;
 +                if ( name instanceof List ) {
 +                    name_bytes = DynamicComposite.toByteBuffer( ( List<?> ) name );
 +                }
 +                else {
 +                    name_bytes = bytebuffer( name );
 +                }
 +
 +                ByteBuffer value_bytes = null;
 +                if ( value instanceof List ) {
 +                    value_bytes = DynamicComposite.toByteBuffer( ( List<?> ) value );
 +                }
 +                else {
 +                    value_bytes = bytebuffer( value );
 +                }
 +
 +                HColumn<ByteBuffer, ByteBuffer> col = createColumn( name_bytes, value_bytes, timestamp, be, be );
 +                if ( ttl != 0 ) {
 +                    col.setTtl( ttl );
 +                }
 +                // insert the column configured above (re-creating it here would drop the TTL)
 +                m.addInsertion( bytebuffer( key ), columnFamily.toString(), col );
 +            }
 +        }
 +        batchExecute( m, CassandraService.RETRY_COUNT );
 +    }
 +
 +
 +    /**
 +     * Create a timestamp based on the TimeResolution set to the cluster.
 +     *
 +     * @return a timestamp
 +     */
 +    public long createTimestamp() {
 +        return chc.getClockResolution().createClock();
 +    }
 +
 +
 +    /**
 +     * Delete column.
 +     *
 +     * @param keyspace the keyspace
 +     * @param columnFamily the column family
 +     * @param key the key
 +     * @param column the column
 +     *
 +     * @throws Exception the exception
 +     */
 +    public void deleteColumn( Keyspace ko, Object columnFamily, Object key, Object column ) throws Exception {
 +
 +        if ( db_logger.isDebugEnabled() ) {
 +            db_logger.debug( "deleteColumn cf=" + columnFamily + " key=" + key + " name=" + column );
 +        }
 +
 +        Mutator<ByteBuffer> m = createMutator( ko, be );
 +        m.delete( bytebuffer( key ), columnFamily.toString(), bytebuffer( column ), be );
 +    }
 +
 +
 +    /**
 +     * Gets the row keys.
 +     *
 +     * @param keyspace the keyspace
 +     * @param columnFamily the column family
 +     *
 +     * @return set of keys
 +     *
 +     * @throws Exception the exception
 +     */
 +    public <K> Set<K> getRowKeySet( Keyspace ko, Object columnFamily, Serializer<K> keySerializer ) throws Exception {
 +
 +        if ( db_logger.isDebugEnabled() ) {
 +            db_logger.debug( "getRowKeys cf=" + columnFamily );
 +        }
 +
 +        RangeSlicesQuery<K, ByteBuffer, ByteBuffer> q = createRangeSlicesQuery( ko, keySerializer, be, be );
 +        q.setColumnFamily( columnFamily.toString() );
 +        q.setKeys( null, null );
 +        q.setColumnNames( new ByteBuffer[0] );
 +        QueryResult<OrderedRows<K, ByteBuffer, ByteBuffer>> r = q.execute();
 +        OrderedRows<K, ByteBuffer, ByteBuffer> rows = r.get();
 +
 +        Set<K> results = new LinkedHashSet<K>();
 +        for ( Row<K, ByteBuffer, ByteBuffer> row : rows ) {
 +            results.add( row.getKey() );
 +        }
 +
 +        if ( db_logger.isDebugEnabled() ) {
 +            db_logger.debug( "getRowKeys returned " + results.size() + " rows" );
 +        }
 +
 +        return results;
 +    }
 +
 +
 +    /**
 +     * Gets the row keys as UUIDs.
 +     *
 +     * @param keyspace the keyspace
 +     * @param columnFamily the column family
 +     *
 +     * @return list of row key UUIDs
 +     *
 +     * @throws Exception the exception
 +     */
 +    public <K> List<K> getRowKeyList( Keyspace ko, Object columnFamily, Serializer<K> keySerializer ) throws Exception {
 +
 +        RangeSlicesQuery<K, ByteBuffer, ByteBuffer> q = createRangeSlicesQuery( ko, keySerializer, be, be );
 +        q.setColumnFamily( columnFamily.toString() );
 +        q.setKeys( null, null );
 +        q.setColumnNames( new ByteBuffer[0] );
 +        QueryResult<OrderedRows<K, ByteBuffer, ByteBuffer>> r = q.execute();
 +        OrderedRows<K, ByteBuffer, ByteBuffer> rows = r.get();
 +
 +        List<K> list = new ArrayList<K>();
 +        for ( Row<K, ByteBuffer, ByteBuffer> row : rows ) {
 +            list.add( row.getKey() );
 +            // K uuid = row.getKey();
 +            // if (uuid != UUIDUtils.ZERO_UUID) {
 +            // list.add(uuid);
 +            // }
 +        }
 +
 +        return list;
 +    }
 +
 +
 +    /**
 +     * Delete row.
 +     *
 +     * @param keyspace the keyspace
 +     * @param columnFamily the column family
 +     * @param key the key
 +     *
 +     * @throws Exception the exception
 +     */
 +    public void deleteRow( Keyspace ko, final Object columnFamily, final Object key ) throws Exception {
 +
 +        if ( db_logger.isDebugEnabled() ) {
 +            db_logger.debug( "deleteRow cf=" + columnFamily + " key=" + key );
 +        }
 +
 +        createMutator( ko, be ).addDeletion( bytebuffer( key ), columnFamily.toString() ).execute();
 +    }
 +
 +
 +    public void deleteRow( Keyspace ko, final Object columnFamily, final String key ) throws Exception {
 +
 +        if ( db_logger.isDebugEnabled() ) {
 +            db_logger.debug( "deleteRow cf=" + columnFamily + " key=" + key );
 +        }
 +
 +        createMutator( ko, se ).addDeletion( key, columnFamily.toString() ).execute();
 +    }
 +
 +
 +    /**
 +     * Delete row.
 +     *
 +     * @param keyspace the keyspace
 +     * @param columnFamily the column family
 +     * @param key the key
 +     * @param timestamp the timestamp
 +     *
 +     * @throws Exception the exception
 +     */
 +    public void deleteRow( Keyspace ko, final Object columnFamily, final Object key, final long timestamp )
 +            throws Exception {
 +
 +        if ( db_logger.isDebugEnabled() ) {
 +            db_logger.debug( "deleteRow cf=" + columnFamily + " key=" + key + " timestamp=" + timestamp );
 +        }
 +
 +        createMutator( ko, be ).addDeletion( bytebuffer( key ), columnFamily.toString(), timestamp ).execute();
 +    }
 +
 +
 +    /**
 +     * Gets the id list.
 +     *
 +     * @param ko the keyspace
 +     * @param key the key
 +     * @param start the start
 +     * @param finish the finish
 +     * @param count the count
 +     * @param reversed True if the scan should be reversed
 +     * @param locator The index locator instance
 +     * @param applicationId The applicationId
 +     * @param collectionName The name of the collection to get the Ids for
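 +     * @param keepFirst When resuming from a cursor, true to keep the start id in the results, false to skip it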
 +     *
 +     * @return list of columns as UUIDs
 +     *
 +     * @throws Exception the exception
 +     */
 +    public IndexScanner getIdList( Keyspace ko, Object key, UUID start, UUID finish, int count, boolean reversed,
-                                    IndexBucketLocator locator, UUID applicationId, String collectionName )
++                                   IndexBucketLocator locator, UUID applicationId, String collectionName, boolean keepFirst )
 +            throws Exception {
 +
 +        if ( count <= 0 ) {
 +            count = DEFAULT_COUNT;
 +        }
 +
 +        if ( NULL_ID.equals( start ) ) {
 +            start = null;
 +        }
 +
++
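++        // when resuming from a cursor, skip the first column unless the caller asked to keep it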
++        final boolean skipFirst = start != null && !keepFirst;
++
 +        IndexScanner scanner =
 +                new IndexBucketScanner( this, locator, ENTITY_ID_SETS, applicationId, IndexType.COLLECTION, key, start,
-                         finish, reversed, count, collectionName );
++                        finish, reversed, count, skipFirst, collectionName );
 +
 +        return scanner;
 +    }
 +
 +
 +    public int countColumns( Keyspace ko, Object columnFamily, Object key ) throws Exception {
 +
 +
 +        CountQuery<ByteBuffer, ByteBuffer> cq = HFactory.createCountQuery( ko, be, be );
 +        cq.setColumnFamily( columnFamily.toString() );
 +        cq.setKey( bytebuffer( key ) );
 +        cq.setRange( ByteBuffer.allocate( 0 ), ByteBuffer.allocate( 0 ), 100000000 );
 +        QueryResult<Integer> r = cq.execute();
 +        if ( r == null ) {
 +            return 0;
 +        }
 +        return r.get();
 +    }
 +
 +
 +    /**
 +     * Sets the id list.
 +     *
 +     * @param keyspace the keyspace
 +     * @param targetId the target id
 +     * @param keyPrefix the key prefix
 +     * @param keySuffix the key suffix
 +     * @param keyIds the key ids
 +     *
 +     * @throws Exception the exception
 +     */
 +    public void setIdList( Keyspace ko, UUID targetId, String keyPrefix, String keySuffix, List<UUID> keyIds )
 +            throws Exception {
 +        long timestamp = createTimestamp();
 +        Mutator<ByteBuffer> batch = createMutator( ko, be );
 +        batch = buildSetIdListMutator( batch, targetId, ENTITY_ID_SETS.toString(), keyPrefix, keySuffix, keyIds,
 +                timestamp );
 +        batchExecute( batch, CassandraService.RETRY_COUNT );
 +    }
 +
 +
 +    boolean clusterUp = false;
 +
 +
 +    public void startClusterHealthCheck() {
 +
 +        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
 +        executorService.scheduleWithFixedDelay( new Runnable() {
 +            @Override
 +            public void run() {
 +                if ( cluster != null ) {
 +                    HConnectionManager connectionManager = cluster.getConnectionManager();
 +                    if ( connectionManager != null ) {
 +                        clusterUp = !connectionManager.getHosts().isEmpty();
 +                    }
 +                }
 +            }
 +        }, 1, 5, TimeUnit.SECONDS );
 +    }
 +    
 +    public void destroy() throws Exception {
 +        if ( cluster != null ) {
 +            HConnectionManager connectionManager = cluster.getConnectionManager();
 +            if ( connectionManager != null ) {
 +                connectionManager.shutdown();
 +            }
 +        }
 +        cluster = null;
 +    }
 +}
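
The QueryProcessor.java hunk below also widens PAGE_SIZE from private to public (the iterator changes elsewhere in this merge presumably need to read it). The rule it feeds, visible in process(), is that a single-slice query fetches only the requested size, while multi-slice queries (intersections/unions) page each slice at the full PAGE_SIZE so the merge step has enough candidates. A minimal stand-alone sketch of that rule, with hypothetical names (PageSizeHintSketch, pageSizeHint):

    public class PageSizeHintSketch {

        public static final int PAGE_SIZE = 1000; // same constant QueryProcessor exposes

        // opCount = number of query slices; size = requested result limit
        static int pageSizeHint( int opCount, int size ) {
            return ( opCount > 1 ) ? PAGE_SIZE : Math.min( size, PAGE_SIZE );
        }

        public static void main( String[] args ) {
            System.out.println( pageSizeHint( 1, 10 ) ); // 10: a single slice fetches only what was asked
            System.out.println( pageSizeHint( 3, 10 ) ); // 1000: each slice pages fully before merging
        }
    }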

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9fec2baa/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java
index 2c43b4e,0000000..c5bddaf
mode 100644,000000..100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java
@@@ -1,719 -1,0 +1,719 @@@
 +/*******************************************************************************
 + * Copyright 2012 Apigee Corporation
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + ******************************************************************************/
 +package org.apache.usergrid.persistence.cassandra;
 +
 +
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Stack;
 +import java.util.UUID;
 +
 +import org.apache.usergrid.persistence.EntityManager;
 +import org.apache.usergrid.persistence.Identifier;
 +import org.apache.usergrid.persistence.Query;
 +import org.apache.usergrid.persistence.Results;
 +import org.apache.usergrid.persistence.Schema;
 +import org.apache.usergrid.persistence.Query.SortDirection;
 +import org.apache.usergrid.persistence.Query.SortPredicate;
 +import org.apache.usergrid.persistence.entities.User;
 +import org.apache.usergrid.persistence.exceptions.NoFullTextIndexException;
 +import org.apache.usergrid.persistence.exceptions.NoIndexException;
 +import org.apache.usergrid.persistence.exceptions.PersistenceException;
 +import org.apache.usergrid.persistence.query.ir.AllNode;
 +import org.apache.usergrid.persistence.query.ir.AndNode;
 +import org.apache.usergrid.persistence.query.ir.EmailIdentifierNode;
 +import org.apache.usergrid.persistence.query.ir.NameIdentifierNode;
 +import org.apache.usergrid.persistence.query.ir.NotNode;
 +import org.apache.usergrid.persistence.query.ir.OrNode;
 +import org.apache.usergrid.persistence.query.ir.OrderByNode;
 +import org.apache.usergrid.persistence.query.ir.QueryNode;
 +import org.apache.usergrid.persistence.query.ir.QuerySlice;
 +import org.apache.usergrid.persistence.query.ir.SearchVisitor;
 +import org.apache.usergrid.persistence.query.ir.SliceNode;
 +import org.apache.usergrid.persistence.query.ir.UuidIdentifierNode;
 +import org.apache.usergrid.persistence.query.ir.WithinNode;
 +import org.apache.usergrid.persistence.query.ir.result.ResultIterator;
 +import org.apache.usergrid.persistence.query.ir.result.ResultsLoader;
 +import org.apache.usergrid.persistence.query.ir.result.ResultsLoaderFactory;
 +import org.apache.usergrid.persistence.query.ir.result.ScanColumn;
 +import org.apache.usergrid.persistence.query.tree.AndOperand;
 +import org.apache.usergrid.persistence.query.tree.ContainsOperand;
 +import org.apache.usergrid.persistence.query.tree.Equal;
 +import org.apache.usergrid.persistence.query.tree.EqualityOperand;
 +import org.apache.usergrid.persistence.query.tree.GreaterThan;
 +import org.apache.usergrid.persistence.query.tree.GreaterThanEqual;
 +import org.apache.usergrid.persistence.query.tree.LessThan;
 +import org.apache.usergrid.persistence.query.tree.LessThanEqual;
 +import org.apache.usergrid.persistence.query.tree.Literal;
 +import org.apache.usergrid.persistence.query.tree.NotOperand;
 +import org.apache.usergrid.persistence.query.tree.Operand;
 +import org.apache.usergrid.persistence.query.tree.OrOperand;
 +import org.apache.usergrid.persistence.query.tree.QueryVisitor;
 +import org.apache.usergrid.persistence.query.tree.StringLiteral;
 +import org.apache.usergrid.persistence.query.tree.WithinOperand;
 +import org.apache.usergrid.persistence.schema.CollectionInfo;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import me.prettyprint.cassandra.serializers.UUIDSerializer;
 +import static org.apache.usergrid.persistence.Schema.getDefaultSchema;
 +
 +
 +public class QueryProcessor {
 +
-     private static final int PAGE_SIZE = 1000;
++    public static final int PAGE_SIZE = 1000;
 +    private static final Logger logger = LoggerFactory.getLogger( QueryProcessor.class );
 +
 +    private static final Schema SCHEMA = getDefaultSchema();
 +
 +    private final CollectionInfo collectionInfo;
 +    private final EntityManager em;
 +    private final ResultsLoaderFactory loaderFactory;
 +
 +    private Operand rootOperand;
 +    private List<SortPredicate> sorts;
 +    private CursorCache cursorCache;
 +    private QueryNode rootNode;
 +    private String entityType;
 +
 +    private int size;
 +    private Query query;
 +    private int pageSizeHint;
 +
 +
 +    public QueryProcessor( Query query, CollectionInfo collectionInfo, EntityManager em,
 +                           ResultsLoaderFactory loaderFactory ) throws PersistenceException {
 +        setQuery( query );
 +        this.collectionInfo = collectionInfo;
 +        this.em = em;
 +        this.loaderFactory = loaderFactory;
 +        process();
 +    }
 +
 +
 +    public Query getQuery() {
 +        return query;
 +    }
 +
 +
 +    public void setQuery( Query query ) {
 +        this.sorts = query.getSortPredicates();
 +        this.cursorCache = new CursorCache( query.getCursor() );
 +        this.rootOperand = query.getRootOperand();
 +        this.entityType = query.getEntityType();
 +        this.size = query.getLimit();
 +        this.query = query;
 +    }
 +
 +
 +    public CollectionInfo getCollectionInfo() {
 +        return collectionInfo;
 +    }
 +
 +
 +    private void process() throws PersistenceException {
 +
 +        int opCount = 0;
 +
 +        // no operand. Check for sorts
 +        if ( rootOperand != null ) {
 +            // visit the tree
 +
 +            TreeEvaluator visitor = new TreeEvaluator();
 +
 +            rootOperand.visit( visitor );
 +
 +            rootNode = visitor.getRootNode();
 +
 +            opCount = visitor.getSliceCount();
 +        }
 +
 +        // see if we have sorts, if so, we can add them all as a single node at
 +        // the root
 +        if ( sorts.size() > 0 ) {
 +
 +            OrderByNode order = generateSorts( opCount );
 +
 +            opCount += order.getFirstPredicate().getAllSlices().size();
 +
 +            rootNode = order;
 +        }
 +
 +
 +        //if we still don't have a root node, no query nor order by was specified,
 +        // just use the all node or the identifiers
 +        if ( rootNode == null ) {
 +
 +
 +            //a name alias or email alias was specified
 +            if ( query.containsSingleNameOrEmailIdentifier() ) {
 +
 +                Identifier ident = query.getSingleIdentifier();
 +
 +                //an email was specified.  An edge case that only applies to users.  This is ugly to put here,
 +                // but required
 +                if ( query.getEntityType().equals( User.ENTITY_TYPE ) && ident.isEmail() ) {
 +                    rootNode = new EmailIdentifierNode( ident );
 +                }
 +
 +                //use the ident with the default alias.  could be an email
 +                else {
 +                    rootNode = new NameIdentifierNode( ident.getName() );
 +                }
 +            }
 +            //a uuid was specified
 +            else if ( query.containsSingleUuidIdentifier() ) {
 +                rootNode = new UuidIdentifierNode( query.getSingleUuidIdentifier() );
 +            }
 +
 +
 +            //nothing was specified, order it by uuid
 +            else {
 +
 +
 +                //this is a bit ugly, but it's how we handle the start parameter
 +                UUID startResult = query.getStartResult();
 +
 +                boolean startResultSet = startResult != null;
 +
 +                AllNode allNode = new AllNode( 0, startResultSet );
 +
 +                if ( startResultSet ) {
 +                    cursorCache.setNextCursor( allNode.getSlice().hashCode(),
 +                            UUIDSerializer.get().toByteBuffer( startResult ) );
 +                }
 +
 +                rootNode = allNode;
 +            }
 +        }
 +
 +        if ( opCount > 1 ) {
 +            pageSizeHint = PAGE_SIZE;
 +        }
 +        else {
 +            pageSizeHint = Math.min( size, PAGE_SIZE );
 +        }
 +    }
 +
 +
 +    public QueryNode getFirstNode() {
 +        return rootNode;
 +    }
 +
 +
 +    /**
 +     * Apply cursor position and sort order to this slice. This should only be invoked at evaluation time to ensure
 +     * that the IR tree has already been fully constructed.
 +     */
 +    public void applyCursorAndSort( QuerySlice slice ) {
 +        // apply the sort first, since this can change the hash code
 +        SortPredicate sort = getSort( slice.getPropertyName() );
 +
 +        if ( sort != null ) {
 +            boolean isReversed = sort.getDirection() == SortDirection.DESCENDING;
 +
 +            //we're reversing the direction of this slice, reverse the params as well
 +            if ( isReversed != slice.isReversed() ) {
 +                slice.reverse();
 +            }
 +        }
 +        // apply the cursor
 +        ByteBuffer cursor = cursorCache.getCursorBytes( slice.hashCode() );
 +
 +        if ( cursor != null ) {
 +            slice.setCursor( cursor );
 +        }
 +    }
 +
 +
 +    /**
 +     * Return the cursor bytes for the given node id from the cursor cache.
 +     * @param nodeId the id of the node whose cursor to look up
 +     * @return the cursor bytes, or null if none are cached
 +     */
 +    public ByteBuffer getCursorCache(int nodeId){
 +        return cursorCache.getCursorBytes( nodeId );
 +    }
 +
 +
 +    private SortPredicate getSort( String propertyName ) {
 +        for ( SortPredicate sort : sorts ) {
 +            if ( sort.getPropertyName().equals( propertyName ) ) {
 +                return sort;
 +            }
 +        }
 +        return null;
 +    }
 +
 +
 +    /** Return the iterator results, ordered if required */
 +    public Results getResults( SearchVisitor visitor ) throws Exception {
 +        // if we have no order by just load the results
 +
 +        if ( rootNode == null ) {
 +            return null;
 +        }
 +
 +        rootNode.visit( visitor );
 +
 +        ResultIterator itr = visitor.getResults();
 +
 +        List<ScanColumn> entityIds = new ArrayList<ScanColumn>( Math.min( size, Query.MAX_LIMIT ) );
 +
 +        CursorCache resultsCursor = new CursorCache();
 +
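 +        // drain the iterator until we have the requested number of ids or it is exhausted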
 +        while ( entityIds.size() < size && itr.hasNext() ) {
 +            entityIds.addAll( itr.next() );
 +        }
 +
 +        //trim to the requested size and set our cursor; we may have paged through
 +        //more entities than we want to return
 +        if ( entityIds.size() > 0 ) {
 +            int resultSize = Math.min( entityIds.size(), size );
 +            entityIds = entityIds.subList( 0, resultSize );
 +
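 +            // only finalize a cursor when a full page was filled; a short page means
 +            // the iterator was exhausted and there is nothing left to resume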
 +            if ( resultSize == size ) {
 +                itr.finalizeCursor( resultsCursor, entityIds.get( resultSize - 1 ).getUUID() );
 +            }
 +        }
 +        if ( logger.isDebugEnabled() ) {
 +            logger.debug( "Getting result for query: [{}],  returning entityIds size: {}", getQuery(), entityIds.size() );
 +        }
 +
 +        final ResultsLoader loader = loaderFactory.getResultsLoader( em, query, query.getResultsLevel() );
 +        final Results results = loader.getResults( entityIds );
 +
 +        if ( results == null ) {
 +            return null;
 +        }
 +
 +        // now we need to set the cursor from our tree evaluation for return
 +        results.setCursor( resultsCursor.asString() );
 +
 +        results.setQuery( query );
 +        results.setQueryProcessor( this );
 +        results.setSearchVisitor( visitor );
 +
 +        return results;
 +    }
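 +
 +    /*
 +     * A minimal usage sketch (hypothetical caller code; the visitor construction is
 +     * assumed, not taken from this class):
 +     *
 +     *   QueryProcessor qp = ...;                  // built with query, collection info,
 +     *                                             // entity manager and loader factory
 +     *   Results page = qp.getResults( visitor );  // visitor from the relation manager
 +     *   String cursor = page.getCursor();         // pass back in to resume paging
 +     */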
 +
 +
 +    private class TreeEvaluator implements QueryVisitor {
 +
 +        // stack for nodes that will be used to construct the tree and create
 +        // objects
 +        private CountingStack<QueryNode> nodes = new CountingStack<QueryNode>();
 +
 +
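 +        // monotonically increasing id handed to each new node context, keeping
 +        // cursor hashes distinct across slices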
 +        private int contextCount = -1;
 +
 +
 +        /** Get the root node in our tree for runtime evaluation */
 +        public QueryNode getRootNode() {
 +            return nodes.peek();
 +        }
 +
 +
 +        /*
 +         * (non-Javadoc)
 +         *
 +         * @see org.apache.usergrid.persistence.query.tree.QueryVisitor#visit(org.apache.usergrid
 +         * .persistence.query.tree.AndOperand)
 +         */
 +        @Override
 +        public void visit( AndOperand op ) throws PersistenceException {
 +
 +            op.getLeft().visit( this );
 +
 +            QueryNode leftResult = nodes.peek();
 +
 +            op.getRight().visit( this );
 +
 +            QueryNode rightResult = nodes.peek();
 +
 +            // if the result of the left and right are the same, we don't want
 +            // to create an AND. We'll use the same SliceNode. Do nothing
 +            if ( leftResult == rightResult ) {
 +                return;
 +            }
 +
 +            // otherwise create a new AND node from the result of the visit
 +
 +            QueryNode right = nodes.pop();
 +            QueryNode left = nodes.pop();
 +
 +            AndNode newNode = new AndNode( left, right );
 +
 +            nodes.push( newNode );
 +        }
 +
 +
 +        /*
 +         * (non-Javadoc)
 +         *
 +         * @see org.apache.usergrid.persistence.query.tree.QueryVisitor#visit(org.apache.usergrid
 +         * .persistence.query.tree.OrOperand)
 +         */
 +        @Override
 +        public void visit( OrOperand op ) throws PersistenceException {
 +
 +            // we need to create a new slicenode for the children of this
 +            // operation
 +
 +            Operand left = op.getLeft();
 +            Operand right = op.getRight();
 +
 +            // we only create a new slice node if our children are && and ||
 +            // operations
 +            createNewSlice( left );
 +
 +            left.visit( this );
 +
 +            // we only create a new slice node if our children are && and ||
 +            // operations
 +            createNewSlice( right );
 +
 +            right.visit( this );
 +
 +            QueryNode rightResult = nodes.pop();
 +            QueryNode leftResult = nodes.pop();
 +
 +            // rewrite with the new Or operand
 +            OrNode orNode = new OrNode( leftResult, rightResult,  ++contextCount );
 +
 +            nodes.push( orNode );
 +        }
 +
 +
 +        /*
 +         * (non-Javadoc)
 +         *
 +         * @see org.apache.usergrid.persistence.query.tree.QueryVisitor#visit(org.apache.usergrid
 +         * .persistence.query.tree.NotOperand)
 +         */
 +        @Override
 +        public void visit( NotOperand op ) throws PersistenceException {
 +
 +            // create a new context since any child of NOT will need to be
 +            // evaluated independently
 +            Operand child = op.getOperation();
 +            createNewSlice( child );
 +            child.visit( this );
 +
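 +            // NOT is evaluated as a set difference: everything (the AllNode) minus
 +            // the child's results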
 +            nodes.push( new NotNode( nodes.pop(), new AllNode( ++contextCount, false ) ) );
 +        }
 +
 +
 +        /*
 +         * (non-Javadoc)
 +         *
 +         * @see org.apache.usergrid.persistence.query.tree.QueryVisitor#visit(org.apache.usergrid
 +         * .persistence.query.tree.ContainsOperand)
 +         */
 +        @Override
 +        public void visit( ContainsOperand op ) throws NoFullTextIndexException {
 +
 +            String propertyName = op.getProperty().getValue();
 +
 +            if ( !SCHEMA.isPropertyFulltextIndexed( entityType, propertyName ) ) {
 +                throw new NoFullTextIndexException( entityType, propertyName );
 +            }
 +
 +            StringLiteral string = op.getString();
 +
 +            String indexName = op.getProperty().getIndexedValue();
 +
 +            SliceNode node = null;
 +
 +            // if the slice on top of the stack already has a range on this field, we
 +            // need to create a new slice so the ranges don't collide
 +            if ( !nodes.isEmpty() && nodes.peek() instanceof SliceNode
 +                    && ( ( SliceNode ) nodes.peek() ).getSlice( indexName ) != null ) {
 +                node = newSliceNode();
 +            }
 +            else {
 +                node = getUnionNode( op );
 +            }
 +
 +            String fieldName = op.getProperty().getIndexedValue();
 +
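 +            // the contains value becomes an inclusive range scan over the fulltext
 +            // index terms, from the value to its computed end value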
 +            node.setStart( fieldName, string.getValue(), true );
 +            node.setFinish( fieldName, string.getEndValue(), true );
 +        }
 +
 +
 +        /*
 +         * (non-Javadoc)
 +         *
 +         * @see org.apache.usergrid.persistence.query.tree.QueryVisitor#visit(org.apache.usergrid
 +         * .persistence.query.tree.WithinOperand)
 +         */
 +        @Override
 +        public void visit( WithinOperand op ) {
 +
 +            // use the indexed (coordinate) form of the property name for the geo search
 +            nodes.push( new WithinNode( op.getProperty().getIndexedName(), op.getDistance().getFloatValue(),
 +                    op.getLattitude().getFloatValue(), op.getLongitude().getFloatValue(), ++contextCount ) );
 +        }
 +
 +
 +        /*
 +         * (non-Javadoc)
 +         *
 +         * @see org.apache.usergrid.persistence.query.tree.QueryVisitor#visit(org.apache.usergrid
 +         * .persistence.query.tree.LessThan)
 +         */
 +        @Override
 +        public void visit( LessThan op ) throws NoIndexException {
 +            String propertyName = op.getProperty().getValue();
 +
 +            checkIndexed( propertyName );
 +
 +            getUnionNode( op ).setFinish( propertyName, op.getLiteral().getValue(), false );
 +        }
 +
 +
 +        /*
 +         * (non-Javadoc)
 +         *
 +         * @see org.apache.usergrid.persistence.query.tree.QueryVisitor#visit(org.apache.usergrid
 +         * .persistence.query.tree.LessThanEqual)
 +         */
 +        @Override
 +        public void visit( LessThanEqual op ) throws NoIndexException {
 +
 +            String propertyName = op.getProperty().getValue();
 +
 +            checkIndexed( propertyName );
 +
 +            getUnionNode( op ).setFinish( propertyName, op.getLiteral().getValue(), true );
 +        }
 +
 +
 +        /*
 +         * (non-Javadoc)
 +         *
 +         * @see org.apache.usergrid.persistence.query.tree.QueryVisitor#visit(org.apache.usergrid
 +         * .persistence.query.tree.Equal)
 +         */
 +        @Override
 +        public void visit( Equal op ) throws NoIndexException {
 +            String fieldName = op.getProperty().getValue();
 +
 +            checkIndexed( fieldName );
 +
 +            Literal<?> literal = op.getLiteral();
 +            SliceNode node = getUnionNode( op );
 +
 +            // this is an edge case. If we get more edge cases, we need to push
 +            // this down into the literals and let the objects
 +            // handle this
 +            if ( literal instanceof StringLiteral ) {
 +
 +                StringLiteral stringLiteral = ( StringLiteral ) literal;
 +
 +                String endValue = stringLiteral.getEndValue();
 +
 +                if ( endValue != null ) {
 +                    node.setFinish( fieldName, endValue, true );
 +                }
 +            }
 +            else {
 +                node.setFinish( fieldName, literal.getValue(), true );
 +            }
 +
 +            node.setStart( fieldName, literal.getValue(), true );
 +        }
 +
 +
 +        /*
 +         * (non-Javadoc)
 +         *
 +         * @see org.apache.usergrid.persistence.query.tree.QueryVisitor#visit(org.apache.usergrid
 +         * .persistence.query.tree.GreaterThan)
 +         */
 +        @Override
 +        public void visit( GreaterThan op ) throws NoIndexException {
 +            String propertyName = op.getProperty().getValue();
 +
 +            checkIndexed( propertyName );
 +
 +            getUnionNode( op ).setStart( propertyName, op.getLiteral().getValue(), false );
 +        }
 +
 +
 +        /*
 +         * (non-Javadoc)
 +         *
 +         * @see org.apache.usergrid.persistence.query.tree.QueryVisitor#visit(org.apache.usergrid
 +         * .persistence.query.tree.GreaterThanEqual)
 +         */
 +        @Override
 +        public void visit( GreaterThanEqual op ) throws NoIndexException {
 +            String propertyName = op.getProperty().getValue();
 +
 +            checkIndexed( propertyName );
 +
 +            getUnionNode( op ).setStart( propertyName, op.getLiteral().getValue(), true );
 +        }
 +
 +
 +        /**
 +         * Return the current leaf node to add to if it exists. This means that we can compress multiple 'AND'
 +         * operations and ranges into a single node. Otherwise a new node is created and pushed to the stack
 +         *
 +         * @param current The current operand node
 +         */
 +        private SliceNode getUnionNode( EqualityOperand current ) {
 +
 +            /**
 +             * We only create a new slice node in two situations: no nodes exist yet, or
 +             * the current top of the stack is not a slice node, meaning we can't add
 +             * this slice to the current set of slices. Otherwise we reuse the top slice
 +             * so multiple AND'ed ranges compress into a single node.
 +             */
 +            if ( nodes.size() == 0 || !( nodes.peek() instanceof SliceNode ) ) {
 +                return newSliceNode();
 +            }
 +
 +            return ( SliceNode ) nodes.peek();
 +        }
 +
 +
 +        /** Create a new slice node with the next context id and push it onto the stack */
 +        private SliceNode newSliceNode() {
 +            SliceNode sliceNode = new SliceNode( ++contextCount );
 +
 +            nodes.push( sliceNode );
 +
 +            return sliceNode;
 +        }
 +
 +
 +        /** Create a new slice if one will be required within the context of this node */
 +        private void createNewSlice( Operand child ) {
 +            if ( child instanceof EqualityOperand || child instanceof AndOperand || child instanceof ContainsOperand ) {
 +                newSliceNode();
 +            }
 +        }
 +
 +
 +        public int getSliceCount() {
 +            return nodes.getSliceCount();
 +        }
 +    }
 +
 +
 +    private static class CountingStack<T> extends Stack<T> {
 +
 +        private int count = 0;
 +
 +        private static final long serialVersionUID = 1L;
 +
 +
 +        /* (non-Javadoc)
 +         * @see java.util.Stack#pop()
 +         */
 +        @Override
 +        public synchronized T pop() {
 +            T entry = super.pop();
 +
 +            if ( entry instanceof SliceNode ) {
 +                count += ( ( SliceNode ) entry ).getAllSlices().size();
 +            }
 +
 +            return entry;
 +        }
 +
 +
 +        public int getSliceCount() {
 +
 +            // count already holds the slices from popped nodes; tally slices still on
 +            // the stack separately so repeated calls don't double-count
 +            int stackCount = 0;
 +
 +            for ( T entry : this ) {
 +                if ( entry instanceof SliceNode ) {
 +                    stackCount += ( ( SliceNode ) entry ).getAllSlices().size();
 +                }
 +            }
 +
 +            return count + stackCount;
 +        }
 +    }
 +
 +
 +    /** @return the page size hint for this node: the requested result size for the root node, the precomputed hint otherwise */
 +    public int getPageSizeHint( QueryNode node ) {
 +        /*****
 +         * DO NOT REMOVE THIS PIECE OF CODE!!!!!!!!!!!
 +         * It is crucial that the root iterator is only asked for the result set size per page,
 +         * otherwise our cursor logic will fail when passing cursor data to the leaf nodes
 +         *******/
 +        if ( node == rootNode ) {
 +            return size;
 +        }
 +
 +        return pageSizeHint;
 +    }
 +
 +
 +    /**
 +     * Generate an order by node: a slice with a full scan range on the first sort
 +     * property, with any remaining sorts applied to the results
 +     */
 +    private OrderByNode generateSorts( int opCount ) throws NoIndexException {
 +
 +        // the value is irrelevant since we'll only ever have 1 slice node
 +        // if this is called
 +        SliceNode slice = new SliceNode( opCount );
 +
 +        SortPredicate first = sorts.get( 0 );
 +
 +        String propertyName = first.getPropertyName();
 +
 +        checkIndexed( propertyName );
 +
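 +        // null start/finish bounds scan the entire index range for the first sort property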
 +        slice.setStart( propertyName, null, true );
 +        slice.setFinish( propertyName, null, true );
 +
 +
 +        for ( int i = 1; i < sorts.size(); i++ ) {
 +            checkIndexed( sorts.get( i ).getPropertyName() );
 +        }
 +
 +
 +        return new OrderByNode( slice, sorts.subList( 1, sorts.size() ), rootNode );
 +    }
 +
 +
 +    private void checkIndexed( String propertyName ) throws NoIndexException {
 +
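 +        // the index requirement is only enforced for collection queries; when
 +        // collectionInfo is null (e.g. connection searches) the check is skipped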
 +        if ( propertyName == null || propertyName.isEmpty() || ( !SCHEMA.isPropertyIndexed( entityType, propertyName )
 +                && collectionInfo != null ) ) {
 +            throw new NoIndexException( entityType, propertyName );
 +        }
 +    }
 +
 +
 +    public EntityManager getEntityManager() {
 +        return em;
 +    }
 +}