You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/01/30 16:21:04 UTC

[28/34] update to master

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9fec2baa/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
index b110178,0000000..8c19989
mode 100644,000000..100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
@@@ -1,2337 -1,0 +1,2327 @@@
 +/*******************************************************************************
 + * Copyright 2012 Apigee Corporation
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + ******************************************************************************/
 +package org.apache.usergrid.persistence.cassandra;
 +
 +
 +import java.nio.ByteBuffer;
 +import java.util.AbstractMap;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.Iterator;
 +import java.util.LinkedHashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.TreeMap;
 +import java.util.TreeSet;
 +import java.util.UUID;
 +
 +import org.apache.usergrid.persistence.CollectionRef;
 +import org.apache.usergrid.persistence.ConnectedEntityRef;
 +import org.apache.usergrid.persistence.ConnectionRef;
 +import org.apache.usergrid.persistence.Entity;
 +import org.apache.usergrid.persistence.EntityRef;
 +import org.apache.usergrid.persistence.IndexBucketLocator;
 +import org.apache.usergrid.persistence.PagingResultsIterator;
 +import org.apache.usergrid.persistence.Query;
 +import org.apache.usergrid.persistence.RelationManager;
 +import org.apache.usergrid.persistence.Results;
 +import org.apache.usergrid.persistence.RoleRef;
 +import org.apache.usergrid.persistence.Schema;
 +import org.apache.usergrid.persistence.SimpleCollectionRef;
 +import org.apache.usergrid.persistence.SimpleEntityRef;
 +import org.apache.usergrid.persistence.SimpleRoleRef;
 +import org.apache.usergrid.persistence.IndexBucketLocator.IndexType;
 +import org.apache.usergrid.persistence.Results.Level;
 +import org.apache.usergrid.persistence.cassandra.IndexUpdate.IndexEntry;
 +import org.apache.usergrid.persistence.cassandra.index.ConnectedIndexScanner;
 +import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
 +import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
 +import org.apache.usergrid.persistence.cassandra.index.NoOpIndexScanner;
 +import org.apache.usergrid.persistence.entities.Group;
 +import org.apache.usergrid.persistence.geo.CollectionGeoSearch;
 +import org.apache.usergrid.persistence.geo.ConnectionGeoSearch;
 +import org.apache.usergrid.persistence.geo.EntityLocationRef;
 +import org.apache.usergrid.persistence.geo.model.Point;
 +import org.apache.usergrid.persistence.query.ir.AllNode;
 +import org.apache.usergrid.persistence.query.ir.NameIdentifierNode;
 +import org.apache.usergrid.persistence.query.ir.QueryNode;
 +import org.apache.usergrid.persistence.query.ir.QuerySlice;
 +import org.apache.usergrid.persistence.query.ir.SearchVisitor;
 +import org.apache.usergrid.persistence.query.ir.WithinNode;
 +import org.apache.usergrid.persistence.query.ir.result.CollectionResultsLoaderFactory;
 +import org.apache.usergrid.persistence.query.ir.result.ConnectionIndexSliceParser;
 +import org.apache.usergrid.persistence.query.ir.result.ConnectionResultsLoaderFactory;
 +import org.apache.usergrid.persistence.query.ir.result.ConnectionTypesIterator;
 +import org.apache.usergrid.persistence.query.ir.result.EmptyIterator;
 +import org.apache.usergrid.persistence.query.ir.result.GeoIterator;
 +import org.apache.usergrid.persistence.query.ir.result.SliceIterator;
 +import org.apache.usergrid.persistence.query.ir.result.StaticIdIterator;
 +import org.apache.usergrid.persistence.query.ir.result.UUIDIndexSliceParser;
 +import org.apache.usergrid.persistence.schema.CollectionInfo;
 +import org.apache.usergrid.utils.IndexUtils;
 +import org.apache.usergrid.utils.MapUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import org.springframework.util.Assert;
 +
 +import com.yammer.metrics.annotation.Metered;
 +
 +import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
 +import me.prettyprint.cassandra.serializers.StringSerializer;
 +import me.prettyprint.cassandra.serializers.UUIDSerializer;
 +import me.prettyprint.hector.api.Keyspace;
 +import me.prettyprint.hector.api.beans.DynamicComposite;
 +import me.prettyprint.hector.api.beans.HColumn;
 +import me.prettyprint.hector.api.mutation.Mutator;
 +import static java.lang.String.CASE_INSENSITIVE_ORDER;
 +import static java.util.Arrays.asList;
 +import static me.prettyprint.hector.api.factory.HFactory.createMutator;
 +import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
 +import static org.apache.usergrid.persistence.Schema.DICTIONARY_COLLECTIONS;
 +import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_ENTITIES;
 +import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_TYPES;
 +import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_ENTITIES;
 +import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_TYPES;
 +import static org.apache.usergrid.persistence.Schema.INDEX_CONNECTIONS;
 +import static org.apache.usergrid.persistence.Schema.PROPERTY_COLLECTION_NAME;
 +import static org.apache.usergrid.persistence.Schema.PROPERTY_INACTIVITY;
 +import static org.apache.usergrid.persistence.Schema.PROPERTY_ITEM;
 +import static org.apache.usergrid.persistence.Schema.PROPERTY_ITEM_TYPE;
 +import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
 +import static org.apache.usergrid.persistence.Schema.PROPERTY_TITLE;
 +import static org.apache.usergrid.persistence.Schema.PROPERTY_TYPE;
 +import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
 +import static org.apache.usergrid.persistence.Schema.TYPE_ENTITY;
 +import static org.apache.usergrid.persistence.Schema.TYPE_MEMBER;
 +import static org.apache.usergrid.persistence.Schema.TYPE_ROLE;
 +import static org.apache.usergrid.persistence.Schema.defaultCollectionName;
 +import static org.apache.usergrid.persistence.Schema.getDefaultSchema;
 +import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COMPOSITE_DICTIONARIES;
 +import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_DICTIONARIES;
 +import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_ID_SETS;
 +import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX;
 +import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX_ENTRIES;
 +import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addDeleteToMutator;
 +import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addInsertToMutator;
 +import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute;
 +import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
 +import static org.apache.usergrid.persistence.cassandra.CassandraService.INDEX_ENTRY_LIST_COUNT;
 +import static org.apache.usergrid.persistence.cassandra.ConnectionRefImpl.CONNECTION_ENTITY_CONNECTION_TYPE;
 +import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchDeleteLocationInConnectionsIndex;
 +import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchRemoveLocationFromCollectionIndex;
 +import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchStoreLocationInCollectionIndex;
 +import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchStoreLocationInConnectionsIndex;
 +import static org.apache.usergrid.persistence.cassandra.IndexUpdate.indexValueCode;
 +import static org.apache.usergrid.persistence.cassandra.IndexUpdate.toIndexableValue;
 +import static org.apache.usergrid.persistence.cassandra.IndexUpdate.validIndexableValue;
 +import static org.apache.usergrid.utils.ClassUtils.cast;
 +import static org.apache.usergrid.utils.CompositeUtils.setGreaterThanEqualityFlag;
 +import static org.apache.usergrid.utils.ConversionUtils.string;
 +import static org.apache.usergrid.utils.InflectionUtils.singularize;
 +import static org.apache.usergrid.utils.MapUtils.addMapSet;
 +import static org.apache.usergrid.utils.UUIDUtils.getTimestampInMicros;
 +import static org.apache.usergrid.utils.UUIDUtils.newTimeUUID;
 +
 +
 +public class RelationManagerImpl implements RelationManager {
 +
 +    private static final Logger logger = LoggerFactory.getLogger( RelationManagerImpl.class );
 +
 +    private EntityManagerImpl em;
 +    private CassandraService cass;
 +    private UUID applicationId;
 +    private EntityRef headEntity;
 +    private IndexBucketLocator indexBucketLocator;
 +
 +    public static final StringSerializer se = new StringSerializer();
 +    public static final ByteBufferSerializer be = new ByteBufferSerializer();
 +    public static final UUIDSerializer ue = new UUIDSerializer();
 +
 +
 +    public RelationManagerImpl() {
 +    }
 +
 +
 +    public RelationManagerImpl init( EntityManagerImpl em, CassandraService cass, UUID applicationId,
 +                                     EntityRef headEntity, IndexBucketLocator indexBucketLocator ) {
 +
 +        Assert.notNull( em, "Entity manager cannot be null" );
 +        Assert.notNull( cass, "Cassandra service cannot be null" );
 +        Assert.notNull( applicationId, "Application Id cannot be null" );
 +        Assert.notNull( headEntity, "Head entity cannot be null" );
 +        Assert.notNull( headEntity.getUuid(), "Head entity uuid cannot be null" );
 +        Assert.notNull( indexBucketLocator, "Index bucket locator cannot be null" );
 +
 +        this.em = em;
 +        this.applicationId = applicationId;
 +        this.cass = cass;
 +        this.headEntity = headEntity;
 +        this.indexBucketLocator = indexBucketLocator;
 +
 +        return this;
 +    }
 +
 +
 +    private RelationManagerImpl getRelationManager( EntityRef headEntity ) {
 +        RelationManagerImpl rmi = new RelationManagerImpl();
 +        rmi.init( em, cass, applicationId, headEntity, indexBucketLocator );
 +        return rmi;
 +    }
 +
 +
 +    /** side effect: converts headEntity into an Entity if it is an EntityRef! */
 +    private Entity getHeadEntity() throws Exception {
 +        Entity entity = null;
 +        if ( headEntity instanceof Entity ) {
 +            entity = ( Entity ) headEntity;
 +        }
 +        else {
 +            entity = em.get( headEntity );
 +            headEntity = entity;
 +        }
 +        return entity;
 +    }
 +
 +
 +    /**
 +     * Batch update collection index.
 +     *
 +     * @param indexUpdate The update to apply
 +     * @param owner The entity that is the owner context of this entity update.  Can either be an application, or
 +     * another entity
 +     * @param collectionName the collection name
 +     *
 +     * @return The indexUpdate with batch mutations
 +     *
 +     * @throws Exception the exception
 +     */
 +    @Metered(group = "core", name = "RelationManager_batchUpdateCollectionIndex")
 +    public IndexUpdate batchUpdateCollectionIndex( IndexUpdate indexUpdate, EntityRef owner, String collectionName )
 +            throws Exception {
 +
 +        logger.debug( "batchUpdateCollectionIndex" );
 +
 +        Entity indexedEntity = indexUpdate.getEntity();
 +
 +        String bucketId = indexBucketLocator
 +                .getBucket( applicationId, IndexType.COLLECTION, indexedEntity.getUuid(), indexedEntity.getType(),
 +                        indexUpdate.getEntryName() );
 +
 +        // the root name without the bucket
 +        // entity_id,collection_name,prop_name,
 +        Object index_name = null;
 +        // entity_id,collection_name,prop_name, bucketId
 +        Object index_key = null;
 +
 +        // entity_id,collection_name,collected_entity_id,prop_name
 +
 +        for ( IndexEntry entry : indexUpdate.getPrevEntries() ) {
 +
 +            if ( entry.getValue() != null ) {
 +
 +                index_name = key( owner.getUuid(), collectionName, entry.getPath() );
 +
 +                index_key = key( index_name, bucketId );
 +
 +                addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, index_key, entry.getIndexComposite(),
 +                        indexUpdate.getTimestamp() );
 +
 +                if ( "location.coordinates".equals( entry.getPath() ) ) {
 +                    EntityLocationRef loc = new EntityLocationRef( indexUpdate.getEntity(), entry.getTimestampUuid(),
 +                            entry.getValue().toString() );
 +                    batchRemoveLocationFromCollectionIndex( indexUpdate.getBatch(), indexBucketLocator, applicationId,
 +                            index_name, loc );
 +                }
 +            }
 +            else {
 +                logger.error( "Unexpected condition - deserialized property value is null" );
 +            }
 +        }
 +
 +        if ( ( indexUpdate.getNewEntries().size() > 0 ) && ( !indexUpdate.isMultiValue() || ( indexUpdate.isMultiValue()
 +                && !indexUpdate.isRemoveListEntry() ) ) ) {
 +
 +            for ( IndexEntry indexEntry : indexUpdate.getNewEntries() ) {
 +
 +                // byte valueCode = indexEntry.getValueCode();
 +
 +                index_name = key( owner.getUuid(), collectionName, indexEntry.getPath() );
 +
 +                index_key = key( index_name, bucketId );
 +
 +                // int i = 0;
 +
 +                addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, index_key, indexEntry.getIndexComposite(),
 +                        null, indexUpdate.getTimestamp() );
 +
 +                if ( "location.coordinates".equals( indexEntry.getPath() ) ) {
 +                    EntityLocationRef loc =
 +                            new EntityLocationRef( indexUpdate.getEntity(), indexEntry.getTimestampUuid(),
 +                                    indexEntry.getValue().toString() );
 +                    batchStoreLocationInCollectionIndex( indexUpdate.getBatch(), indexBucketLocator, applicationId,
 +                            index_name, indexedEntity.getUuid(), loc );
 +                }
 +
 +                // i++;
 +            }
 +        }
 +
 +        for ( String index : indexUpdate.getIndexesSet() ) {
 +            addInsertToMutator( indexUpdate.getBatch(), ENTITY_DICTIONARIES,
 +                    key( owner.getUuid(), collectionName, Schema.DICTIONARY_INDEXES ), index, null,
 +                    indexUpdate.getTimestamp() );
 +        }
 +
 +        return indexUpdate;
 +    }
 +
 +
 +    @Override
 +    @Metered(group = "core", name = "RelationManager_getCollectionIndexes")
 +    public Set<String> getCollectionIndexes( String collectionName ) throws Exception {
 +
 +        // TODO TN, read all buckets here
 +        List<HColumn<String, String>> results =
 +                cass.getAllColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_DICTIONARIES,
 +                        key( headEntity.getUuid(), collectionName, Schema.DICTIONARY_INDEXES ), se, se );
 +        Set<String> indexes = new TreeSet<String>();
 +        if ( results != null ) {
 +            for ( HColumn<String, String> column : results ) {
 +                String propertyName = column.getName();
 +                if ( !propertyName.endsWith( ".keywords" ) ) {
 +                    indexes.add( column.getName() );
 +                }
 +            }
 +        }
 +        return indexes;
 +    }
 +
 +
 +    public Map<EntityRef, Set<String>> getContainingCollections() throws Exception {
 +        Map<EntityRef, Set<String>> results = new LinkedHashMap<EntityRef, Set<String>>();
 +
 +        Keyspace ko = cass.getApplicationKeyspace( applicationId );
 +
 +        // TODO TN get all buckets here
 +
 +        List<HColumn<DynamicComposite, ByteBuffer>> containers = cass.getAllColumns( ko, ENTITY_COMPOSITE_DICTIONARIES,
 +                key( headEntity.getUuid(), Schema.DICTIONARY_CONTAINER_ENTITIES ), EntityManagerFactoryImpl.dce, be );
 +        if ( containers != null ) {
 +            for ( HColumn<DynamicComposite, ByteBuffer> container : containers ) {
 +                DynamicComposite composite = container.getName();
 +                if ( composite != null ) {
 +                    String ownerType = ( String ) composite.get( 0 );
 +                    String collectionName = ( String ) composite.get( 1 );
 +                    UUID ownerId = ( UUID ) composite.get( 2 );
 +                    addMapSet( results, new SimpleEntityRef( ownerType, ownerId ), collectionName );
 +                    if ( logger.isDebugEnabled() ) {
 +                        logger.debug( " {} ( {} ) is in collection {} ( {} ).", new Object[] {
 +                                headEntity.getType(), headEntity.getUuid(), ownerType, collectionName, ownerId
 +                        } );
 +                    }
 +                }
 +            }
 +        }
 +        EntityRef applicationRef = new SimpleEntityRef( TYPE_APPLICATION, applicationId );
 +        if ( !results.containsKey( applicationRef ) ) {
 +            addMapSet( results, applicationRef, defaultCollectionName( headEntity.getType() ) );
 +        }
 +        return results;
 +    }
 +
 +
 +    @SuppressWarnings("unchecked")
 +    public void batchCreateCollectionMembership( Mutator<ByteBuffer> batch, EntityRef ownerRef, String collectionName,
 +                                                 EntityRef itemRef, EntityRef membershipRef, UUID timestampUuid )
 +            throws Exception {
 +
 +        long timestamp = getTimestampInMicros( timestampUuid );
 +
 +        if ( membershipRef == null ) {
 +            membershipRef = new SimpleCollectionRef( ownerRef, collectionName, itemRef );
 +        }
 +
 +        Map<String, Object> properties = new TreeMap<String, Object>( CASE_INSENSITIVE_ORDER );
 +        properties.put( PROPERTY_TYPE, membershipRef.getType() );
 +        properties.put( PROPERTY_COLLECTION_NAME, collectionName );
 +        properties.put( PROPERTY_ITEM, itemRef.getUuid() );
 +        properties.put( PROPERTY_ITEM_TYPE, itemRef.getType() );
 +
 +        em.batchCreate( batch, membershipRef.getType(), null, properties, membershipRef.getUuid(), timestampUuid );
 +
 +        addInsertToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
 +                key( membershipRef.getUuid(), Schema.DICTIONARY_CONTAINER_ENTITIES ),
 +                asList( ownerRef.getType(), collectionName, ownerRef.getUuid() ), membershipRef.getUuid(), timestamp );
 +    }
 +
 +
 +    /**
 +     * Batch add to collection.
 +     *
 +     * @param batch the batch
 +     * @param collectionName the collection name
 +     * @param entity The entity to add to the batch
 +     * @param timestampUuid The timestamp of this update in a time uuid
 +     *
 +     * @return batch
 +     *
 +     * @throws Exception the exception
 +     */
 +    public Mutator<ByteBuffer> batchAddToCollection( Mutator<ByteBuffer> batch, String collectionName, Entity entity,
 +                                                     UUID timestampUuid ) throws Exception {
 +        List<UUID> ids = new ArrayList<UUID>( 1 );
 +        ids.add( headEntity.getUuid() );
 +        return batchAddToCollections( batch, headEntity.getType(), ids, collectionName, entity, timestampUuid );
 +    }
 +
 +
 +    @SuppressWarnings("unchecked")
 +    @Metered(group = "core", name = "RelationManager_batchAddToCollections")
 +    public Mutator<ByteBuffer> batchAddToCollections( Mutator<ByteBuffer> batch, String ownerType, List<UUID> ownerIds,
 +                                                      String collectionName, Entity entity, UUID timestampUuid )
 +            throws Exception {
 +
 +        long timestamp = getTimestampInMicros( timestampUuid );
 +
 +        if ( Schema.isAssociatedEntityType( entity.getType() ) ) {
 +            logger.error( "Cant add an extended type to any collection", new Throwable() );
 +            return batch;
 +        }
 +
 +        Map<UUID, CollectionRef> membershipRefs = new LinkedHashMap<UUID, CollectionRef>();
 +
 +        for ( UUID ownerId : ownerIds ) {
 +
 +            CollectionRef membershipRef =
 +                    new SimpleCollectionRef( new SimpleEntityRef( ownerType, ownerId ), collectionName, entity );
 +
 +            membershipRefs.put( ownerId, membershipRef );
 +
 +            // get the bucket this entityId needs to be inserted into
 +            String bucketId = indexBucketLocator
 +                    .getBucket( applicationId, IndexType.COLLECTION, entity.getUuid(), collectionName );
 +
 +            Object collections_key = key( ownerId, Schema.DICTIONARY_COLLECTIONS, collectionName, bucketId );
 +
 +            // Insert in main collection
 +
 +            addInsertToMutator( batch, ENTITY_ID_SETS, collections_key, entity.getUuid(), membershipRef.getUuid(),
 +                    timestamp );
 +
 +            addInsertToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
 +                    key( entity.getUuid(), Schema.DICTIONARY_CONTAINER_ENTITIES ),
 +                    asList( ownerType, collectionName, ownerId ), membershipRef.getUuid(), timestamp );
 +        }
 +
 +
 +        Schema schema = getDefaultSchema();
 +
 +        // Add property indexes
 +        for ( String propertyName : entity.getProperties().keySet() ) {
 +            boolean indexed_property = schema.isPropertyIndexed( entity.getType(), propertyName );
 +            if ( indexed_property ) {
 +                boolean collection_indexes_property =
 +                        schema.isPropertyIndexedInCollection( ownerType, collectionName, propertyName );
 +                boolean item_schema_has_property = schema.hasProperty( entity.getType(), propertyName );
 +                boolean fulltext_indexed = schema.isPropertyFulltextIndexed( entity.getType(), propertyName );
 +                if ( collection_indexes_property || !item_schema_has_property ) {
 +                    Object propertyValue = entity.getProperty( propertyName );
 +                    IndexUpdate indexUpdate =
 +                            batchStartIndexUpdate( batch, entity, propertyName, propertyValue, timestampUuid,
 +                                    item_schema_has_property, false, false, fulltext_indexed, true );
 +                    for ( UUID ownerId : ownerIds ) {
 +                        EntityRef owner = new SimpleEntityRef( ownerType, ownerId );
 +                        batchUpdateCollectionIndex( indexUpdate, owner, collectionName );
 +                    }
 +                }
 +            }
 +        }
 +
 +        // Add set property indexes
 +
 +        Set<String> dictionaryNames = em.getDictionaryNames( entity );
 +
 +        for ( String dictionaryName : dictionaryNames ) {
 +            boolean has_dictionary = schema.hasDictionary( entity.getType(), dictionaryName );
 +            boolean dictionary_indexed =
 +                    schema.isDictionaryIndexedInCollection( ownerType, collectionName, dictionaryName );
 +
 +            if ( dictionary_indexed || !has_dictionary ) {
 +                Set<Object> elementValues = em.getDictionaryAsSet( entity, dictionaryName );
 +                for ( Object elementValue : elementValues ) {
 +                    IndexUpdate indexUpdate =
 +                            batchStartIndexUpdate( batch, entity, dictionaryName, elementValue, timestampUuid,
 +                                    has_dictionary, true, false, false, true );
 +                    for ( UUID ownerId : ownerIds ) {
 +                        EntityRef owner = new SimpleEntityRef( ownerType, ownerId );
 +                        batchUpdateCollectionIndex( indexUpdate, owner, collectionName );
 +                    }
 +                }
 +            }
 +        }
 +
 +        for ( UUID ownerId : ownerIds ) {
 +            EntityRef owner = new SimpleEntityRef( ownerType, ownerId );
 +            batchCreateCollectionMembership( batch, owner, collectionName, entity, membershipRefs.get( ownerId ),
 +                    timestampUuid );
 +        }
 +
 +        return batch;
 +    }
 +
 +
 +    /**
 +     * Batch remove from collection.
 +     * <p/>
 +     * * Batch add to collection.
 +     *
 +     * @param batch the batch
 +     * @param collectionName the collection name
 +     * @param entity The entity to add to the batch
 +     * @param timestampUuid The timestamp of this update in a time uuid
 +     *
 +     * @return The mutation with the delete operations added
 +     *
 +     * @throws Exception the exception
 +     */
 +    public Mutator<ByteBuffer> batchRemoveFromCollection( Mutator<ByteBuffer> batch, String collectionName,
 +                                                          Entity entity, UUID timestampUuid ) throws Exception {
 +        return this.batchRemoveFromCollection( batch, collectionName, entity, false, timestampUuid );
 +    }
 +
 +
 +    @SuppressWarnings("unchecked")
 +    @Metered(group = "core", name = "RelationManager_batchRemoveFromCollection")
 +    public Mutator<ByteBuffer> batchRemoveFromCollection( Mutator<ByteBuffer> batch, String collectionName,
 +                                                          Entity entity, boolean force, UUID timestampUuid )
 +            throws Exception {
 +
 +        long timestamp = getTimestampInMicros( timestampUuid );
 +
 +        if ( !force && headEntity.getUuid().equals( applicationId ) ) {
 +            // Can't remove entities from root collections
 +            return batch;
 +        }
 +
 +        Object collections_key = key( headEntity.getUuid(), Schema.DICTIONARY_COLLECTIONS, collectionName,
 +                indexBucketLocator.getBucket( applicationId, IndexType.COLLECTION, entity.getUuid(), collectionName ) );
 +
 +        // Remove property indexes
 +
 +        Schema schema = getDefaultSchema();
 +        for ( String propertyName : entity.getProperties().keySet() ) {
 +            boolean collection_indexes_property =
 +                    schema.isPropertyIndexedInCollection( headEntity.getType(), collectionName, propertyName );
 +            boolean item_schema_has_property = schema.hasProperty( entity.getType(), propertyName );
 +            boolean fulltext_indexed = schema.isPropertyFulltextIndexed( entity.getType(), propertyName );
 +            if ( collection_indexes_property || !item_schema_has_property ) {
 +                IndexUpdate indexUpdate = batchStartIndexUpdate( batch, entity, propertyName, null, timestampUuid,
 +                        item_schema_has_property, false, false, fulltext_indexed );
 +                batchUpdateCollectionIndex( indexUpdate, headEntity, collectionName );
 +            }
 +        }
 +
 +        // Remove set indexes
 +
 +        Set<String> dictionaryNames = em.getDictionaryNames( entity );
 +
 +        for ( String dictionaryName : dictionaryNames ) {
 +            boolean has_dictionary = schema.hasDictionary( entity.getType(), dictionaryName );
 +            boolean dictionary_indexed =
 +                    schema.isDictionaryIndexedInCollection( headEntity.getType(), collectionName, dictionaryName );
 +
 +            if ( dictionary_indexed || !has_dictionary ) {
 +                Set<Object> elementValues = em.getDictionaryAsSet( entity, dictionaryName );
 +                for ( Object elementValue : elementValues ) {
 +                    IndexUpdate indexUpdate =
 +                            batchStartIndexUpdate( batch, entity, dictionaryName, elementValue, timestampUuid,
 +                                    has_dictionary, true, true, false );
 +                    batchUpdateCollectionIndex( indexUpdate, headEntity, collectionName );
 +                }
 +            }
 +        }
 +
 +        // Delete actual property
 +
 +        addDeleteToMutator( batch, ENTITY_ID_SETS, collections_key, entity.getUuid(), timestamp );
 +
 +        addDeleteToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
 +                key( entity.getUuid(), Schema.DICTIONARY_CONTAINER_ENTITIES ),
 +                asList( headEntity.getType(), collectionName, headEntity.getUuid() ), timestamp );
 +
 +        if ( !headEntity.getType().equalsIgnoreCase( TYPE_APPLICATION ) && !Schema
 +                .isAssociatedEntityType( entity.getType() ) ) {
 +            em.deleteEntity( new SimpleCollectionRef( headEntity, collectionName, entity ).getUuid() );
 +        }
 +
 +        return batch;
 +    }
 +
 +
 +    @Metered(group = "core", name = "RelationManager_batchDeleteConnectionIndexEntries")
 +    public Mutator<ByteBuffer> batchDeleteConnectionIndexEntries( IndexUpdate indexUpdate, IndexEntry entry,
 +                                                                  ConnectionRefImpl connection, UUID[] index_keys )
 +            throws Exception {
 +
 +        // entity_id,prop_name
 +        Object property_index_key = key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, entry.getPath(),
 +                indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION, index_keys[ConnectionRefImpl.ALL],
 +                        entry.getPath() ) );
 +
 +        // entity_id,entity_type,prop_name
 +        Object entity_type_prop_index_key =
 +                key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
 +                        indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION,
 +                                index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], entry.getPath() ) );
 +
 +        // entity_id,connection_type,prop_name
 +        Object connection_type_prop_index_key =
 +                key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, entry.getPath(),
 +                        indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION,
 +                                index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], entry.getPath() ) );
 +
 +        // entity_id,connection_type,entity_type,prop_name
 +        Object connection_type_and_entity_type_prop_index_key =
 +                key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
 +                        indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION,
 +                                index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], entry.getPath() ) );
 +
 +        // composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
 +        addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, property_index_key,
 +                entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectionType(),
 +                        connection.getConnectedEntityType() ), indexUpdate.getTimestamp() );
 +
 +        // composite(property_value,connected_entity_id,connection_type,entry_timestamp)
 +        addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, entity_type_prop_index_key,
 +                entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectionType() ),
 +                indexUpdate.getTimestamp() );
 +
 +        // composite(property_value,connected_entity_id,entity_type,entry_timestamp)
 +        addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_prop_index_key,
 +                entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectedEntityType() ),
 +                indexUpdate.getTimestamp() );
 +
 +        // composite(property_value,connected_entity_id,entry_timestamp)
 +        addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_and_entity_type_prop_index_key,
 +                entry.getIndexComposite( connection.getConnectedEntityId() ), indexUpdate.getTimestamp() );
 +
 +        return indexUpdate.getBatch();
 +    }
 +
 +
 +    @Metered(group = "core", name = "RelationManager_batchAddConnectionIndexEntries")
 +    public Mutator<ByteBuffer> batchAddConnectionIndexEntries( IndexUpdate indexUpdate, IndexEntry entry,
 +                                                               ConnectionRefImpl connection, UUID[] index_keys ) {
 +
 +        // entity_id,prop_name
 +        Object property_index_key = key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, entry.getPath(),
 +                indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION, index_keys[ConnectionRefImpl.ALL],
 +                        entry.getPath() ) );
 +
 +        // entity_id,entity_type,prop_name
 +        Object entity_type_prop_index_key =
 +                key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
 +                        indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION,
 +                                index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], entry.getPath() ) );
 +
 +        // entity_id,connection_type,prop_name
 +        Object connection_type_prop_index_key =
 +                key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, entry.getPath(),
 +                        indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION,
 +                                index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], entry.getPath() ) );
 +
 +        // entity_id,connection_type,entity_type,prop_name
 +        Object connection_type_and_entity_type_prop_index_key =
 +                key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
 +                        indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION,
 +                                index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], entry.getPath() ) );
 +
 +        // composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
 +        addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, property_index_key,
 +                entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectionType(),
 +                        connection.getConnectedEntityType() ), connection.getUuid(), indexUpdate.getTimestamp() );
 +
 +        // composite(property_value,connected_entity_id,connection_type,entry_timestamp)
 +        addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, entity_type_prop_index_key,
 +                entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectionType() ),
 +                connection.getUuid(), indexUpdate.getTimestamp() );
 +
 +        // composite(property_value,connected_entity_id,entity_type,entry_timestamp)
 +        addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_prop_index_key,
 +                entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectedEntityType() ),
 +                connection.getUuid(), indexUpdate.getTimestamp() );
 +
 +        // composite(property_value,connected_entity_id,entry_timestamp)
 +        addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_and_entity_type_prop_index_key,
 +                entry.getIndexComposite( connection.getConnectedEntityId() ), connection.getUuid(),
 +                indexUpdate.getTimestamp() );
 +
 +        return indexUpdate.getBatch();
 +    }
 +
 +
 +    /**
 +     * Batch update connection index.
 +     *
 +     * @param indexUpdate The update operation to perform
 +     * @param connection The connection to update
 +     *
 +     * @return The index with the batch mutation udpated
 +     *
 +     * @throws Exception the exception
 +     */
 +    @Metered(group = "core", name = "RelationManager_batchUpdateConnectionIndex")
 +    public IndexUpdate batchUpdateConnectionIndex( IndexUpdate indexUpdate, ConnectionRefImpl connection )
 +            throws Exception {
 +
 +        // UUID connection_id = connection.getUuid();
 +
 +        UUID[] index_keys = connection.getIndexIds();
 +
 +        // Delete all matching entries from entry list
 +        for ( IndexEntry entry : indexUpdate.getPrevEntries() ) {
 +
 +            if ( entry.getValue() != null ) {
 +
 +                batchDeleteConnectionIndexEntries( indexUpdate, entry, connection, index_keys );
 +
 +                if ( "location.coordinates".equals( entry.getPath() ) ) {
 +                    EntityLocationRef loc = new EntityLocationRef( indexUpdate.getEntity(), entry.getTimestampUuid(),
 +                            entry.getValue().toString() );
 +                    batchDeleteLocationInConnectionsIndex( indexUpdate.getBatch(), indexBucketLocator, applicationId,
 +                            index_keys, entry.getPath(), loc );
 +                }
 +            }
 +            else {
 +                logger.error( "Unexpected condition - deserialized property value is null" );
 +            }
 +        }
 +
 +        if ( ( indexUpdate.getNewEntries().size() > 0 ) && ( !indexUpdate.isMultiValue() || ( indexUpdate.isMultiValue()
 +                && !indexUpdate.isRemoveListEntry() ) ) ) {
 +
 +            for ( IndexEntry indexEntry : indexUpdate.getNewEntries() ) {
 +
 +                batchAddConnectionIndexEntries( indexUpdate, indexEntry, connection, index_keys );
 +
 +                if ( "location.coordinates".equals( indexEntry.getPath() ) ) {
 +                    EntityLocationRef loc =
 +                            new EntityLocationRef( indexUpdate.getEntity(), indexEntry.getTimestampUuid(),
 +                                    indexEntry.getValue().toString() );
 +                    batchStoreLocationInConnectionsIndex( indexUpdate.getBatch(), indexBucketLocator, applicationId,
 +                            index_keys, indexEntry.getPath(), loc );
 +                }
 +            }
 +
 +      /*
 +       * addInsertToMutator(batch, EntityCF.SETS, key(connection_id,
 +       * Schema.INDEXES_SET), indexEntry.getKey(), null, false, timestamp); }
 +       * 
 +       * addInsertToMutator(batch, EntityCF.SETS, key(connection_id,
 +       * Schema.INDEXES_SET), entryName, null, false, timestamp);
 +       */
 +        }
 +
 +        for ( String index : indexUpdate.getIndexesSet() ) {
 +            addInsertToMutator( indexUpdate.getBatch(), ENTITY_DICTIONARIES,
 +                    key( connection.getConnectingIndexId(), Schema.DICTIONARY_INDEXES ), index, null,
 +                    indexUpdate.getTimestamp() );
 +        }
 +
 +        return indexUpdate;
 +    }
 +
 +
 +    public Set<String> getConnectionIndexes( ConnectionRefImpl connection ) throws Exception {
 +        List<HColumn<String, String>> results =
 +                cass.getAllColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_DICTIONARIES,
 +                        key( connection.getConnectingIndexId(), Schema.DICTIONARY_INDEXES ), se, se );
 +        Set<String> indexes = new TreeSet<String>();
 +        if ( results != null ) {
 +            for ( HColumn<String, String> column : results ) {
 +                String propertyName = column.getName();
 +                if ( !propertyName.endsWith( ".keywords" ) ) {
 +                    indexes.add( column.getName() );
 +                }
 +            }
 +        }
 +        return indexes;
 +    }
 +
 +
 +    /**
 +     * Batch update backward connections property indexes.
 +     *
 +     * @param indexUpdate The update to run for incoming connections
 +     *
 +     * @return The index update to run
 +     *
 +     * @throws Exception the exception
 +     */
 +    @Metered(group = "core", name = "RelationManager_batchUpdateBackwardConnectionsPropertyIndexes")
 +    public IndexUpdate batchUpdateBackwardConnectionsPropertyIndexes( IndexUpdate indexUpdate ) throws Exception {
 +
 +        logger.debug( "batchUpdateBackwordConnectionsPropertyIndexes" );
 +
 +        boolean entitySchemaHasProperty = indexUpdate.isSchemaHasProperty();
 +
 +        if ( entitySchemaHasProperty ) {
 +            if ( !getDefaultSchema()
 +                    .isPropertyIndexed( indexUpdate.getEntity().getType(), indexUpdate.getEntryName() ) ) {
 +                return indexUpdate;
 +            }
 +        }
 +
 +
 +        return doBackwardConnectionsUpdate( indexUpdate );
 +    }
 +
 +
 +    /**
 +     * Search each reverse connection type in the graph for connections.  If one is found, update the index
 +     * appropriately
 +     *
 +     * @param indexUpdate The index update to use
 +     *
 +     * @return The updated index update
 +     */
 +    private IndexUpdate doBackwardConnectionsUpdate( IndexUpdate indexUpdate ) throws Exception {
 +        final Entity targetEntity = indexUpdate.getEntity();
 +
 +        final ConnectionTypesIterator connectionTypes =
 +                new ConnectionTypesIterator( cass, applicationId, targetEntity.getUuid(), false, 100 );
 +
 +        for ( String connectionType : connectionTypes ) {
 +
 +            PagingResultsIterator itr = getReversedConnectionsIterator( targetEntity, connectionType );
 +
 +            for ( Object connection : itr ) {
 +
 +                final ConnectedEntityRef sourceEntity = ( ConnectedEntityRef ) connection;
 +
 +                //we need to create a connection ref from the source entity (found via reverse edge) to the entity
 +                // we're about to update.  This is the index that needs updated
 +                final ConnectionRefImpl connectionRef =
 +                        new ConnectionRefImpl( sourceEntity, connectionType, indexUpdate.getEntity() );
 +
 +                batchUpdateConnectionIndex( indexUpdate, connectionRef );
 +            }
 +        }
 +
 +        return indexUpdate;
 +    }
 +
 +
 +    /**
 +     * Get a paging results iterator.  Should return an iterator for all results
 +     *
 +     * @param targetEntity The target entity search connections from
 +     *
 +     * @return connectionType The name of the edges to search
 +     */
 +    private PagingResultsIterator getReversedConnectionsIterator( EntityRef targetEntity, String connectionType )
 +            throws Exception {
 +        return new PagingResultsIterator( getConnectingEntities( targetEntity, connectionType, null, Level.REFS ) );
 +    }
 +
 +
 +    /**
 +     * Batch update backward connections set indexes.
 +     *
 +     * @param indexUpdate The index to update in the dictionary
 +     *
 +     * @return The index update
 +     *
 +     * @throws Exception the exception
 +     */
 +    @Metered(group = "core", name = "RelationManager_batchUpdateBackwardConnectionsDictionaryIndexes")
 +    public IndexUpdate batchUpdateBackwardConnectionsDictionaryIndexes( IndexUpdate indexUpdate ) throws Exception {
 +
 +        logger.debug( "batchUpdateBackwardConnectionsListIndexes" );
 +
 +        boolean entityHasDictionary = getDefaultSchema()
 +                .isDictionaryIndexedInConnections( indexUpdate.getEntity().getType(), indexUpdate.getEntryName() );
 +
 +        if ( !entityHasDictionary ) {
 +            return indexUpdate;
 +        }
 +
 +
 +        return doBackwardConnectionsUpdate( indexUpdate );
 +    }
 +
 +
 +    @SuppressWarnings("unchecked")
 +    @Metered(group = "core", name = "RelationManager_batchUpdateEntityConnection")
 +    public Mutator<ByteBuffer> batchUpdateEntityConnection( Mutator<ByteBuffer> batch, boolean disconnect,
 +                                                            ConnectionRefImpl connection, UUID timestampUuid )
 +            throws Exception {
 +
 +        long timestamp = getTimestampInMicros( timestampUuid );
 +
 +        Entity connectedEntity = em.get( connection.getConnectedEntityId() );
 +
 +        if ( connectedEntity == null ) {
 +            return batch;
 +        }
 +
 +        // Create connection for requested params
 +
 +
 +        if ( disconnect ) {
 +            addDeleteToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
 +                    key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_ENTITIES,
 +                            connection.getConnectionType() ),
 +                    asList( connection.getConnectedEntityId(), connection.getConnectedEntityType() ), timestamp );
 +
 +            addDeleteToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
 +                    key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_ENTITIES,
 +                            connection.getConnectionType() ),
 +                    asList( connection.getConnectingEntityId(), connection.getConnectingEntityType() ), timestamp );
 +
 +            // delete the connection path if there will be no connections left
 +
 +            boolean delete = true;
 +
 +            //check out outbound edges of the given type.  If we have more than the 1 specified,
 +            // we shouldn't delete the connection types from our outbound index
 +            PagingResultsIterator itr = new PagingResultsIterator(
 +                    getConnectedEntities( connection.getConnectingEntity(), connection.getConnectionType(), null,
 +                            Level.REFS ) );
 +
 +            ConnectedEntityRef c;
 +
 +            while ( itr.hasNext() ) {
 +                c = ( ConnectedEntityRef ) itr.next();
 +
 +                if ( !connection.getConnectedEntityId().equals( c.getUuid() ) ) {
 +                    delete = false;
 +                    break;
 +                }
 +
 +
 +                //        c = (ConnectionRef) itr.next();
 +                //        if (c.getConnectedEntity().getConnectionType().equals(connection.getConnectedEntity()
 +                // .getConnectionType()) &&!c.getConnectedEntity().getUuid().equals(connection.getConnectedEntity()
 +                // .getUuid())) {
 +                //            delete = false;
 +                //            break;
 +                //        }
 +
 +            }
 +            //      for (ConnectionRefImpl c : getConnectionsWithEntity(connection.getConnectingEntityId())) {
 +            //        if (c.getConnectedEntity().getConnectionType().equals(connection.getConnectedEntity()
 +            // .getConnectionType())) {
 +            //          if (!c.getConnectedEntity().getUuid().equals(connection.getConnectedEntity().getUuid())) {
 +            //            delete = false;
 +            //            break;
 +            //          }
 +            //        }
 +            //      }
 +            if ( delete ) {
 +                addDeleteToMutator( batch, ENTITY_DICTIONARIES,
 +                        key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_TYPES ),
 +                        connection.getConnectionType(), timestamp );
 +            }
 +
 +            // delete the connection path if there will be no connections left
 +            delete = true;
 +
 +
 +            //check out inbound edges of the given type.  If we have more than the 1 specified,
 +            // we shouldn't delete the connection types from our outbound index
 +            itr = new PagingResultsIterator(
 +                    getConnectingEntities( connection.getConnectingEntity(), connection.getConnectionType(), null,
 +                            Level.REFS ) );
 +
 +            while ( itr.hasNext() ) {
 +                c = ( ConnectedEntityRef ) itr.next();
 +
 +                if ( !connection.getConnectedEntityId().equals( c.getUuid() ) ) {
 +                    delete = false;
 +                    break;
 +                }
 +                //        if (c.getConnectedEntity().getConnectionType().equals(connection.getConnectedEntity()
 +                // .getConnectionType()) && !c.getConnectingEntity().getUuid().equals(connection.getConnectingEntity
 +                // ().getUuid())) {
 +                //            delete = false;
 +                //            break;
 +                //        }
 +
 +            }
 +
 +            //      for (ConnectionRefImpl c : getConnectionsWithEntity(connection.getConnectedEntityId())) {
 +            //        if (c.getConnectedEntity().getConnectionType().equals(connection.getConnectedEntity()
 +            // .getConnectionType())) {
 +            //          if (!c.getConnectingEntity().getUuid().equals(connection.getConnectingEntity().getUuid())) {
 +            //            delete = false;
 +            //            break;
 +            //          }
 +            //        }
 +            //      }
 +            if ( delete ) {
 +                addDeleteToMutator( batch, ENTITY_DICTIONARIES,
 +                        key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_TYPES ),
 +                        connection.getConnectionType(), timestamp );
 +            }
 +        }
 +        else {
 +            addInsertToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
 +                    key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_ENTITIES,
 +                            connection.getConnectionType() ),
 +                    asList( connection.getConnectedEntityId(), connection.getConnectedEntityType() ), timestamp,
 +                    timestamp );
 +
 +            addInsertToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
 +                    key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_ENTITIES,
 +                            connection.getConnectionType() ),
 +                    asList( connection.getConnectingEntityId(), connection.getConnectingEntityType() ), timestamp,
 +                    timestamp );
 +
 +            // Add connection type to connections set
 +            addInsertToMutator( batch, ENTITY_DICTIONARIES,
 +                    key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_TYPES ),
 +                    connection.getConnectionType(), null, timestamp );
 +
 +            // Add connection type to connections set
 +            addInsertToMutator( batch, ENTITY_DICTIONARIES,
 +                    key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_TYPES ),
 +                    connection.getConnectionType(), null, timestamp );
 +        }
 +
 +        // Add property indexes
 +
 +        // Iterate though all the properties of the connected entity
 +
 +        Schema schema = getDefaultSchema();
 +        for ( String propertyName : connectedEntity.getProperties().keySet() ) {
 +            Object propertyValue = connectedEntity.getProperties().get( propertyName );
 +
 +            boolean indexed = schema.isPropertyIndexed( connectedEntity.getType(), propertyName );
 +
 +            boolean connection_indexes_property = schema.isPropertyIndexed( connectedEntity.getType(), propertyName );
 +            boolean item_schema_has_property = schema.hasProperty( connectedEntity.getType(), propertyName );
 +            boolean fulltext_indexed = schema.isPropertyFulltextIndexed( connectedEntity.getType(), propertyName );
 +            // For each property, if the schema says it's indexed, update its
 +            // index
 +
 +            if ( indexed && ( connection_indexes_property || !item_schema_has_property ) ) {
 +                IndexUpdate indexUpdate =
 +                        batchStartIndexUpdate( batch, connectedEntity, propertyName, disconnect ? null : propertyValue,
 +                                timestampUuid, item_schema_has_property, false, false, fulltext_indexed );
 +                batchUpdateConnectionIndex( indexUpdate, connection );
 +            }
 +        }
 +
 +        // Add indexes for the connected entity's list properties
 +
 +        // Get the names of the list properties in the connected entity
 +        Set<String> dictionaryNames = em.getDictionaryNames( connectedEntity );
 +
 +        // For each list property, get the values in the list and
 +        // update the index with those values
 +
 +        for ( String dictionaryName : dictionaryNames ) {
 +            boolean has_dictionary = schema.hasDictionary( connectedEntity.getType(), dictionaryName );
 +            boolean dictionary_indexed =
 +                    schema.isDictionaryIndexedInConnections( connectedEntity.getType(), dictionaryName );
 +
 +            if ( dictionary_indexed || !has_dictionary ) {
 +                Set<Object> elementValues = em.getDictionaryAsSet( connectedEntity, dictionaryName );
 +                for ( Object elementValue : elementValues ) {
 +                    IndexUpdate indexUpdate =
 +                            batchStartIndexUpdate( batch, connectedEntity, dictionaryName, elementValue, timestampUuid,
 +                                    has_dictionary, true, disconnect, false );
 +                    batchUpdateConnectionIndex( indexUpdate, connection );
 +                }
 +            }
 +        }
 +
 +        return batch;
 +    }
 +
 +
 +    public void updateEntityConnection( boolean disconnect, ConnectionRefImpl connection ) throws Exception {
 +
 +        UUID timestampUuid = newTimeUUID();
 +        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
 +
 +        // Make or break the connection
 +
 +        batchUpdateEntityConnection( batch, disconnect, connection, timestampUuid );
 +
 +        // Make or break a connection from the connecting entity
 +        // to the connection itself
 +
 +        ConnectionRefImpl loopback = connection.getConnectionToConnectionEntity();
 +        if ( !disconnect ) {
 +            em.insertEntity( CONNECTION_ENTITY_CONNECTION_TYPE, loopback.getConnectedEntityId() );
 +        }
 +
 +        batchUpdateEntityConnection( batch, disconnect, loopback, timestampUuid );
 +
 +        batchExecute( batch, CassandraService.RETRY_COUNT );
 +    }
 +
 +
 +    @Metered(group = "core", name = "RelationManager_batchDisconnect")
 +    public void batchDisconnect( Mutator<ByteBuffer> batch, UUID timestampUuid ) throws Exception {
 +
 +
 +        PagingResultsIterator itr =
 +                new PagingResultsIterator( getConnectingEntities( headEntity, null, null, Level.REFS ) );
 +
 +        ConnectionRefImpl connection;
 +
 +        while ( itr.hasNext() ) {
 +            connection = ( ConnectionRefImpl ) itr.next();
 +
 +            batchUpdateEntityConnection( batch, true, connection, timestampUuid );
 +        }
 +        //
 +        //    List<ConnectionRefImpl> connections = getConnectionsWithEntity(headEntity.getUuid());
 +        //    if (connections != null) {
 +        //      for (ConnectionRefImpl connection : connections) {
 +        //        batchUpdateEntityConnection(batch, true, connection, timestampUuid);
 +        //      }
 +        //    }
 +    }
 +
 +
 +    public IndexUpdate batchStartIndexUpdate( Mutator<ByteBuffer> batch, Entity entity, String entryName,
 +                                              Object entryValue, UUID timestampUuid, boolean schemaHasProperty,
 +                                              boolean isMultiValue, boolean removeListEntry, boolean fulltextIndexed )
 +            throws Exception {
 +        return batchStartIndexUpdate( batch, entity, entryName, entryValue, timestampUuid, schemaHasProperty,
 +                isMultiValue, removeListEntry, fulltextIndexed, false );
 +    }
 +
 +
 +    @Metered(group = "core", name = "RelationManager_batchStartIndexUpdate")
 +    public IndexUpdate batchStartIndexUpdate( Mutator<ByteBuffer> batch, Entity entity, String entryName,
 +                                              Object entryValue, UUID timestampUuid, boolean schemaHasProperty,
 +                                              boolean isMultiValue, boolean removeListEntry, boolean fulltextIndexed,
 +                                              boolean skipRead ) throws Exception {
 +
 +        long timestamp = getTimestampInMicros( timestampUuid );
 +
 +        IndexUpdate indexUpdate =
 +                new IndexUpdate( batch, entity, entryName, entryValue, schemaHasProperty, isMultiValue, removeListEntry,
 +                        timestampUuid );
 +
 +        // entryName = entryName.toLowerCase();
 +
 +        // entity_id,connection_type,connected_entity_id,prop_name
 +
 +        if ( !skipRead ) {
 +
 +            List<HColumn<ByteBuffer, ByteBuffer>> entries = null;
 +
 +            if ( isMultiValue && validIndexableValue( entryValue ) ) {
 +                entries = cass.getColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_INDEX_ENTRIES,
 +                        entity.getUuid(),
 +                        new DynamicComposite( entryName, indexValueCode( entryValue ), toIndexableValue( entryValue ) ),
 +                        setGreaterThanEqualityFlag( new DynamicComposite( entryName, indexValueCode( entryValue ),
 +                                toIndexableValue( entryValue ) ) ), INDEX_ENTRY_LIST_COUNT, false );
 +            }
 +            else {
 +                entries = cass.getColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_INDEX_ENTRIES,
 +                        entity.getUuid(), new DynamicComposite( entryName ),
 +                        setGreaterThanEqualityFlag( new DynamicComposite( entryName ) ), INDEX_ENTRY_LIST_COUNT,
 +                        false );
 +            }
 +
 +            if ( logger.isDebugEnabled() ) {
 +                logger.debug( "Found {} previous index entries for {} of entity {}", new Object[] {
 +                        entries.size(), entryName, entity.getUuid()
 +                } );
 +            }
 +
 +            // Delete all matching entries from entry list
 +            for ( HColumn<ByteBuffer, ByteBuffer> entry : entries ) {
 +                UUID prev_timestamp = null;
 +                Object prev_value = null;
 +                String prev_obj_path = null;
 +
 +                // new format:
 +                // composite(entryName,
 +                // value_code,prev_value,prev_timestamp,prev_obj_path) = null
 +                DynamicComposite composite = DynamicComposite.fromByteBuffer( entry.getName().duplicate() );
 +                prev_value = composite.get( 2 );
 +                prev_timestamp = ( UUID ) composite.get( 3 );
 +                if ( composite.size() > 4 ) {
 +                    prev_obj_path = ( String ) composite.get( 4 );
 +                }
 +
 +                if ( prev_value != null ) {
 +
 +                    String entryPath = entryName;
 +                    if ( ( prev_obj_path != null ) && ( prev_obj_path.length() > 0 ) ) {
 +                        entryPath = entryName + "." + prev_obj_path;
 +                    }
 +
 +                    indexUpdate.addPrevEntry( entryPath, prev_value, prev_timestamp, entry.getName().duplicate() );
 +
 +                    // composite(property_value,connected_entity_id,entry_timestamp)
 +                    // addDeleteToMutator(batch, ENTITY_INDEX_ENTRIES,
 +                    // entity.getUuid(), entry.getName(), timestamp);
 +
 +                }
 +                else {
 +                    logger.error( "Unexpected condition - deserialized property value is null" );
 +                }
 +            }
 +        }
 +
 +        if ( !isMultiValue || ( isMultiValue && !removeListEntry ) ) {
 +
 +            List<Map.Entry<String, Object>> list = IndexUtils.getKeyValueList( entryName, entryValue, fulltextIndexed );
 +
 +            if ( entryName.equalsIgnoreCase( "location" ) && ( entryValue instanceof Map ) ) {
 +                @SuppressWarnings("rawtypes") double latitude =
 +                        MapUtils.getDoubleValue( ( Map ) entryValue, "latitude" );
 +                @SuppressWarnings("rawtypes") double longitude =
 +                        MapUtils.getDoubleValue( ( Map ) entryValue, "longitude" );
 +                list.add( new AbstractMap.SimpleEntry<String, Object>( "location.coordinates",
 +                        latitude + "," + longitude ) );
 +            }
 +
 +            for ( Map.Entry<String, Object> indexEntry : list ) {
 +
 +                if ( validIndexableValue( indexEntry.getValue() ) ) {
 +                    indexUpdate.addNewEntry( indexEntry.getKey(), toIndexableValue( indexEntry.getValue() ) );
 +                }
 +            }
 +
 +            if ( isMultiValue ) {
 +                addInsertToMutator( batch, ENTITY_INDEX_ENTRIES, entity.getUuid(),
 +                        asList( entryName, indexValueCode( entryValue ), toIndexableValue( entryValue ),
 +                                indexUpdate.getTimestampUuid() ), null, timestamp );
 +            }
 +            else {
 +                // int i = 0;
 +
 +                for ( Map.Entry<String, Object> indexEntry : list ) {
 +
 +                    String name = indexEntry.getKey();
 +                    if ( name.startsWith( entryName + "." ) ) {
 +                        name = name.substring( entryName.length() + 1 );
 +                    }
 +                    else if ( name.startsWith( entryName ) ) {
 +                        name = name.substring( entryName.length() );
 +                    }
 +
 +                    byte code = indexValueCode( indexEntry.getValue() );
 +                    Object val = toIndexableValue( indexEntry.getValue() );
 +                    addInsertToMutator( batch, ENTITY_INDEX_ENTRIES, entity.getUuid(),
 +                            asList( entryName, code, val, indexUpdate.getTimestampUuid(), name ), null, timestamp );
 +
 +                    indexUpdate.addIndex( indexEntry.getKey() );
 +                }
 +            }
 +
 +            indexUpdate.addIndex( entryName );
 +        }
 +
 +        return indexUpdate;
 +    }
 +
 +
 +    @Metered(group = "core", name = "RelationManager_batchUpdatePropertyIndexes")
 +    public void batchUpdatePropertyIndexes( Mutator<ByteBuffer> batch, String propertyName, Object propertyValue,
 +                                            boolean entitySchemaHasProperty, boolean noRead, UUID timestampUuid )
 +            throws Exception {
 +
 +        Entity entity = getHeadEntity();
 +
 +        UUID associatedId = null;
 +        String associatedType = null;
 +
 +        if ( Schema.isAssociatedEntityType( entity.getType() ) ) {
 +            Object item = entity.getProperty( PROPERTY_ITEM );
 +            if ( ( item instanceof UUID ) && ( entity.getProperty( PROPERTY_COLLECTION_NAME ) instanceof String ) ) {
 +                associatedId = ( UUID ) item;
 +                associatedType = string( entity.getProperty( PROPERTY_ITEM_TYPE ) );
 +                String entryName = TYPE_MEMBER + "." + propertyName;
 +                if ( logger.isDebugEnabled() ) {
 +                    logger.debug( "Extended property {} ( {} ).{} indexed as {} ({})." + entryName, new Object[] {
 +                            entity.getType(), entity.getUuid(), propertyName, associatedType, associatedId
 +                    } );
 +                }
 +                propertyName = entryName;
 +            }
 +        }
 +
 +        IndexUpdate indexUpdate = batchStartIndexUpdate( batch, entity, propertyName, propertyValue, timestampUuid,
 +                entitySchemaHasProperty, false, false,
 +                getDefaultSchema().isPropertyFulltextIndexed( entity.getType(), propertyName ), noRead );
 +
 +        // Update collections
 +
 +        String effectiveType = entity.getType();
 +        if ( associatedType != null ) {
 +            indexUpdate.setAssociatedId( associatedId );
 +            effectiveType = associatedType;
 +        }
 +
 +        Map<String, Set<CollectionInfo>> containers = getDefaultSchema().getContainers( effectiveType );
 +        if ( containers != null ) {
 +
 +            Map<EntityRef, Set<String>> containerEntities = null;
 +            if ( noRead ) {
 +                containerEntities = new LinkedHashMap<EntityRef, Set<String>>();
 +                EntityRef applicationRef = new SimpleEntityRef( TYPE_APPLICATION, applicationId );
 +                addMapSet( containerEntities, applicationRef, defaultCollectionName( entity.getType() ) );
 +            }
 +            else {
 +                containerEntities = getContainingCollections();
 +            }
 +
 +            for ( EntityRef containerEntity : containerEntities.keySet() ) {
 +                if ( containerEntity.getType().equals( TYPE_APPLICATION ) && Schema
 +                        .isAssociatedEntityType( entity.getType() ) ) {
 +                    logger.debug( "Extended properties for {} not indexed by application", entity.getType() );
 +                    continue;
 +                }
 +                Set<String> collectionNames = containerEntities.get( containerEntity );
 +                Set<CollectionInfo> collections = containers.get( containerEntity.getType() );
 +
 +                if ( collections != null ) {
 +                    for ( CollectionInfo collection : collections ) {
 +                        if ( collectionNames.contains( collection.getName() ) ) {
 +                            batchUpdateCollectionIndex( indexUpdate, containerEntity, collection.getName() );
 +                        }
 +                    }
 +                }
 +            }
 +        }
 +
 +        if ( !noRead ) {
 +            batchUpdateBackwardConnectionsPropertyIndexes( indexUpdate );
 +        }
 +
 +        /**
 +         * We've updated the properties, add the deletes to the ledger
 +         *
 +         */
 +
 +        for ( IndexEntry entry : indexUpdate.getPrevEntries() ) {
 +            addDeleteToMutator( batch, ENTITY_INDEX_ENTRIES, entity.getUuid(), entry.getLedgerColumn(),
 +                    indexUpdate.getTimestamp() );
 +        }
 +    }
 +
 +
 +    public void batchUpdateSetIndexes( Mutator<ByteBuffer> batch, String setName, Object elementValue,
 +                                       boolean removeFromSet, UUID timestampUuid ) throws Exception {
 +
 +        Entity entity = getHeadEntity();
 +
 +        elementValue = getDefaultSchema().validateEntitySetValue( entity.getType(), setName, elementValue );
 +
 +        IndexUpdate indexUpdate =
 +                batchStartIndexUpdate( batch, entity, setName, elementValue, timestampUuid, true, true, removeFromSet,
 +                        false );
 +
 +        // Update collections
 +        Map<String, Set<CollectionInfo>> containers =
 +                getDefaultSchema().getContainersIndexingDictionary( entity.getType(), setName );
 +
 +        if ( containers != null ) {
 +            Map<EntityRef, Set<String>> containerEntities = getContainingCollections();
 +            for ( EntityRef containerEntity : containerEntities.keySet() ) {
 +                if ( containerEntity.getType().equals( TYPE_APPLICATION ) && Schema
 +                        .isAssociatedEntityType( entity.getType() ) ) {
 +                    logger.debug( "Extended properties for {} not indexed by application", entity.getType() );
 +                    continue;
 +                }
 +                Set<String> collectionNames = containerEntities.get( containerEntity );
 +                Set<CollectionInfo> collections = containers.get( containerEntity.getType() );
 +
 +                if ( collections != null ) {
 +
 +                    for ( CollectionInfo collection : collections ) {
 +                        if ( collectionNames.contains( collection.getName() ) ) {
 +
 +                            batchUpdateCollectionIndex( indexUpdate, containerEntity, collection.getName() );
 +                        }
 +                    }
 +                }
 +            }
 +        }
 +
 +        batchUpdateBackwardConnectionsDictionaryIndexes( indexUpdate );
 +    }
 +
 +
 +    private IndexScanner searchIndex( Object indexKey, QuerySlice slice, int pageSize ) throws Exception {
 +
 +        DynamicComposite[] range = slice.getRange();
 +
 +        Object keyPrefix = key( indexKey, slice.getPropertyName() );
 +
 +        IndexScanner scanner =
 +                new IndexBucketScanner( cass, indexBucketLocator, ENTITY_INDEX, applicationId, IndexType.CONNECTION,
-                         keyPrefix, range[0], range[1], slice.isReversed(), pageSize, slice.getPropertyName() );
++                        keyPrefix, range[0], range[1], slice.isReversed(), pageSize, slice.hasCursor(), slice.getPropertyName() );
 +
 +        return scanner;
 +    }
 +
 +
 +    /**
 +     * Search the collection index using all the buckets for the given collection
 +     *
 +     * @param indexKey The index key to read
 +     * @param slice Slice set in the query
 +     * @param collectionName The name of the collection to search
 +     * @param pageSize The page size to load when iterating
 +     */
 +    private IndexScanner searchIndexBuckets( Object indexKey, QuerySlice slice, String collectionName, int pageSize )
 +            throws Exception {
 +
 +        DynamicComposite[] range = slice.getRange();
 +
 +        Object keyPrefix = key( indexKey, slice.getPropertyName() );
 +
-         // we have a cursor, so the first record should be discarded
-         if ( slice.hasCursor() ) {
-             pageSize++;
-         }
- 
 +        IndexScanner scanner =
 +                new IndexBucketScanner( cass, indexBucketLocator, ENTITY_INDEX, applicationId, IndexType.COLLECTION,
-                         keyPrefix, range[0], range[1], slice.isReversed(), pageSize, collectionName );
++                        keyPrefix, range[0], range[1], slice.isReversed(), pageSize, slice.hasCursor(), collectionName );
 +
 +        return scanner;
 +    }
 +
 +
 +    @SuppressWarnings("unchecked")
 +    @Override
 +    @Metered(group = "core", name = "RelationManager_isOwner")
 +    public boolean isCollectionMember( String collectionName, EntityRef entity ) throws Exception {
 +
 +        Keyspace ko = cass.getApplicationKeyspace( applicationId );
 +
 +        ByteBuffer col = DynamicComposite
 +                .toByteBuffer( asList( this.headEntity.getType(), collectionName, headEntity.getUuid() ) );
 +
 +        HColumn<ByteBuffer, ByteBuffer> result = cass.getColumn( ko, ENTITY_COMPOSITE_DICTIONARIES,
 +                key( entity.getUuid(), Schema.DICTIONARY_CONTAINER_ENTITIES ), col, be, be );
 +
 +        return result != null;
 +    }
 +
 +
 +    /** @param connectionName The name of hte connection */
 +    public boolean isConnectionMember( String connectionName, EntityRef entity ) throws Exception {
 +        Keyspace ko = cass.getApplicationKeyspace( applicationId );
 +
 +        Object key = key( this.headEntity.getUuid(), DICTIONARY_CONNECTED_ENTITIES, connectionName );
 +
 +        DynamicComposite start = new DynamicComposite( entity.getUuid() );
 +
 +        List<HColumn<ByteBuffer, ByteBuffer>> cols =
 +                cass.getColumns( ko, ENTITY_COMPOSITE_DICTIONARIES, key, start, null, 1, false );
 +
 +        if ( cols == null || cols.size() == 0 ) {
 +            return false;
 +        }
 +
 +        UUID returnedUUID = ( UUID ) DynamicComposite.fromByteBuffer( cols.get( 0 ).getName() ).get( 0 );
 +
 +        return entity.getUuid().equals( returnedUUID );
 +
 +
 +        //    addDeleteToMutator(batch, ENTITY_COMPOSITE_DICTIONARIES,
 +        //        key(connection.getConnectedEntityId(), DICTIONARY_CONNECTING_ENTITIES,
 +        // connection.getConnectionType()),
 +        //        asList(connection.getConnectingEntityId(), connection.getConnectingEntityType()), timestamp);
 +        //
 +        //
 +        //    ConnectionRefImpl ref = new ConnectionRefImpl(this.headEntity, connectionName, entity);
 +        //
 +        //
 +        //
 +        //
 +        //
 +        //
 +        //    HColumn<String, UUID> col = cass.getColumn(ko, ENTITY_CONNECTIONS, ref.getUuid(),
 +        //        ConnectionRefImpl.CONNECTED_ENTITY_ID, se, ue);
 +        //
 +        //
 +        //    getConnectedEntities(this.headEntity, connectionName, )
 +        //
 +        //    return col != null && entity.getUuid().equals(col.getValue());
 +    }
 +
 +
 +    @Override
 +    @Metered(group = "core", name = "RelationManager_getOwners")
 +    public Map<String, Map<UUID, Set<String>>> getOwners() throws Exception {
 +        Map<EntityRef, Set<String>> containerEntities = getContainingCollections();
 +        Map<String, Map<UUID, Set<String>>> owners = new LinkedHashMap<String, Map<UUID, Set<String>>>();
 +
 +        for ( EntityRef owner : containerEntities.keySet() ) {
 +            Set<String> collections = containerEntities.get( owner );
 +            for ( String collection : collections ) {
 +                MapUtils.addMapMapSet( owners, owner.getType(), owner.getUuid(), collection );
 +            }
 +        }
 +
 +        return owners;
 +    }
 +
 +
 +    @Override
 +    @Metered(group = "core", name = "RelationManager_getCollections")
 +    public Set<String> getCollections() throws Exception {
 +
 +        Map<String, CollectionInfo> collections = getDefaultSchema().getCollections( headEntity.getType() );
 +        if ( collections == null ) {
 +            return null;
 +        }
 +
 +        return collections.keySet();
 +    }
 +
 +
 +    @Override
 +    @Metered(group = "core", name = "RelationManager_getCollection_start_result")
 +    public Results getCollection( String collectionName, UUID startResult, int count, Results.Level resultsLevel,
 +                                  boolean reversed ) throws Exception {
 +        // changed intentionally to delegate to search so that behavior is
 +        // consistent across all index access.
 +
 +        // TODO T.N fix cursor parsing here so startResult can be used in this
 +        // context. Needs a bit of refactor
 +        // for accommodating cursor I/O USERGRID-1750. A bit hacky, but until a
 +        // furthur refactor this works.
 +
 +        Query query = new Query().withResultsLevel( resultsLevel ).withReversed( reversed ).withLimit( count )
 +                                 .withStartResult( startResult );
 +
 +        return searchCollection( collectionName, query );
 +    }
 +
 +
 +    @Override
 +    @Metered(group = "core", name = "RelationManager_getCollecitonForQuery")
 +    public Results getCollection( String collectionName, Query query, Results.Level resultsLevel ) throws Exception {
 +
 +        // changed intentionally to delegate to search so that behavior is
 +        // consistent across all index access.
 +
 +        return searchCollection( collectionName, query );
 +    }
 +
 +
 +    @Override
 +    @Metered(group = "core", name = "RelationManager_addToCollection")
 +    public Entity addToCollection( String collectionName, EntityRef itemRef ) throws Exception {
 +
 +        Entity itemEntity = em.get( itemRef );
 +
 +        if ( itemEntity == null ) {
 +            return null;
 +        }
 +
 +        CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
 +        if ( ( collection != null ) && !collection.getType().equals( itemRef.getType() ) ) {
 +            return null;
 +        }
 +
 +        UUID timestampUuid = newTimeUUID();
 +        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
 +
 +        batchAddToCollection( batch, collectionName, itemEntity, timestampUuid );
 +
 +        if ( collection.getLinkedCollection() != null ) {
 +            getRelationManager( itemEntity )
 +                    .batchAddToCollection( batch, collection.getLinkedCollection(), getHeadEntity(), timestampUuid );
 +        }
 +
 +        batchExecute( batch, CassandraService.RETRY_COUNT );
 +
 +        return itemEntity;
 +    }
 +
 +
 +    @Override
 +    @Metered(group = "core", name = "RelationManager_addToCollections")
 +    public Entity addToCollections( List<EntityRef> owners, String collectionName ) throws Exception {
 +
 +        Entity itemEntity = getHeadEntity();
 +
 +        Map<String, List<UUID>> collectionsByType = new LinkedHashMap<String, List<UUID>>();
 +        for ( EntityRef owner : owners ) {
 +            MapUtils.addMapList( collectionsByType, owner.getType(), owner.getUuid() );
 +        }
 +
 +        UUID timestampUuid = newTimeUUID();
 +        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
 +
 +        Schema schema = getDefaultSchema();
 +        for ( Entry<String, List<UUID>> entry : collectionsByType.entrySet() ) {
 +            CollectionInfo collection = schema.getCollection( entry.getKey(), collectionName );
 +            if ( ( collection != null ) && !collection.getType().equals( headEntity.getType() ) ) {
 +                continue;
 +            }
 +            batchAddToCollections( batch, entry.getKey(), entry.getValue(), collectionName, itemEntity, timestampUuid );
 +
 +            if ( collection.getLinkedCollection() != null ) {
 +                logger.error(
 +                        "Bulk add to collections used on a linked collection, linked connection will not be updated" );
 +            }
 +        }
 +
 +        batchExecute( batch, CassandraService.RETRY_COUNT );
 +
 +        return null;
 +    }
 +
 +
 +    @Override
 +    @Metered(group = "core", name = "RelationManager_createItemInCollection")
 +    public Entity createItemInCollection( String collectionName, String itemType, Map<String, Object> properties )
 +            throws Exception {
 +
 +        if ( headEntity.getUuid().equals( applicationId ) ) {
 +            if ( itemType.equals( TYPE_ENTITY ) ) {
 +                itemType = singularize( collectionName );
 +            }
 +            if ( itemType.equals( TYPE_ROLE ) ) {
 +                Long inactivity = ( Long ) properties.get( PROPERTY_INACTIVITY );
 +                if ( inactivity == null ) {
 +                    inactivity = 0L;
 +                }
 +                return em.createRole( ( String ) properties.get( PROPERTY_NAME ),
 +                        ( String ) properties.get( PROPERTY_TITLE ), inactivity );
 +            }
 +            return em.create( itemType, properties );
 +        }
 +        else if ( headEntity.getType().equals( Group.ENTITY_TYPE ) && ( collectionName.equals( COLLECTION_ROLES ) ) ) {
 +            UUID groupId = headEntity.getUuid();
 +            String roleName = ( String ) properties.get( PROPERTY_NAME );
 +            return em.createGroupRole( groupId, roleName, ( Long ) properties.get( PROPERTY_INACTIVITY ) );
 +        }
 +
 +        CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
 +        if ( ( collection != null ) && !collection.getType().equals( itemType ) ) {
 +            return null;
 +        }
 +
 +        properties = getDefaultSchema().cleanUpdatedProperties( itemType, properties, true );
 +
 +        Entity itemEntity = em.create( itemType, properties );
 +
 +        if ( itemEntity != null ) {
 +            UUID timestampUuid = newTimeUUID();
 +            Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
 +
 +            batchAddToCollection( batch, collectionName, itemEntity, timestampUuid );
 +
 +            if ( collection.getLinkedCollection() != null ) {
 +                getRelationManager( itemEntity )
 +                        .batchAddToCollection( batch, collection.getLinkedCollection(), getHeadEntity(),
 +                                timestampUuid );
 +            }
 +
 +            batchExecute( batch, CassandraService.RETRY_COUNT );
 +        }
 +
 +        return itemEntity;
 +    }
 +
 +
 +    @Override
 +    @Metered(group = "core", name = "RelationManager_removeFromCollection")
 +    public void removeFromCollection( String collectionName, EntityRef itemRef ) throws Exception {
 +
 +        if ( headEntity.getUuid().equals( applicationId ) ) {
 +            if ( collectionName.equals( COLLECTION_ROLES ) ) {
 +                Entity itemEntity = em.get( itemRef );
 +                if ( itemEntity != null ) {
 +                    RoleRef roleRef = SimpleRoleRef.forRoleEntity( itemEntity );
 +                    em.deleteRole( roleRef.getApplicationRoleName() );
 +                    return;
 +                }
 +                em.delete( itemEntity );
 +                return;
 +            }
 +            em.delete( itemRef );
 +            return;
 +        }
 +
 +        Entity itemEntity = em.get( itemRef );
 +
 +        if ( itemEntity == null ) {
 +            return;
 +        }
 +
 +        UUID timestampUuid = newTimeUUID();
 +        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
 +
 +        batchRemoveFromCollection( batch, collectionName, itemEntity, timestampUuid );
 +
 +        CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
 +        if ( ( collection != null ) && ( collection.getLinkedCollection() != null ) ) {
 +            getRelationManager( itemEntity )
 +                    .batchRemoveFromCollection( batch, collection.getLinkedCollection(), getHeadEntity(),
 +                            timestampUuid );
 +        }
 +
 +        batchExecute( batch, CassandraService.RETRY_COUNT );
 +
 +        if ( headEntity.getType().equals( Group.ENTITY_TYPE ) ) {
 +            if ( collectionName.equals( COLLECTION_ROLES ) ) {
 +                String path = ( String ) ( ( Entity ) itemRef ).getMetadata( "path" );
 +                if ( path.startsWith( "/roles/" ) ) {
 +                    RoleRef roleRef = SimpleRoleRef.forRoleEntity( itemEntity );
 +                    em.deleteRole( roleRef.getApplicationRoleName() );
 +                }
 +            }
 +        }
 +    }
 +
 +
 +    @Metered(group = "core", name = "RelationManager_batchRemoveFromContainers")
 +    public void batchRemoveFromContainers( Mutator<ByteBuffer> m, UUID timestampUuid ) throws Exception {
 +        Entity entity = getHeadEntity();
 +        // find all the containing collections
 +        Map<EntityRef, Set<String>> containers = getContainingCollections();
 +        if ( containers != null ) {
 +            for ( Entry<EntityRef, Set<String>> container : containers.entrySet() ) {
 +                for ( String collectionName : container.getValue() ) {
 +                    getRelationManager( container.getKey() )
 +                            .batchRemoveFromCollection( m, collectionName, entity, true, timestampUuid );
 +                }
 +            }
 +        }
 +    }
 +
 +
 +    @Override
 +    @Metered(group = "core", name = "RelationManager_copyRelationships")
 +    public void copyRelationships( String srcRelationName, EntityRef dstEntityRef, String dstRelationName )
 +            throws Exception {
 +
 +        headEntity = em.validate( headEntity );
 +        dstEntityRef = em.validate( dstEntityRef );
 +
 +        CollectionInfo srcCollection = getDefaultSchema().getCollection( headEntity.getType(), srcRelationName );
 +
 +        CollectionInfo dstCollection = getDefaultSchema().getCollection( dstEntityRef.getType(), dstRelationName );
 +
 +        Results results = null;
 +        do {
 +            if ( srcCollection != null ) {
 +                results = em.getCollection( headEntity, srcRelationName, null, 5000, Level.REFS, false );
 +            }
 +            else {
 +                results = em.getConnectedEntities( headEntity.getUuid(), srcRelationName, null, Level.REFS );
 +            }
 +
 +            if ( ( results != null ) && ( results.size() > 0 ) ) {
 +                List<EntityRef> refs = results.getRefs();
 +                for ( EntityRef ref : refs ) {
 +                    if ( dstCollection != null ) {
 +                        em.addToCollection( dstEntityRef, dstRelationName, ref );
 +                    }
 +                    else {
 +                        em.createConnection( dstEntityRef, dstRelationName, ref );
 +                    }
 +                }
 +            }
 +        }
 +        while ( ( results != null ) && ( results.hasMoreResults() ) );
 +    }
 +
 +
 +    @Override
 +    @Metered(group = "core", name = "RelationManager_searchCollection")
 +    public Results searchCollection( String collectionName, Query query ) throws Exception {
 +
 +        if ( query == null ) {
 +            query = new Query();
 +        }
 +
 +        headEntity = em.validate( headEntity );
 +
 +        CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
 +
 +        query.setEntityType( collection.getType() );
 +
 +        final CollectionResultsLoaderFactory factory = new CollectionResultsLoaderFactory();
 +
 +        // we have something to search with, visit our tree and evaluate the
 +        // results
 +        QueryProcessor qp = new QueryProcessor( query, collection, em, factory );
 +        SearchCollectionVisitor visitor = new SearchCollectionVisitor( qp );
 +
 +        return qp.getResults( visitor );
 +    }
 +
 +
 +    @Override
 +    @Metered(group = "core", name = "RelationManager_createConnection_connection_ref")
 +    public ConnectionRef createConnection( ConnectionRef connection ) throws Exception {
 +        ConnectionRefImpl connectionImpl = new ConnectionRefImpl( connection );
 +
 +        updateEntityConnection( false, connectionImpl );
 +
 +        return connection;
 +    }
 +
 +
 +    @Override
 +    @Metered(group = "core", name = "RelationManager_createConnection_connectionType")
 +    public ConnectionRef createConnection( String connectionType, EntityRef connectedEntityRef ) throws Exception {
 +
 +        headEntity = em.validate( headEntity );
 +        connectedEntityRef = em.validate( connectedEntityRef );
 +
 +        ConnectionRefImpl connection = new ConnectionRefImpl( headEntity, connectionType, connectedEntityRef );
 +
 +        updateEntityConnection( false, connection );
 +
 +        return connection;
 +    }
 +
 +
 +    @Override
 +    @Metered(group = "core", name = "RelationManager_createConnection_paired_connection_type")
 +    public ConnectionRef createConnection( String pairedConnectionType, EntityRef pairedEntity, String connectionType,
 +                                           EntityRef connectedEntityRef ) throws Exception {
 +
 +        ConnectionRefImpl connection =
 +                new ConnectionRefImpl( headEntity, new ConnectedEntityRefImpl( pairedConnectionType, pairedEntity ),
 +                        new ConnectedEntityRefImpl( connectionType, connectedEntityRef ) );
 +
 +        updateEntityConnection( false, connection );
 +
 +        return connection;
 +    }
 +
 +
 +    @Override
 +    @Metered(group = "core", name = "RelationManager_createConnection_connected_entity_ref")
 +    public ConnectionRef createConnection( ConnectedEntityRef... connections ) throws Exception {
 +
 +        ConnectionRefImpl connection = new ConnectionRefImpl( headEntity, connections );
 +
 +        updateEntityConnection( false, connection );
 +
 +        return connection;
 +    }
 +
 +
 +    @Override
 +    @Metered(group = "core", name = "RelationManager_connectionRef_type_entity")
 +    public ConnectionRef connectionRef( String connectionType, EntityRef connectedEntityRef ) throws Exception {
 +
 +        ConnectionRef connection = new ConnectionRefImpl( headEntity, connectionType, connectedEntityRef );
 +
 +        return connection;
 +    }
 +
 +
 +    @Override
 +    @Metered(group = "core", name = "RelationManager_connectionRef_entity_to_entity")
 +    public ConnectionRef connectionRef( String pairedConnectionType, EntityRef pairedEntity, String connectionType,
 +                                        EntityRef connectedEntityRef ) throws Exception {
 +
 +        ConnectionRef connection =
 +                new ConnectionRefImpl( headEntity, new ConnectedEntityRefImpl( pairedConnectionType, pairedEntity ),
 +                        new ConnectedEntityRefImpl( connectionType, connectedEntityRef ) );
 +
 +        return connection;
 +    }
 +
 +
 +    @Override
 +    @Metered(group = "core", name = "RelationManager_connectionRef_connections")
 +    public ConnectionRef connectionRef( ConnectedEntityRef... connections ) {
 +
 +        ConnectionRef connection = new ConnectionRefImpl( headEntity, connections );
 +
 +        return connection;
 +    }
 +
 +
 +    @Override
 +    @Metered(group = "core", name = "RelationManager_deleteConnection")
 +    public void deleteConnection( ConnectionRef connectionRef ) throws Exception {
 +        updateEntityConnection( true, new ConnectionRefImpl( connectionRef ) );
 +    }
 +
 +
 +    @Override
 +    @Metered(group = "core", 

<TRUNCATED>