You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/02/13 00:16:01 UTC

[06/10] incubator-usergrid git commit: First pass at removing unnecessary 1.0 files.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bd743734/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
deleted file mode 100644
index be84176..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
+++ /dev/null
@@ -1,2338 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.persistence.cassandra;
-
-
-import java.nio.ByteBuffer;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.util.Assert;
-import org.apache.usergrid.persistence.CollectionRef;
-import org.apache.usergrid.persistence.ConnectedEntityRef;
-import org.apache.usergrid.persistence.ConnectionRef;
-import org.apache.usergrid.persistence.Entity;
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.IndexBucketLocator;
-import org.apache.usergrid.persistence.IndexBucketLocator.IndexType;
-import org.apache.usergrid.persistence.PagingResultsIterator;
-import org.apache.usergrid.persistence.index.query.Query;
-import org.apache.usergrid.persistence.RelationManager;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.RoleRef;
-import org.apache.usergrid.persistence.Schema;
-import org.apache.usergrid.persistence.SimpleCollectionRef;
-import org.apache.usergrid.persistence.SimpleEntityRef;
-import org.apache.usergrid.persistence.SimpleRoleRef;
-import org.apache.usergrid.persistence.cassandra.IndexUpdate.IndexEntry;
-import org.apache.usergrid.persistence.cassandra.index.ConnectedIndexScanner;
-import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
-import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
-import org.apache.usergrid.persistence.cassandra.index.NoOpIndexScanner;
-import org.apache.usergrid.persistence.entities.Group;
-import org.apache.usergrid.persistence.geo.CollectionGeoSearch;
-import org.apache.usergrid.persistence.geo.ConnectionGeoSearch;
-import org.apache.usergrid.persistence.geo.EntityLocationRef;
-import org.apache.usergrid.persistence.geo.model.Point;
-import org.apache.usergrid.persistence.hector.CountingMutator;
-import org.apache.usergrid.persistence.query.ir.AllNode;
-import org.apache.usergrid.persistence.query.ir.NameIdentifierNode;
-import org.apache.usergrid.persistence.query.ir.QueryNode;
-import org.apache.usergrid.persistence.query.ir.QuerySlice;
-import org.apache.usergrid.persistence.query.ir.SearchVisitor;
-import org.apache.usergrid.persistence.query.ir.WithinNode;
-import org.apache.usergrid.persistence.query.ir.result.CollectionResultsLoaderFactory;
-import org.apache.usergrid.persistence.query.ir.result.ConnectionIndexSliceParser;
-import org.apache.usergrid.persistence.query.ir.result.ConnectionResultsLoaderFactory;
-import org.apache.usergrid.persistence.query.ir.result.ConnectionTypesIterator;
-import org.apache.usergrid.persistence.query.ir.result.EmptyIterator;
-import org.apache.usergrid.persistence.query.ir.result.GeoIterator;
-import org.apache.usergrid.persistence.query.ir.result.SliceIterator;
-import org.apache.usergrid.persistence.query.ir.result.StaticIdIterator;
-import org.apache.usergrid.persistence.query.ir.result.UUIDIndexSliceParser;
-import org.apache.usergrid.persistence.schema.CollectionInfo;
-import org.apache.usergrid.utils.IndexUtils;
-import org.apache.usergrid.utils.MapUtils;
-
-import com.google.common.base.Preconditions;
-import com.yammer.metrics.annotation.Metered;
-
-import me.prettyprint.hector.api.Keyspace;
-import me.prettyprint.hector.api.beans.DynamicComposite;
-import me.prettyprint.hector.api.beans.HColumn;
-import me.prettyprint.hector.api.mutation.Mutator;
-
-import static java.lang.String.CASE_INSENSITIVE_ORDER;
-import static java.util.Arrays.asList;
-import static me.prettyprint.hector.api.factory.HFactory.createMutator;
-import org.apache.usergrid.persistence.EntityManager;
-import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_COLLECTIONS;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_ENTITIES;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_TYPES;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_ENTITIES;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_TYPES;
-import static org.apache.usergrid.persistence.Schema.INDEX_CONNECTIONS;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_COLLECTION_NAME;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_INACTIVITY;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_ITEM;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_ITEM_TYPE;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_TITLE;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_TYPE;
-import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
-import static org.apache.usergrid.persistence.Schema.TYPE_ENTITY;
-import static org.apache.usergrid.persistence.Schema.TYPE_MEMBER;
-import static org.apache.usergrid.persistence.Schema.TYPE_ROLE;
-import static org.apache.usergrid.persistence.Schema.defaultCollectionName;
-import static org.apache.usergrid.persistence.Schema.getDefaultSchema;
-import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COMPOSITE_DICTIONARIES;
-import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_DICTIONARIES;
-import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_ID_SETS;
-import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX;
-import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX_ENTRIES;
-import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addDeleteToMutator;
-import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addInsertToMutator;
-import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute;
-import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
-import static org.apache.usergrid.persistence.cassandra.CassandraService.INDEX_ENTRY_LIST_COUNT;
-import static org.apache.usergrid.persistence.cassandra.ConnectionRefImpl.CONNECTION_ENTITY_CONNECTION_TYPE;
-import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchDeleteLocationInConnectionsIndex;
-import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchRemoveLocationFromCollectionIndex;
-import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchStoreLocationInCollectionIndex;
-import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchStoreLocationInConnectionsIndex;
-import static org.apache.usergrid.persistence.cassandra.IndexUpdate.indexValueCode;
-import static org.apache.usergrid.persistence.cassandra.IndexUpdate.toIndexableValue;
-import static org.apache.usergrid.persistence.cassandra.IndexUpdate.validIndexableValue;
-import static org.apache.usergrid.utils.ClassUtils.cast;
-import static org.apache.usergrid.utils.CompositeUtils.setGreaterThanEqualityFlag;
-import static org.apache.usergrid.utils.ConversionUtils.string;
-import static org.apache.usergrid.utils.InflectionUtils.singularize;
-import static org.apache.usergrid.utils.MapUtils.addMapSet;
-import static org.apache.usergrid.utils.UUIDUtils.getTimestampInMicros;
-import static org.apache.usergrid.utils.UUIDUtils.newTimeUUID;
-
-import org.apache.usergrid.persistence.entities.Application;
-import org.apache.usergrid.persistence.index.query.Query.Level;
-
-
-public class RelationManagerImpl implements RelationManager {
-
-    private static final Logger logger = LoggerFactory.getLogger( RelationManagerImpl.class );
-
-    private EntityManager em;
-    private CassandraService cass;
-    private UUID applicationId;
-    private EntityRef headEntity;
-    private IndexBucketLocator indexBucketLocator;
-
-
-    public RelationManagerImpl() {
-    }
-
-
-    public RelationManagerImpl init( EntityManager em, CassandraService cass, UUID applicationId,
-                                     EntityRef headEntity, IndexBucketLocator indexBucketLocator ) {
-
-        Assert.notNull( em, "Entity manager cannot be null" );
-        Assert.notNull( cass, "Cassandra service cannot be null" );
-        Assert.notNull( applicationId, "Application Id cannot be null" );
-        Assert.notNull( headEntity, "Head entity cannot be null" );
-        Assert.notNull( headEntity.getUuid(), "Head entity uuid cannot be null" );
-        Assert.notNull( indexBucketLocator, "Index bucket locator cannot be null" );
-
-        this.em = em;
-        this.applicationId = applicationId;
-        this.cass = cass;
-        this.headEntity = headEntity;
-        this.indexBucketLocator = indexBucketLocator;
-
-        return this;
-    }
-
-
-    private RelationManagerImpl getRelationManager( EntityRef headEntity ) {
-        RelationManagerImpl rmi = new RelationManagerImpl();
-        rmi.init( em, cass, applicationId, headEntity, indexBucketLocator );
-        return rmi;
-    }
-
-
-    /** side effect: converts headEntity into an Entity if it is an EntityRef! */
-    private Entity getHeadEntity() throws Exception {
-        Entity entity = null;
-        if ( headEntity instanceof Entity ) {
-            entity = ( Entity ) headEntity;
-        }
-        else {
-            entity = em.get( headEntity );
-            headEntity = entity;
-        }
-        return entity;
-    }
-
-
-    /**
-     * Batch update collection index.
-     *
-     * @param indexUpdate The update to apply
-     * @param owner The entity that is the owner context of this entity update.  Can either be an application, or
-     * another entity
-     * @param collectionName the collection name
-     *
-     * @return The indexUpdate with batch mutations
-     *
-     * @throws Exception the exception
-     */
-    @Metered(group = "core", name = "RelationManager_batchUpdateCollectionIndex")
-    public IndexUpdate batchUpdateCollectionIndex( IndexUpdate indexUpdate, EntityRef owner, String collectionName )
-            throws Exception {
-
-        logger.debug( "batchUpdateCollectionIndex" );
-
-        Entity indexedEntity = indexUpdate.getEntity();
-
-        String bucketId = indexBucketLocator
-                .getBucket( applicationId, IndexType.COLLECTION, indexedEntity.getUuid(), indexedEntity.getType(),
-                        indexUpdate.getEntryName() );
-
-        // the root name without the bucket
-        // entity_id,collection_name,prop_name,
-        Object index_name = null;
-        // entity_id,collection_name,prop_name, bucketId
-        Object index_key = null;
-
-        // entity_id,collection_name,collected_entity_id,prop_name
-
-        for ( IndexEntry entry : indexUpdate.getPrevEntries() ) {
-
-            if ( entry.getValue() != null ) {
-
-                index_name = key( owner.getUuid(), collectionName, entry.getPath() );
-
-                index_key = key( index_name, bucketId );
-
-                addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, index_key, entry.getIndexComposite(),
-                        indexUpdate.getTimestamp() );
-
-                if ( "location.coordinates".equals( entry.getPath() ) ) {
-                    EntityLocationRef loc = new EntityLocationRef( indexUpdate.getEntity(), entry.getTimestampUuid(),
-                            entry.getValue().toString() );
-                    batchRemoveLocationFromCollectionIndex( indexUpdate.getBatch(), indexBucketLocator, applicationId,
-                            index_name, loc );
-                }
-            }
-            else {
-                logger.error( "Unexpected condition - deserialized property value is null" );
-            }
-        }
-
-        if ( ( indexUpdate.getNewEntries().size() > 0 ) && ( !indexUpdate.isMultiValue() || ( indexUpdate.isMultiValue()
-                && !indexUpdate.isRemoveListEntry() ) ) ) {
-
-            for ( IndexEntry indexEntry : indexUpdate.getNewEntries() ) {
-
-                // byte valueCode = indexEntry.getValueCode();
-
-                index_name = key( owner.getUuid(), collectionName, indexEntry.getPath() );
-
-                index_key = key( index_name, bucketId );
-
-                // int i = 0;
-
-                addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, index_key, indexEntry.getIndexComposite(),
-                        null, indexUpdate.getTimestamp() );
-
-                if ( "location.coordinates".equals( indexEntry.getPath() ) ) {
-                    EntityLocationRef loc =
-                            new EntityLocationRef( indexUpdate.getEntity(), indexEntry.getTimestampUuid(),
-                                    indexEntry.getValue().toString() );
-                    batchStoreLocationInCollectionIndex( indexUpdate.getBatch(), indexBucketLocator, applicationId,
-                            index_name, indexedEntity.getUuid(), loc );
-                }
-
-                // i++;
-            }
-        }
-
-        for ( String index : indexUpdate.getIndexesSet() ) {
-            addInsertToMutator( indexUpdate.getBatch(), ENTITY_DICTIONARIES,
-                    key( owner.getUuid(), collectionName, Schema.DICTIONARY_INDEXES ), index, null,
-                    indexUpdate.getTimestamp() );
-        }
-
-        return indexUpdate;
-    }
-
-
-    @Override
-    @Metered(group = "core", name = "RelationManager_getCollectionIndexes")
-    public Set<String> getCollectionIndexes( String collectionName ) throws Exception {
-
-        // TODO TN, read all buckets here
-        List<HColumn<String, String>> results =
-                cass.getAllColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_DICTIONARIES,
-                        key( headEntity.getUuid(), collectionName, Schema.DICTIONARY_INDEXES ), Serializers.se, Serializers.se );
-        Set<String> indexes = new TreeSet<String>();
-        if ( results != null ) {
-            for ( HColumn<String, String> column : results ) {
-                String propertyName = column.getName();
-                if ( !propertyName.endsWith( ".keywords" ) ) {
-                    indexes.add( column.getName() );
-                }
-            }
-        }
-        return indexes;
-    }
-
-
-    public Map<EntityRef, Set<String>> getContainingCollections() throws Exception {
-        Map<EntityRef, Set<String>> results = new LinkedHashMap<EntityRef, Set<String>>();
-
-        Keyspace ko = cass.getApplicationKeyspace( applicationId );
-
-        // TODO TN get all buckets here
-
-        List<HColumn<DynamicComposite, ByteBuffer>> containers = cass.getAllColumns( ko, ENTITY_COMPOSITE_DICTIONARIES,
-                key( headEntity.getUuid(), Schema.DICTIONARY_CONTAINER_ENTITIES ), Serializers.dce, Serializers.be );
-        if ( containers != null ) {
-            for ( HColumn<DynamicComposite, ByteBuffer> container : containers ) {
-                DynamicComposite composite = container.getName();
-                if ( composite != null ) {
-                    String ownerType = ( String ) composite.get( 0 );
-                    String collectionName = ( String ) composite.get( 1 );
-                    UUID ownerId = ( UUID ) composite.get( 2 );
-                    addMapSet( results, new SimpleEntityRef( ownerType, ownerId ), collectionName );
-                    if ( logger.isDebugEnabled() ) {
-                        logger.debug( " {} ( {} ) is in collection {} ( {} ).", new Object[] {
-                                headEntity.getType(), headEntity.getUuid(), ownerType, collectionName, ownerId
-                        } );
-                    }
-                }
-            }
-        }
-        EntityRef applicationRef = new SimpleEntityRef( TYPE_APPLICATION, applicationId );
-        if ( !results.containsKey( applicationRef ) ) {
-            addMapSet( results, applicationRef, defaultCollectionName( headEntity.getType() ) );
-        }
-        return results;
-    }
-
-
-    @SuppressWarnings("unchecked")
-    public void batchCreateCollectionMembership( Mutator<ByteBuffer> batch, EntityRef ownerRef, String collectionName,
-                                                 EntityRef itemRef, EntityRef membershipRef, UUID timestampUuid )
-            throws Exception {
-
-        long timestamp = getTimestampInMicros( timestampUuid );
-
-        if ( membershipRef == null ) {
-            membershipRef = new SimpleCollectionRef( ownerRef, collectionName, itemRef );
-        }
-
-        Map<String, Object> properties = new TreeMap<String, Object>( CASE_INSENSITIVE_ORDER );
-        properties.put( PROPERTY_TYPE, membershipRef.getType() );
-        properties.put( PROPERTY_COLLECTION_NAME, collectionName );
-        properties.put( PROPERTY_ITEM, itemRef.getUuid() );
-        properties.put( PROPERTY_ITEM_TYPE, itemRef.getType() );
-
-        em.batchCreate( batch, membershipRef.getType(), null, properties, membershipRef.getUuid(), timestampUuid );
-
-        addInsertToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
-                key( membershipRef.getUuid(), Schema.DICTIONARY_CONTAINER_ENTITIES ),
-                asList( ownerRef.getType(), collectionName, ownerRef.getUuid() ), membershipRef.getUuid(), timestamp );
-    }
-
-
-    /**
-     * Batch add to collection.
-     *
-     * @param batch the batch
-     * @param collectionName the collection name
-     * @param entity The entity to add to the batch
-     * @param timestampUuid The timestamp of this update in a time uuid
-     *
-     * @return batch
-     *
-     * @throws Exception the exception
-     */
-    public Mutator<ByteBuffer> batchAddToCollection( Mutator<ByteBuffer> batch, String collectionName, Entity entity,
-                                                     UUID timestampUuid ) throws Exception {
-        List<UUID> ids = new ArrayList<UUID>( 1 );
-        ids.add( headEntity.getUuid() );
-        return batchAddToCollections( batch, headEntity.getType(), ids, collectionName, entity, timestampUuid );
-    }
-
-
-    @SuppressWarnings("unchecked")
-    @Metered(group = "core", name = "RelationManager_batchAddToCollections")
-    public Mutator<ByteBuffer> batchAddToCollections( Mutator<ByteBuffer> batch, String ownerType, List<UUID> ownerIds,
-                                                      String collectionName, Entity entity, UUID timestampUuid )
-            throws Exception {
-
-        long timestamp = getTimestampInMicros( timestampUuid );
-
-        if ( Schema.isAssociatedEntityType( entity.getType() ) ) {
-            logger.error( "Cant add an extended type to any collection", new Throwable() );
-            return batch;
-        }
-
-        Map<UUID, CollectionRef> membershipRefs = new LinkedHashMap<UUID, CollectionRef>();
-
-        for ( UUID ownerId : ownerIds ) {
-
-            CollectionRef membershipRef =
-                    new SimpleCollectionRef( new SimpleEntityRef( ownerType, ownerId ), collectionName, entity );
-
-            membershipRefs.put( ownerId, membershipRef );
-
-            // get the bucket this entityId needs to be inserted into
-            String bucketId = indexBucketLocator
-                    .getBucket( applicationId, IndexType.COLLECTION, entity.getUuid(), collectionName );
-
-            Object collections_key = key( ownerId, Schema.DICTIONARY_COLLECTIONS, collectionName, bucketId );
-
-            // Insert in main collection
-
-            addInsertToMutator( batch, ENTITY_ID_SETS, collections_key, entity.getUuid(), membershipRef.getUuid(),
-                    timestamp );
-
-            addInsertToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
-                    key( entity.getUuid(), Schema.DICTIONARY_CONTAINER_ENTITIES ),
-                    asList( ownerType, collectionName, ownerId ), membershipRef.getUuid(), timestamp );
-        }
-
-
-        Schema schema = getDefaultSchema();
-
-        // Add property indexes
-        for ( String propertyName : entity.getProperties().keySet() ) {
-            boolean indexed_property = schema.isPropertyIndexed( entity.getType(), propertyName );
-            if ( indexed_property ) {
-                boolean collection_indexes_property =
-                        schema.isPropertyIndexedInCollection( ownerType, collectionName, propertyName );
-                boolean item_schema_has_property = schema.hasProperty( entity.getType(), propertyName );
-                boolean fulltext_indexed = schema.isPropertyFulltextIndexed( entity.getType(), propertyName );
-                if ( collection_indexes_property || !item_schema_has_property ) {
-                    Object propertyValue = entity.getProperty( propertyName );
-                    IndexUpdate indexUpdate =
-                            batchStartIndexUpdate( batch, entity, propertyName, propertyValue, timestampUuid,
-                                    item_schema_has_property, false, false, fulltext_indexed, true );
-                    for ( UUID ownerId : ownerIds ) {
-                        EntityRef owner = new SimpleEntityRef( ownerType, ownerId );
-                        batchUpdateCollectionIndex( indexUpdate, owner, collectionName );
-                    }
-                }
-            }
-        }
-
-        // Add set property indexes
-
-        Set<String> dictionaryNames = em.getDictionaryNames( entity );
-
-        for ( String dictionaryName : dictionaryNames ) {
-            boolean has_dictionary = schema.hasDictionary( entity.getType(), dictionaryName );
-            boolean dictionary_indexed =
-                    schema.isDictionaryIndexedInCollection( ownerType, collectionName, dictionaryName );
-
-            if ( dictionary_indexed || !has_dictionary ) {
-                Set<Object> elementValues = em.getDictionaryAsSet( entity, dictionaryName );
-                for ( Object elementValue : elementValues ) {
-                    IndexUpdate indexUpdate =
-                            batchStartIndexUpdate( batch, entity, dictionaryName, elementValue, timestampUuid,
-                                    has_dictionary, true, false, false, true );
-                    for ( UUID ownerId : ownerIds ) {
-                        EntityRef owner = new SimpleEntityRef( ownerType, ownerId );
-                        batchUpdateCollectionIndex( indexUpdate, owner, collectionName );
-                    }
-                }
-            }
-        }
-
-        for ( UUID ownerId : ownerIds ) {
-            EntityRef owner = new SimpleEntityRef( ownerType, ownerId );
-            batchCreateCollectionMembership( batch, owner, collectionName, entity, membershipRefs.get( ownerId ),
-                    timestampUuid );
-        }
-
-        return batch;
-    }
-
-
-    /**
-     * Batch remove from collection.
-     * <p/>
-     * * Batch add to collection.
-     *
-     * @param batch the batch
-     * @param collectionName the collection name
-     * @param entity The entity to add to the batch
-     * @param timestampUuid The timestamp of this update in a time uuid
-     *
-     * @return The mutation with the delete operations added
-     *
-     * @throws Exception the exception
-     */
-    public Mutator<ByteBuffer> batchRemoveFromCollection( Mutator<ByteBuffer> batch, String collectionName,
-                                                          Entity entity, UUID timestampUuid ) throws Exception {
-        return this.batchRemoveFromCollection( batch, collectionName, entity, false, timestampUuid );
-    }
-
-
-    @SuppressWarnings("unchecked")
-    @Metered(group = "core", name = "RelationManager_batchRemoveFromCollection")
-    public Mutator<ByteBuffer> batchRemoveFromCollection( Mutator<ByteBuffer> batch, String collectionName,
-                                                          Entity entity, boolean force, UUID timestampUuid )
-            throws Exception {
-
-        long timestamp = getTimestampInMicros( timestampUuid );
-
-        if ( !force && headEntity.getUuid().equals( applicationId ) ) {
-            // Can't remove entities from root collections
-            return batch;
-        }
-
-        Object collections_key = key( headEntity.getUuid(), Schema.DICTIONARY_COLLECTIONS, collectionName,
-                indexBucketLocator.getBucket( applicationId, IndexType.COLLECTION, entity.getUuid(), collectionName ) );
-
-        // Remove property indexes
-
-        Schema schema = getDefaultSchema();
-        for ( String propertyName : entity.getProperties().keySet() ) {
-            boolean collection_indexes_property =
-                    schema.isPropertyIndexedInCollection( headEntity.getType(), collectionName, propertyName );
-            boolean item_schema_has_property = schema.hasProperty( entity.getType(), propertyName );
-            boolean fulltext_indexed = schema.isPropertyFulltextIndexed( entity.getType(), propertyName );
-            if ( collection_indexes_property || !item_schema_has_property ) {
-                IndexUpdate indexUpdate = batchStartIndexUpdate( batch, entity, propertyName, null, timestampUuid,
-                        item_schema_has_property, false, false, fulltext_indexed );
-                batchUpdateCollectionIndex( indexUpdate, headEntity, collectionName );
-            }
-        }
-
-        // Remove set indexes
-
-        Set<String> dictionaryNames = em.getDictionaryNames( entity );
-
-        for ( String dictionaryName : dictionaryNames ) {
-            boolean has_dictionary = schema.hasDictionary( entity.getType(), dictionaryName );
-            boolean dictionary_indexed =
-                    schema.isDictionaryIndexedInCollection( headEntity.getType(), collectionName, dictionaryName );
-
-            if ( dictionary_indexed || !has_dictionary ) {
-                Set<Object> elementValues = em.getDictionaryAsSet( entity, dictionaryName );
-                for ( Object elementValue : elementValues ) {
-                    IndexUpdate indexUpdate =
-                            batchStartIndexUpdate( batch, entity, dictionaryName, elementValue, timestampUuid,
-                                    has_dictionary, true, true, false );
-                    batchUpdateCollectionIndex( indexUpdate, headEntity, collectionName );
-                }
-            }
-        }
-
-        // Delete actual property
-
-        addDeleteToMutator( batch, ENTITY_ID_SETS, collections_key, entity.getUuid(), timestamp );
-
-        addDeleteToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
-                key( entity.getUuid(), Schema.DICTIONARY_CONTAINER_ENTITIES ),
-                asList( headEntity.getType(), collectionName, headEntity.getUuid() ), timestamp );
-
-        if ( !headEntity.getType().equalsIgnoreCase( TYPE_APPLICATION ) && !Schema
-                .isAssociatedEntityType( entity.getType() ) ) {
-
-            CollectionRef cref = new SimpleCollectionRef( headEntity, collectionName, entity ); 
-            em.delete( new SimpleEntityRef( cref.getType(), cref.getUuid() ) );
-        }
-
-        return batch;
-    }
-
-
-    @Metered(group = "core", name = "RelationManager_batchDeleteConnectionIndexEntries")
-    public Mutator<ByteBuffer> batchDeleteConnectionIndexEntries( IndexUpdate indexUpdate, IndexEntry entry,
-                                                                  ConnectionRefImpl connection, UUID[] index_keys )
-            throws Exception {
-
-        // entity_id,prop_name
-        Object property_index_key = key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, entry.getPath(),
-                indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION, index_keys[ConnectionRefImpl.ALL],
-                        entry.getPath() ) );
-
-        // entity_id,entity_type,prop_name
-        Object entity_type_prop_index_key =
-                key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
-                        indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION,
-                                index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], entry.getPath() ) );
-
-        // entity_id,connection_type,prop_name
-        Object connection_type_prop_index_key =
-                key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, entry.getPath(),
-                        indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION,
-                                index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], entry.getPath() ) );
-
-        // entity_id,connection_type,entity_type,prop_name
-        Object connection_type_and_entity_type_prop_index_key =
-                key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
-                        indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION,
-                                index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], entry.getPath() ) );
-
-        // composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
-        addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, property_index_key,
-                entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectionType(),
-                        connection.getConnectedEntityType() ), indexUpdate.getTimestamp() );
-
-        // composite(property_value,connected_entity_id,connection_type,entry_timestamp)
-        addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, entity_type_prop_index_key,
-                entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectionType() ),
-                indexUpdate.getTimestamp() );
-
-        // composite(property_value,connected_entity_id,entity_type,entry_timestamp)
-        addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_prop_index_key,
-                entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectedEntityType() ),
-                indexUpdate.getTimestamp() );
-
-        // composite(property_value,connected_entity_id,entry_timestamp)
-        addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_and_entity_type_prop_index_key,
-                entry.getIndexComposite( connection.getConnectedEntityId() ), indexUpdate.getTimestamp() );
-
-        return indexUpdate.getBatch();
-    }
-
-
-    @Metered(group = "core", name = "RelationManager_batchAddConnectionIndexEntries")
-    public Mutator<ByteBuffer> batchAddConnectionIndexEntries( IndexUpdate indexUpdate, IndexEntry entry,
-                                                               ConnectionRefImpl connection, UUID[] index_keys ) {
-
-        // entity_id,prop_name
-        Object property_index_key = key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, entry.getPath(),
-                indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION, index_keys[ConnectionRefImpl.ALL],
-                        entry.getPath() ) );
-
-        // entity_id,entity_type,prop_name
-        Object entity_type_prop_index_key =
-                key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
-                        indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION,
-                                index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], entry.getPath() ) );
-
-        // entity_id,connection_type,prop_name
-        Object connection_type_prop_index_key =
-                key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, entry.getPath(),
-                        indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION,
-                                index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], entry.getPath() ) );
-
-        // entity_id,connection_type,entity_type,prop_name
-        Object connection_type_and_entity_type_prop_index_key =
-                key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
-                        indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION,
-                                index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], entry.getPath() ) );
-
-        // composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
-        addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, property_index_key,
-                entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectionType(),
-                        connection.getConnectedEntityType() ), connection.getUuid(), indexUpdate.getTimestamp() );
-
-        // composite(property_value,connected_entity_id,connection_type,entry_timestamp)
-        addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, entity_type_prop_index_key,
-                entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectionType() ),
-                connection.getUuid(), indexUpdate.getTimestamp() );
-
-        // composite(property_value,connected_entity_id,entity_type,entry_timestamp)
-        addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_prop_index_key,
-                entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectedEntityType() ),
-                connection.getUuid(), indexUpdate.getTimestamp() );
-
-        // composite(property_value,connected_entity_id,entry_timestamp)
-        addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_and_entity_type_prop_index_key,
-                entry.getIndexComposite( connection.getConnectedEntityId() ), connection.getUuid(),
-                indexUpdate.getTimestamp() );
-
-        return indexUpdate.getBatch();
-    }
-
-
-    /**
-     * Batch update connection index.
-     *
-     * @param indexUpdate The update operation to perform
-     * @param connection The connection to update
-     *
-     * @return The index with the batch mutation udpated
-     *
-     * @throws Exception the exception
-     */
-    @Metered(group = "core", name = "RelationManager_batchUpdateConnectionIndex")
-    public IndexUpdate batchUpdateConnectionIndex( IndexUpdate indexUpdate, ConnectionRefImpl connection )
-            throws Exception {
-
-        // UUID connection_id = connection.getUuid();
-
-        UUID[] index_keys = connection.getIndexIds();
-
-        // Delete all matching entries from entry list
-        for ( IndexEntry entry : indexUpdate.getPrevEntries() ) {
-
-            if ( entry.getValue() != null ) {
-
-                batchDeleteConnectionIndexEntries( indexUpdate, entry, connection, index_keys );
-
-                if ( "location.coordinates".equals( entry.getPath() ) ) {
-                    EntityLocationRef loc = new EntityLocationRef( indexUpdate.getEntity(), entry.getTimestampUuid(),
-                            entry.getValue().toString() );
-                    batchDeleteLocationInConnectionsIndex( indexUpdate.getBatch(), indexBucketLocator, applicationId,
-                            index_keys, entry.getPath(), loc );
-                }
-            }
-            else {
-                logger.error( "Unexpected condition - deserialized property value is null" );
-            }
-        }
-
-        if ( ( indexUpdate.getNewEntries().size() > 0 ) && ( !indexUpdate.isMultiValue() || ( indexUpdate.isMultiValue()
-                && !indexUpdate.isRemoveListEntry() ) ) ) {
-
-            for ( IndexEntry indexEntry : indexUpdate.getNewEntries() ) {
-
-                batchAddConnectionIndexEntries( indexUpdate, indexEntry, connection, index_keys );
-
-                if ( "location.coordinates".equals( indexEntry.getPath() ) ) {
-                    EntityLocationRef loc =
-                            new EntityLocationRef( indexUpdate.getEntity(), indexEntry.getTimestampUuid(),
-                                    indexEntry.getValue().toString() );
-                    batchStoreLocationInConnectionsIndex( indexUpdate.getBatch(), indexBucketLocator, applicationId,
-                            index_keys, indexEntry.getPath(), loc );
-                }
-            }
-
-      /*
-       * addInsertToMutator(batch, EntityCF.SETS, key(connection_id,
-       * Schema.INDEXES_SET), indexEntry.getKey(), null, false, timestamp); }
-       * 
-       * addInsertToMutator(batch, EntityCF.SETS, key(connection_id,
-       * Schema.INDEXES_SET), entryName, null, false, timestamp);
-       */
-        }
-
-        for ( String index : indexUpdate.getIndexesSet() ) {
-            addInsertToMutator( indexUpdate.getBatch(), ENTITY_DICTIONARIES,
-                    key( connection.getConnectingIndexId(), Schema.DICTIONARY_INDEXES ), index, null,
-                    indexUpdate.getTimestamp() );
-        }
-
-        return indexUpdate;
-    }
-
-
-    public Set<String> getConnectionIndexes( ConnectionRefImpl connection ) throws Exception {
-        List<HColumn<String, String>> results =
-                cass.getAllColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_DICTIONARIES,
-                        key( connection.getConnectingIndexId(), Schema.DICTIONARY_INDEXES ), Serializers.se, Serializers.se );
-        Set<String> indexes = new TreeSet<String>();
-        if ( results != null ) {
-            for ( HColumn<String, String> column : results ) {
-                String propertyName = column.getName();
-                if ( !propertyName.endsWith( ".keywords" ) ) {
-                    indexes.add( column.getName() );
-                }
-            }
-        }
-        return indexes;
-    }
-
-
-    /**
-     * Batch update backward connections property indexes.
-     *
-     * @param indexUpdate The update to run for incoming connections
-     *
-     * @return The index update to run
-     *
-     * @throws Exception the exception
-     */
-    @Metered(group = "core", name = "RelationManager_batchUpdateBackwardConnectionsPropertyIndexes")
-    public IndexUpdate batchUpdateBackwardConnectionsPropertyIndexes( IndexUpdate indexUpdate ) throws Exception {
-
-        logger.debug( "batchUpdateBackwordConnectionsPropertyIndexes" );
-
-        boolean entitySchemaHasProperty = indexUpdate.isSchemaHasProperty();
-
-        if ( entitySchemaHasProperty ) {
-            if ( !getDefaultSchema()
-                    .isPropertyIndexed( indexUpdate.getEntity().getType(), indexUpdate.getEntryName() ) ) {
-                return indexUpdate;
-            }
-        }
-
-
-        return doBackwardConnectionsUpdate( indexUpdate );
-    }
-
-
-    /**
-     * Search each reverse connection type in the graph for connections.  If one is found, update the index
-     * appropriately
-     *
-     * @param indexUpdate The index update to use
-     *
-     * @return The updated index update
-     */
-    private IndexUpdate doBackwardConnectionsUpdate( IndexUpdate indexUpdate ) throws Exception {
-        final Entity targetEntity = indexUpdate.getEntity();
-
-        final ConnectionTypesIterator connectionTypes =
-                new ConnectionTypesIterator( cass, applicationId, targetEntity.getUuid(), false, 100 );
-
-        for ( String connectionType : connectionTypes ) {
-
-            PagingResultsIterator itr = getReversedConnectionsIterator( targetEntity, connectionType );
-
-            for ( Object connection : itr ) {
-
-                final ConnectedEntityRef sourceEntity = ( ConnectedEntityRef ) connection;
-
-                //we need to create a connection ref from the source entity (found via reverse edge) to the entity
-                // we're about to update.  This is the index that needs updated
-                final ConnectionRefImpl connectionRef =
-                        new ConnectionRefImpl( sourceEntity, connectionType, indexUpdate.getEntity() );
-
-                batchUpdateConnectionIndex( indexUpdate, connectionRef );
-            }
-        }
-
-        return indexUpdate;
-    }
-
-
-    /**
-     * Get a paging results iterator.  Should return an iterator for all results
-     *
-     * @param targetEntity The target entity search connections from
-     *
-     * @return connectionType The name of the edges to search
-     */
-    private PagingResultsIterator getReversedConnectionsIterator( EntityRef targetEntity, String connectionType )
-            throws Exception {
-        return new PagingResultsIterator( getConnectingEntities( targetEntity, connectionType, null, Level.REFS ) );
-    }
-
-
-    /**
-     * Batch update backward connections set indexes.
-     *
-     * @param indexUpdate The index to update in the dictionary
-     *
-     * @return The index update
-     *
-     * @throws Exception the exception
-     */
-    @Metered(group = "core", name = "RelationManager_batchUpdateBackwardConnectionsDictionaryIndexes")
-    public IndexUpdate batchUpdateBackwardConnectionsDictionaryIndexes( IndexUpdate indexUpdate ) throws Exception {
-
-        logger.debug( "batchUpdateBackwardConnectionsListIndexes" );
-
-        boolean entityHasDictionary = getDefaultSchema()
-                .isDictionaryIndexedInConnections( indexUpdate.getEntity().getType(), indexUpdate.getEntryName() );
-
-        if ( !entityHasDictionary ) {
-            return indexUpdate;
-        }
-
-
-        return doBackwardConnectionsUpdate( indexUpdate );
-    }
-
-
-    @SuppressWarnings("unchecked")
-    @Metered(group = "core", name = "RelationManager_batchUpdateEntityConnection")
-    public Mutator<ByteBuffer> batchUpdateEntityConnection( Mutator<ByteBuffer> batch, 
-        boolean disconnect, ConnectionRefImpl connection, UUID timestampUuid ) throws Exception {
-
-        long timestamp = getTimestampInMicros( timestampUuid );
-
-        Entity connectedEntity = em.get( new SimpleEntityRef( 
-                connection.getConnectedEntityType(), connection.getConnectedEntityId()) );
-
-        if ( connectedEntity == null ) {
-            return batch;
-        }
-
-        // Create connection for requested params
-
-
-        if ( disconnect ) {
-            addDeleteToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
-                    key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_ENTITIES,
-                            connection.getConnectionType() ),
-                    asList( connection.getConnectedEntityId(), connection.getConnectedEntityType() ), timestamp );
-
-            addDeleteToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
-                    key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_ENTITIES,
-                            connection.getConnectionType() ),
-                    asList( connection.getConnectingEntityId(), connection.getConnectingEntityType() ), timestamp );
-
-            // delete the connection path if there will be no connections left
-
-            boolean delete = true;
-
-            //check out outbound edges of the given type.  If we have more than the 1 specified,
-            // we shouldn't delete the connection types from our outbound index
-            PagingResultsIterator itr = new PagingResultsIterator(
-                    getConnectedEntities( connection.getConnectingEntity(), connection.getConnectionType(), null,
-                            Level.REFS ) );
-
-            ConnectedEntityRef c;
-
-            while ( itr.hasNext() ) {
-                c = ( ConnectedEntityRef ) itr.next();
-
-                if ( !connection.getConnectedEntityId().equals( c.getUuid() ) ) {
-                    delete = false;
-                    break;
-                }
-
-
-                //        c = (ConnectionRef) itr.next();
-                //        if (c.getConnectedEntity().getConnectionType().equals(connection.getConnectedEntity()
-                // .getConnectionType()) &&!c.getConnectedEntity().getUuid().equals(connection.getConnectedEntity()
-                // .getUuid())) {
-                //            delete = false;
-                //            break;
-                //        }
-
-            }
-            //      for (ConnectionRefImpl c : getConnectionsWithEntity(connection.getConnectingEntityId())) {
-            //        if (c.getConnectedEntity().getConnectionType().equals(connection.getConnectedEntity()
-            // .getConnectionType())) {
-            //          if (!c.getConnectedEntity().getUuid().equals(connection.getConnectedEntity().getUuid())) {
-            //            delete = false;
-            //            break;
-            //          }
-            //        }
-            //      }
-            if ( delete ) {
-                addDeleteToMutator( batch, ENTITY_DICTIONARIES,
-                        key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_TYPES ),
-                        connection.getConnectionType(), timestamp );
-            }
-
-            // delete the connection path if there will be no connections left
-            delete = true;
-
-
-            //check out inbound edges of the given type.  If we have more than the 1 specified,
-            // we shouldn't delete the connection types from our outbound index
-            itr = new PagingResultsIterator(
-                    getConnectingEntities( connection.getConnectingEntity(), connection.getConnectionType(), null,
-                            Level.REFS ) );
-
-            while ( itr.hasNext() ) {
-                c = ( ConnectedEntityRef ) itr.next();
-
-                if ( !connection.getConnectedEntityId().equals( c.getUuid() ) ) {
-                    delete = false;
-                    break;
-                }
-                //        if (c.getConnectedEntity().getConnectionType().equals(connection.getConnectedEntity()
-                // .getConnectionType()) && !c.getConnectingEntity().getUuid().equals(connection.getConnectingEntity
-                // ().getUuid())) {
-                //            delete = false;
-                //            break;
-                //        }
-
-            }
-
-            //      for (ConnectionRefImpl c : getConnectionsWithEntity(connection.getConnectedEntityId())) {
-            //        if (c.getConnectedEntity().getConnectionType().equals(connection.getConnectedEntity()
-            // .getConnectionType())) {
-            //          if (!c.getConnectingEntity().getUuid().equals(connection.getConnectingEntity().getUuid())) {
-            //            delete = false;
-            //            break;
-            //          }
-            //        }
-            //      }
-            if ( delete ) {
-                addDeleteToMutator( batch, ENTITY_DICTIONARIES,
-                        key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_TYPES ),
-                        connection.getConnectionType(), timestamp );
-            }
-        }
-        else {
-            addInsertToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
-                    key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_ENTITIES,
-                            connection.getConnectionType() ),
-                    asList( connection.getConnectedEntityId(), connection.getConnectedEntityType() ), timestamp,
-                    timestamp );
-
-            addInsertToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
-                    key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_ENTITIES,
-                            connection.getConnectionType() ),
-                    asList( connection.getConnectingEntityId(), connection.getConnectingEntityType() ), timestamp,
-                    timestamp );
-
-            // Add connection type to connections set
-            addInsertToMutator( batch, ENTITY_DICTIONARIES,
-                    key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_TYPES ),
-                    connection.getConnectionType(), null, timestamp );
-
-            // Add connection type to connections set
-            addInsertToMutator( batch, ENTITY_DICTIONARIES,
-                    key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_TYPES ),
-                    connection.getConnectionType(), null, timestamp );
-        }
-
-        // Add property indexes
-
-        // Iterate though all the properties of the connected entity
-
-        Schema schema = getDefaultSchema();
-        for ( String propertyName : connectedEntity.getProperties().keySet() ) {
-            Object propertyValue = connectedEntity.getProperties().get( propertyName );
-
-            boolean indexed = schema.isPropertyIndexed( connectedEntity.getType(), propertyName );
-
-            boolean connection_indexes_property = schema.isPropertyIndexed( connectedEntity.getType(), propertyName );
-            boolean item_schema_has_property = schema.hasProperty( connectedEntity.getType(), propertyName );
-            boolean fulltext_indexed = schema.isPropertyFulltextIndexed( connectedEntity.getType(), propertyName );
-            // For each property, if the schema says it's indexed, update its
-            // index
-
-            if ( indexed && ( connection_indexes_property || !item_schema_has_property ) ) {
-                IndexUpdate indexUpdate =
-                        batchStartIndexUpdate( batch, connectedEntity, propertyName, disconnect ? null : propertyValue,
-                                timestampUuid, item_schema_has_property, false, false, fulltext_indexed );
-                batchUpdateConnectionIndex( indexUpdate, connection );
-            }
-        }
-
-        // Add indexes for the connected entity's list properties
-
-        // Get the names of the list properties in the connected entity
-        Set<String> dictionaryNames = em.getDictionaryNames( connectedEntity );
-
-        // For each list property, get the values in the list and
-        // update the index with those values
-
-        for ( String dictionaryName : dictionaryNames ) {
-            boolean has_dictionary = schema.hasDictionary( connectedEntity.getType(), dictionaryName );
-            boolean dictionary_indexed =
-                    schema.isDictionaryIndexedInConnections( connectedEntity.getType(), dictionaryName );
-
-            if ( dictionary_indexed || !has_dictionary ) {
-                Set<Object> elementValues = em.getDictionaryAsSet( connectedEntity, dictionaryName );
-                for ( Object elementValue : elementValues ) {
-                    IndexUpdate indexUpdate =
-                            batchStartIndexUpdate( batch, connectedEntity, dictionaryName, elementValue, timestampUuid,
-                                    has_dictionary, true, disconnect, false );
-                    batchUpdateConnectionIndex( indexUpdate, connection );
-                }
-            }
-        }
-
-        return batch;
-    }
-
-
-    public void updateEntityConnection( boolean disconnect, ConnectionRefImpl connection ) throws Exception {
-
-        UUID timestampUuid = newTimeUUID();
-        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ), Serializers.be );
-
-        // Make or break the connection
-
-        batchUpdateEntityConnection( batch, disconnect, connection, timestampUuid );
-
-        // Make or break a connection from the connecting entity
-        // to the connection itself
-
-        ConnectionRefImpl loopback = connection.getConnectionToConnectionEntity();
-        if ( !disconnect ) {
-            em.insertEntity( new SimpleEntityRef( 
-                    CONNECTION_ENTITY_CONNECTION_TYPE, loopback.getConnectedEntityId() ) );
-        }
-
-        batchUpdateEntityConnection( batch, disconnect, loopback, timestampUuid );
-
-        batchExecute( batch, CassandraService.RETRY_COUNT );
-    }
-
-
-    @Metered(group = "core", name = "RelationManager_batchDisconnect")
-    public void batchDisconnect( Mutator<ByteBuffer> batch, UUID timestampUuid ) throws Exception {
-
-
-        PagingResultsIterator itr =
-                new PagingResultsIterator( getConnectingEntities( headEntity, null, null, Level.REFS ) );
-
-        ConnectionRefImpl connection = null;
-
-        while ( itr.hasNext() ) {
-            Object itrObj = itr.next();
-            if ( itrObj instanceof ConnectionRefImpl ) {
-                connection = (ConnectionRefImpl) itrObj;
-            }
-            else if ( itrObj instanceof SimpleEntityRef ) {
-                connection = new ConnectionRefImpl( (SimpleEntityRef) itrObj );
-            }
-            else if ( itrObj instanceof EntityRef ) {
-                    connection = new ConnectionRefImpl( new SimpleEntityRef((EntityRef) itr.next()));
-            }
-            else if ( itrObj instanceof UUID ) {
-                    connection = new ConnectionRefImpl( new SimpleEntityRef((UUID)itr.next()));
-            }
-
-            batchUpdateEntityConnection( batch, true, connection, timestampUuid );
-        }
-    }
-
-
-    public IndexUpdate batchStartIndexUpdate( Mutator<ByteBuffer> batch, Entity entity, String entryName,
-                                              Object entryValue, UUID timestampUuid, boolean schemaHasProperty,
-                                              boolean isMultiValue, boolean removeListEntry, boolean fulltextIndexed )
-            throws Exception {
-        return batchStartIndexUpdate( batch, entity, entryName, entryValue, timestampUuid, schemaHasProperty,
-                isMultiValue, removeListEntry, fulltextIndexed, false );
-    }
-
-
-    @Metered(group = "core", name = "RelationManager_batchStartIndexUpdate")
-    public IndexUpdate batchStartIndexUpdate( Mutator<ByteBuffer> batch, Entity entity, String entryName,
-                                              Object entryValue, UUID timestampUuid, boolean schemaHasProperty,
-                                              boolean isMultiValue, boolean removeListEntry, boolean fulltextIndexed,
-                                              boolean skipRead ) throws Exception {
-
-        long timestamp = getTimestampInMicros( timestampUuid );
-
-        IndexUpdate indexUpdate =
-                new IndexUpdate( batch, entity, entryName, entryValue, schemaHasProperty, isMultiValue, removeListEntry,
-                        timestampUuid );
-
-        // entryName = entryName.toLowerCase();
-
-        // entity_id,connection_type,connected_entity_id,prop_name
-
-        if ( !skipRead ) {
-
-            List<HColumn<ByteBuffer, ByteBuffer>> entries = null;
-
-            if ( isMultiValue && validIndexableValue( entryValue ) ) {
-                entries = cass.getColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_INDEX_ENTRIES,
-                        entity.getUuid(),
-                        new DynamicComposite( entryName, indexValueCode( entryValue ), toIndexableValue( entryValue ) ),
-                        setGreaterThanEqualityFlag( new DynamicComposite( entryName, indexValueCode( entryValue ),
-                                toIndexableValue( entryValue ) ) ), INDEX_ENTRY_LIST_COUNT, false );
-            }
-            else {
-                entries = cass.getColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_INDEX_ENTRIES,
-                        entity.getUuid(), new DynamicComposite( entryName ),
-                        setGreaterThanEqualityFlag( new DynamicComposite( entryName ) ), INDEX_ENTRY_LIST_COUNT,
-                        false );
-            }
-
-            if ( logger.isDebugEnabled() ) {
-                logger.debug( "Found {} previous index entries for {} of entity {}", new Object[] {
-                        entries.size(), entryName, entity.getUuid()
-                } );
-            }
-
-            // Delete all matching entries from entry list
-            for ( HColumn<ByteBuffer, ByteBuffer> entry : entries ) {
-                UUID prev_timestamp = null;
-                Object prev_value = null;
-                String prev_obj_path = null;
-
-                // new format:
-                // composite(entryName,
-                // value_code,prev_value,prev_timestamp,prev_obj_path) = null
-                DynamicComposite composite = DynamicComposite.fromByteBuffer( entry.getName().duplicate() );
-                prev_value = composite.get( 2 );
-                prev_timestamp = ( UUID ) composite.get( 3 );
-                if ( composite.size() > 4 ) {
-                    prev_obj_path = ( String ) composite.get( 4 );
-                }
-
-                if ( prev_value != null ) {
-
-                    String entryPath = entryName;
-                    if ( ( prev_obj_path != null ) && ( prev_obj_path.length() > 0 ) ) {
-                        entryPath = entryName + "." + prev_obj_path;
-                    }
-
-                    indexUpdate.addPrevEntry( entryPath, prev_value, prev_timestamp, entry.getName().duplicate() );
-
-                    // composite(property_value,connected_entity_id,entry_timestamp)
-                    // addDeleteToMutator(batch, ENTITY_INDEX_ENTRIES,
-                    // entity.getUuid(), entry.getName(), timestamp);
-
-                }
-                else {
-                    logger.error( "Unexpected condition - deserialized property value is null" );
-                }
-            }
-        }
-
-        if ( !isMultiValue || ( isMultiValue && !removeListEntry ) ) {
-
-            List<Map.Entry<String, Object>> list = IndexUtils.getKeyValueList( entryName, entryValue, fulltextIndexed );
-
-            if ( entryName.equalsIgnoreCase( "location" ) && ( entryValue instanceof Map ) ) {
-                @SuppressWarnings("rawtypes") double latitude =
-                        MapUtils.getDoubleValue( ( Map ) entryValue, "latitude" );
-                @SuppressWarnings("rawtypes") double longitude =
-                        MapUtils.getDoubleValue( ( Map ) entryValue, "longitude" );
-                list.add( new AbstractMap.SimpleEntry<String, Object>( "location.coordinates",
-                        latitude + "," + longitude ) );
-            }
-
-            for ( Map.Entry<String, Object> indexEntry : list ) {
-
-                if ( validIndexableValue( indexEntry.getValue() ) ) {
-                    indexUpdate.addNewEntry( indexEntry.getKey(), toIndexableValue( indexEntry.getValue() ) );
-                }
-            }
-
-            if ( isMultiValue ) {
-                addInsertToMutator( batch, ENTITY_INDEX_ENTRIES, entity.getUuid(),
-                        asList( entryName, indexValueCode( entryValue ), toIndexableValue( entryValue ),
-                                indexUpdate.getTimestampUuid() ), null, timestamp );
-            }
-            else {
-                // int i = 0;
-
-                for ( Map.Entry<String, Object> indexEntry : list ) {
-
-                    String name = indexEntry.getKey();
-                    if ( name.startsWith( entryName + "." ) ) {
-                        name = name.substring( entryName.length() + 1 );
-                    }
-                    else if ( name.startsWith( entryName ) ) {
-                        name = name.substring( entryName.length() );
-                    }
-
-                    byte code = indexValueCode( indexEntry.getValue() );
-                    Object val = toIndexableValue( indexEntry.getValue() );
-                    addInsertToMutator( batch, ENTITY_INDEX_ENTRIES, entity.getUuid(),
-                            asList( entryName, code, val, indexUpdate.getTimestampUuid(), name ), null, timestamp );
-
-                    indexUpdate.addIndex( indexEntry.getKey() );
-                }
-            }
-
-            indexUpdate.addIndex( entryName );
-        }
-
-        return indexUpdate;
-    }
-
-
-    @Metered(group = "core", name = "RelationManager_batchUpdatePropertyIndexes")
-    public void batchUpdatePropertyIndexes( Mutator<ByteBuffer> batch, String propertyName, Object propertyValue,
-                                            boolean entitySchemaHasProperty, boolean noRead, UUID timestampUuid )
-            throws Exception {
-
-        Entity entity = getHeadEntity();
-
-        UUID associatedId = null;
-        String associatedType = null;
-
-        if ( Schema.isAssociatedEntityType( entity.getType() ) ) {
-            Object item = entity.getProperty( PROPERTY_ITEM );
-            if ( ( item instanceof UUID ) && ( entity.getProperty( PROPERTY_COLLECTION_NAME ) instanceof String ) ) {
-                associatedId = ( UUID ) item;
-                associatedType = string( entity.getProperty( PROPERTY_ITEM_TYPE ) );
-                String entryName = TYPE_MEMBER + "." + propertyName;
-                if ( logger.isDebugEnabled() ) {
-                    logger.debug( "Extended property {} ( {} ).{} indexed as {} ({})." + entryName, new Object[] {
-                            entity.getType(), entity.getUuid(), propertyName, associatedType, associatedId
-                    } );
-                }
-                propertyName = entryName;
-            }
-        }
-
-        IndexUpdate indexUpdate = batchStartIndexUpdate( batch, entity, propertyName, propertyValue, timestampUuid,
-                entitySchemaHasProperty, false, false,
-                getDefaultSchema().isPropertyFulltextIndexed( entity.getType(), propertyName ), noRead );
-
-        // Update collections
-
-        String effectiveType = entity.getType();
-        if ( associatedType != null ) {
-            indexUpdate.setAssociatedId( associatedId );
-            effectiveType = associatedType;
-        }
-
-        Map<String, Set<CollectionInfo>> containers = getDefaultSchema().getContainers( effectiveType );
-        if ( containers != null ) {
-
-            Map<EntityRef, Set<String>> containerEntities = null;
-            if ( noRead ) {
-                containerEntities = new LinkedHashMap<EntityRef, Set<String>>();
-                EntityRef applicationRef = new SimpleEntityRef( TYPE_APPLICATION, applicationId );
-                addMapSet( containerEntities, applicationRef, defaultCollectionName( entity.getType() ) );
-            }
-            else {
-                containerEntities = getContainingCollections();
-            }
-
-            for ( EntityRef containerEntity : containerEntities.keySet() ) {
-                if ( containerEntity.getType().equals( TYPE_APPLICATION ) && Schema
-                        .isAssociatedEntityType( entity.getType() ) ) {
-                    logger.debug( "Extended properties for {} not indexed by application", entity.getType() );
-                    continue;
-                }
-                Set<String> collectionNames = containerEntities.get( containerEntity );
-                Set<CollectionInfo> collections = containers.get( containerEntity.getType() );
-
-                if ( collections != null ) {
-                    for ( CollectionInfo collection : collections ) {
-                        if ( collectionNames.contains( collection.getName() ) ) {
-                            batchUpdateCollectionIndex( indexUpdate, containerEntity, collection.getName() );
-                        }
-                    }
-                }
-            }
-        }
-
-        if ( !noRead ) {
-            batchUpdateBackwardConnectionsPropertyIndexes( indexUpdate );
-        }
-
-        /**
-         * We've updated the properties, add the deletes to the ledger
-         *
-         */
-
-        for ( IndexEntry entry : indexUpdate.getPrevEntries() ) {
-            addDeleteToMutator( batch, ENTITY_INDEX_ENTRIES, entity.getUuid(), entry.getLedgerColumn(),
-                    indexUpdate.getTimestamp() );
-        }
-    }
-
-
-    public void batchUpdateSetIndexes( Mutator<ByteBuffer> batch, String setName, Object elementValue,
-                                       boolean removeFromSet, UUID timestampUuid ) throws Exception {
-
-        Entity entity = getHeadEntity();
-
-        elementValue = getDefaultSchema().validateEntitySetValue( entity.getType(), setName, elementValue );
-
-        IndexUpdate indexUpdate =
-                batchStartIndexUpdate( batch, entity, setName, elementValue, timestampUuid, true, true, removeFromSet,
-                        false );
-
-        // Update collections
-        Map<String, Set<CollectionInfo>> containers =
-                getDefaultSchema().getContainersIndexingDictionary( entity.getType(), setName );
-
-        if ( containers != null ) {
-            Map<EntityRef, Set<String>> containerEntities = getContainingCollections();
-            for ( EntityRef containerEntity : containerEntities.keySet() ) {
-                if ( containerEntity.getType().equals( TYPE_APPLICATION ) && Schema
-                        .isAssociatedEntityType( entity.getType() ) ) {
-                    logger.debug( "Extended properties for {} not indexed by application", entity.getType() );
-                    continue;
-                }
-                Set<String> collectionNames = containerEntities.get( containerEntity );
-                Set<CollectionInfo> collections = containers.get( containerEntity.getType() );
-
-                if ( collections != null ) {
-
-                    for ( CollectionInfo collection : collections ) {
-                        if ( collectionNames.contains( collection.getName() ) ) {
-
-                            batchUpdateCollectionIndex( indexUpdate, containerEntity, collection.getName() );
-                        }
-                    }
-                }
-            }
-        }
-
-        batchUpdateBackwardConnectionsDictionaryIndexes( indexUpdate );
-    }
-
-
-    private IndexScanner searchIndex( Object indexKey, QuerySlice slice, int pageSize ) throws Exception {
-
-        DynamicComposite[] range = slice.getRange();
-
-        Object keyPrefix = key( indexKey, slice.getPropertyName() );
-
-        IndexScanner scanner =
-                new IndexBucketScanner( cass, indexBucketLocator, ENTITY_INDEX, applicationId, IndexType.CONNECTION,
-                        keyPrefix, range[0], range[1], slice.isReversed(), pageSize, slice.hasCursor(), slice.getPropertyName() );
-
-        return scanner;
-    }
-
-
-    /**
-     * Search the collection index using all the buckets for the given collection
-     *
-     * @param indexKey The index key to read
-     * @param slice Slice set in the query
-     * @param collectionName The name of the collection to search
-     * @param pageSize The page size to load when iterating
-     */
-    private IndexScanner searchIndexBuckets( Object indexKey, QuerySlice slice, String collectionName, int pageSize )
-            throws Exception {
-
-        DynamicComposite[] range = slice.getRange();
-
-        Object keyPrefix = key( indexKey, slice.getPropertyName() );
-
-        IndexScanner scanner =
-                new IndexBucketScanner( cass, indexBucketLocator, ENTITY_INDEX, applicationId, IndexType.COLLECTION,
-                        keyPrefix, range[0], range[1], slice.isReversed(), pageSize, slice.hasCursor(), collectionName );
-
-        return scanner;
-    }
-
-
-    @SuppressWarnings("unchecked")
-    @Override
-    @Metered(group = "core", name = "RelationManager_isOwner")
-    public boolean isCollectionMember( String collectionName, EntityRef entity ) throws Exception {
-
-        Keyspace ko = cass.getApplicationKeyspace( applicationId );
-
-        ByteBuffer col = DynamicComposite
-                .toByteBuffer( asList( this.headEntity.getType(), collectionName, headEntity.getUuid() ) );
-
-        HColumn<ByteBuffer, ByteBuffer> result = cass.getColumn( ko, ENTITY_COMPOSITE_DICTIONARIES,
-                key( entity.getUuid(), Schema.DICTIONARY_CONTAINER_ENTITIES ), col, Serializers.be, Serializers.be );
-
-        return result != null;
-    }
-
-
-    /** @param connectionName The name of hte connection */
-    public boolean isConnectionMember( String connectionName, EntityRef entity ) throws Exception {
-        Keyspace ko = cass.getApplicationKeyspace( applicationId );
-
-        Object key = key( this.headEntity.getUuid(), DICTIONARY_CONNECTED_ENTITIES, connectionName );
-
-        DynamicComposite start = new DynamicComposite( entity.getUuid() );
-
-        List<HColumn<ByteBuffer, ByteBuffer>> cols =
-                cass.getColumns( ko, ENTITY_COMPOSITE_DICTIONARIES, key, start, null, 1, false );
-
-        if ( cols == null || cols.size() == 0 ) {
-            return false;
-        }
-
-        UUID returnedUUID = ( UUID ) DynamicComposite.fromByteBuffer( cols.get( 0 ).getName() ).get( 0 );
-
-        return entity.getUuid().equals( returnedUUID );
-
-
-        //    addDeleteToMutator(batch, ENTITY_COMPOSITE_DICTIONARIES,
-        //        key(connection.getConnectedEntityId(), DICTIONARY_CONNECTING_ENTITIES,
-        // connection.getConnectionType()),
-        //        asList(connection.getConnectingEntityId(), connection.getConnectingEntityType()), timestamp);
-        //
-        //
-        //    ConnectionRefImpl ref = new ConnectionRefImpl(this.headEntity, connectionName, entity);
-        //
-        //
-        //
-        //
-        //
-        //
-        //    HColumn<String, UUID> col = cass.getColumn(ko, ENTITY_CONNECTIONS, ref.getUuid(),
-        //        ConnectionRefImpl.CONNECTED_ENTITY_ID, se, ue);
-        //
-        //
-        //    getConnectedEntities(this.headEntity, connectionName, )
-        //
-        //    return col != null && entity.getUuid().equals(col.getValue());
-    }
-
-
-    @Override
-    @Metered(group = "core", name = "RelationManager_getOwners")
-    public Map<String, Map<UUID, Set<String>>> getOwners() throws Exception {
-        Map<EntityRef, Set<String>> containerEntities = getContainingCollections();
-        Map<String, Map<UUID, Set<String>>> owners = new LinkedHashMap<String, Map<UUID, Set<String>>>();
-
-        for ( EntityRef owner : containerEntities.keySet() ) {
-            Set<String> collections = containerEntities.get( owner );
-            for ( String collection : collections ) {
-                MapUtils.addMapMapSet( owners, owner.getType(), owner.getUuid(), collection );
-            }
-        }
-
-        return owners;
-    }
-
-
-    @Override
-    @Metered(group = "core", name = "RelationManager_getCollections")
-    public Set<String> getCollections() throws Exception {
-
-        Map<String, CollectionInfo> collections = getDefaultSchema().getCollections( headEntity.getType() );
-        if ( collections == null ) {
-            return null;
-        }
-
-        return collections.keySet();
-    }
-
-
-    @Override
-    @Metered(group = "core", name = "RelationManager_getCollection_start_result")
-    public Results getCollection( String collectionName, UUID startResult, int count, Level resultsLevel,
-                                  boolean reversed ) throws Exception {
-        // changed intentionally to delegate to search so that behavior is
-        // consistent across all index access.
-
-        // TODO T.N fix cursor parsing here so startResult can be used in this
-        // context. Needs a bit of refactor
-        // for accommodating cursor I/O USERGRID-1750. A bit hacky, but until a
-        // furthur refactor this works.
-
-        Query query = new Query().withResultsLevel( resultsLevel ).withReversed( reversed ).withLimit( count )
-                                 .withStartResult( startResult );
-
-        return searchCollection( collectionName, query );
-    }
-
-
-    @Override
-    @Metered(group = "core", name = "RelationManager_getCollecitonForQuery")
-    public Results getCollection( String collectionName, Query query, Level resultsLevel ) throws Exception {
-
-        // changed intentionally to delegate to search so that behavior is
-        // consistent across all index access.
-
-        return searchCollection( collectionName, query );
-    }
-
-
-    @Override
-    @Metered(group = "core", name = "RelationManager_addToCollection")
-    public Entity addToCollection( String collectionName, EntityRef itemRef ) throws Exception {
-
-        Entity itemEntity = em.get( itemRef );
-
-        if ( itemEntity == null ) {
-            return null;
-        }
-
-        CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
-        if ( ( collection != null ) && !collection.getType().equals( itemRef.getType() ) ) {
-            return null;
-        }
-
-        UUID timestampUuid = newTimeUUID();
-        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator(cass.getApplicationKeyspace( applicationId ), Serializers.be );
-
-        batchAddToCollection( batch, collectionName, itemEntity, timestampUuid );
-
-        if ( collection.getLinkedCollection() != null ) {
-            getRelationManager( itemEntity )
-                    .batchAddToCollection( batch, collection.getLinkedCollection(), getHeadEntity(), timestampUuid );
-        }
-
-        batchExecute( batch, CassandraService.RETRY_COUNT );
-
-        return itemEntity;
-    }
-
-
-    @Override
-    @Metered(group = "core", name = "RelationManager_addToCollections")
-    public Entity addToCollections( List<EntityRef> owners, String collectionName ) throws Exception {
-
-        Entity itemEntity = getHeadEntity();
-
-        Map<String, List<UUID>> collectionsByType = new LinkedHashMap<String, List<UUID>>();
-        for ( EntityRef owner : owners ) {
-            MapUtils.addMapList( collectionsByType, owner.getType(), owner.getUuid() );
-        }
-
-        UUID timestampUuid = newTimeUUID();
-        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ), Serializers.be );
-
-        Schema schema = getDefaultSchema();
-        for ( Entry<String, List<UUID>> entry : collectionsByType.entrySet() ) {
-            CollectionInfo collection = schema.getCollection( entry.getKey(), collectionName );
-            if ( ( collection != null ) && !collection.getType().equals( headEntity.getType() ) ) {
-                continue;
-            }
-            batchAddToCollections( batch, entry.getKey(), entry.getValue(), collectionName, itemEntity, timestampUuid );
-
-            if ( collection.getLinkedCollection() != null ) {
-                logger.error(
-                        "Bulk add to collections used on a linked collection, linked connection will not be updated" );
-            }
-        }
-
-        batchExecute( batch, CassandraService.RETRY_COUNT );
-
-        return null;
-    }
-
-
-    @Override
-    @Metered(group = "core", name = "RelationManager_createItemInCollection")
-    public Entity createItemInCollection( String collectionName, String itemType, Map<String, Object> properties )
-            throws Exception {
-
-        if ( headEntity.getUuid().equals( applicationId ) ) {
-            if ( itemType.equals( TYPE_ENTITY ) ) {
-                itemType = singularize( collectionName );
-            }
-            if ( itemType.equals( TYPE_ROLE ) ) {
-                Long inactivity = ( Long ) properties.get( PROPERTY_INACTIVITY );
-                if ( inactivity == null ) {
-                    inactivity = 0L;
-                }
-                return em.createRole( ( String ) properties.get( PROPERTY_NAME ),
-                        ( String ) properties.get( PROPERTY_TITLE ), inactivity );
-            }
-            return em.create( itemType, properties );
-        }
-        else if ( headEntity.getType().equals( Group.ENTITY_TYPE ) && ( collectionName.equals( COLLECTION_ROLES ) ) ) {
-            UUID groupId = headEntity.getUuid();
-            String roleName = ( String ) properties.get( PROPERTY_NAME );
-            return em.createGroupRole( groupId, roleName, ( Long ) properties.get( PROPERTY_INACTIVITY ) );
-        }
-
-        CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
-        if ( ( collection != null ) && !collection.getType().equals( itemType ) ) {
-            return null;
-        }
-
-        properties = getDefaultSchema().cleanUpdatedProperties( itemType, properties, true );
-
-        Entity itemEntity = em.create( itemType, properties );
-
-        if ( itemEntity != null ) {
-            UUID timestampUuid = newTimeUUID();
-            Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ), Serializers.be );
-
-            batchAddToCollection( batch, collectionName, itemEntity, timestampUuid );
-
-            if ( collection.getLinkedCollection() != null ) {
-                getRelationManager( itemEntity )
-                        .batchAddToCollection( batch, collection.getLinkedCollection(), getHeadEntity(),
-                                timestampUuid );
-            }
-
-            batchExecute( batch, CassandraService.RETRY_COUNT );
-        }
-
-        return itemEntity;
-    }
-
-
-    @Override
-    @Metered(group = "core", name = "RelationManager_removeFromCollection")
-    public void removeFromCollection( String collectionName, EntityRef itemRef ) throws Exception {
-
-        if ( headEntity.getUuid().equals( applicationId ) ) {
-            if ( collectionName.equals( COLLECTION_ROLES ) ) {
-                Entity itemEntity = em.get( itemRef );
-                if ( itemEntity != null ) {
-                    RoleRef roleRef = SimpleRoleRef.forRoleEntity( itemEntity );
-                    em.deleteRole( roleRef.getApplicationRoleName() );
-                    return;
-                }
-                em.delete( itemEntity );
-                return;
-            }
-            em.delete( itemRef );
-            return;
-        }
-
-        Entity itemEntity = em.get( itemRef );
-
-        if ( itemEntity == null ) {
-            return;
-        }
-
-        UUID timestampUuid = newTimeUUID();
-        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ), Serializers.be );
-
-        batchRemoveFromCollection( batch, collectionName, itemEntity, timestampUuid );
-
-        CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
-        if ( ( collection != null ) && ( collection.getLinkedCollection() != null ) ) {
-            getRelationManager( itemEntity )
-                    .batchRemoveFromCollection( batch, collection.getLinkedCollection(), getHeadEntity(),
-                            timestampUuid );
-        }
-
-        batchExecute( batch, CassandraService.RETRY_COUNT );
-
-        if ( headEntity.getType().equals( Group.ENTITY_TYPE ) ) {
-            if ( collectionName.equals( COLLECTION_ROLES ) ) {
-                String path = ( String ) ( ( Entity ) itemRef ).getMetadata( "path" );
-                if ( path.startsWith( "/roles/" ) ) {
-                    RoleRef roleRef = SimpleRoleRef.forRoleEntity( itemEntity );
-                    em.deleteRole( roleRef.getApplicationRoleName() );
-                }
-            }
-        }
-    }
-
-
-    @Metered(group = "core", name = "RelationManager_batchRemoveFromContainers")
-    public void batchRemoveFromContainers( Mutator<ByteBuffer> m, UUID timestampUuid ) throws Exception {
-        Entity entity = getHeadEntity();
-        // find all the containing collections
-        Map<EntityRef, Set<String>> containers = getContainingCollections();
-        if ( containers != null ) {
-            for ( Entry<EntityRef, Set<String>> container : containers.entrySet() ) {
-                for ( String collectionName : container.getValue() ) {
-                    getRelationManager( container.getKey() )
-                            .batchRemoveFromCollection( m, collectionName, entity, true, timestampUuid );
-                }
-            }
-        }
-    }
-
-
-    @Override
-    @Metered(group = "core", name = "RelationManager_copyRelationships")
-    public void copyRelationships( String srcRelationName, EntityRef dstEntityRef, String dstRelationName )
-            throws Exception {
-
-        headEntity = em.validate( headEntity );
-        dstEntityRef = em.validate( dstEntityRef );
-
-        CollectionInfo srcCollection = getDefaultSchema().getCollection( headEntity.getType(), srcRelationName );
-
-        CollectionInfo dstCollection = getDefaultSchema().getCollection( dstEntityRef.getType(), dstRelationName );
-
-        Results results = null;
-        do {
-            if ( srcCollection != null ) {
-                results = em.getCollection( headEntity, srcRelationName, null, 5000, Level.REFS, false );
-            }
-            else {
-                results = em.getConnectedEntities( headEntity, srcRelationName, null, Level.REFS );
-            }
-
-            if ( ( results != null ) && ( results.size() > 0 ) ) {
-                List<EntityRef> refs = results.getRefs();
-                for ( EntityRef ref : refs ) {
-                    if ( dstCollection != null ) {
-                        em.addToCollection( dstEntityRef, dstRelationName, ref );
-                    }
-                    else {
-                        em.createConnection( dstEntityRef, dstRelationName, ref );
-                    }
-                }
-            }
-        }
-        while ( ( results != null ) && ( results.hasMoreResults() ) );
-    }
-
-
-    @Override
-    @Metered(group = "core", name = "RelationManager_searchCollection")
-    public Results searchCollection( String collectionName, Query query ) throws Exception {
-
-        if ( query == null ) {
-            query = new Query();
-        }
-
-        headEntity = em.validate( headEntity );
-
-        CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
-
-        query.setEntityType( collection.getType() );
-
-        final CollectionResultsLoaderFactory factory = new CollectionResultsLoaderFactory();
-
-        // we have something to search with, visit our tree and evaluate the
-        // results
-        QueryProcessorImpl qp = new QueryProcessorImpl( query, collection, em, factory );
-        SearchCollectionVisitor visitor = new SearchCollectionVisitor( qp );
-
-        return qp.getResults( visitor );
-    }
-
-
-    @Override
-    @Metered(group = "core", name = "RelationManager_createConnection_connection_ref")
-    public ConnectionRef createConnection( ConnectionRef connection ) throws Exception {
-        ConnectionRefImpl connectionImpl = new ConnectionRefImpl( connection );
-
-        updateEntityConnection( false, connectionImpl );
-
-        return connection;
-    }
-
-
-    @Override
-    @Metered(group = "core", name = "RelationManager_createConnection_connectionType")
-    public ConnectionRef createConnection( String connectionType, EntityRef connectedEntityRef ) throws Exception {
-
-        headEntity = em.validate( headEntity );
-        connectedEntityRef = em.validate( connectedEntityRef );
-
-        ConnectionRefImpl connection = new ConnectionRefImpl( headEntity, connectionType, connectedEntityRef );
-
-        updateEntityConnection( false, connection );
-
-        return connection;
-    }
-
-
-    @Override
-    @Metered(group = "core", name = "RelationManager_createConnection_paired_connection_type")
-    public ConnectionRef createConnection( String pairedConnectionType, EntityRef pairedEntity, String connectionType,
-                                           EntityRef connectedEntityRef ) throws Exception {
-
-        ConnectionRefImpl connection =
-                new ConnectionRefImpl( headEntity, new ConnectedEntityRefImpl( pairedConnectionType, pairedEntity ),
-                        new ConnectedEntityRefImpl( connectionType, connectedEntityRef ) );
-
-        updateEntityConnection( false, connection );
-
-        return connection;
-    }
-
-
-    @Override
-    @Metered(group = "core", name = "RelationManager_createConnection_connected_entity_ref")
-    public ConnectionRef createConnection( ConnectedEntityRef... connections ) throws Exception {
-
-        ConnectionRefImpl connection = new ConnectionRefImpl( headEntity, connections );
-
-        updateEntityConnection( false, connection );
-
-        return connection;
-    }
-
-
-    @Override
-    @Metered(group = "core", name = "RelationManager_connectionRef_type_entity")
-    public ConnectionRef connectionRef( String connectionType, EntityRef connectedEntityRef ) throws Exception {
-
-        ConnectionRef connection = new ConnectionRefImpl( headEntity, connectionType, connectedEntityRef );
-
-        return connection;
-    }
-
-
-    @Override
-    @Metered(group = "core", name = "RelationManager_connectionRef_entity_to_entity")
-    public ConnectionRef connectionRef( String pairedConnectionType, EntityRef pairedEntity, String connectionType,
-                                        EntityRef connectedEntityRef ) throws Exception {
-
-        ConnectionRef connection =
-                new ConnectionRefImpl( headEntity, new ConnectedEntityRefImpl( pairedConnectionType, pairedEntity ),
-                        new ConnectedEntityRefImpl( connectionType, connectedEntityRef ) );
-
-        return connection;
-    }
-
-
-    @Override
-    @Metered(group = "core", name = "RelationManager_connectionRef_connections")
-    public ConnectionRef connectionRef( ConnectedEntityRef... connections ) {
-
-        ConnectionRef connection = new ConnectionRefImpl( headEntity, connections );
-
-        return connection;
-    }
-
-
-    @Override
-    @Metered(group = "core", name = "RelationManager_deleteConnection")
-    public void deleteConnection( ConnectionRef connectionRef ) throws Exception {
-        updateEntityConnection( true, new ConnectionRefImpl( connectionRef ) );
-    }
-
-
-    @Override
-    @Metered(group = "core", name = "RelationManager_getConnectionTypes_entity_id")
-    public Set<String> getConnectionTypes( UUID connectedEntityId ) throws Exception {
-        // Add connection type to connections set
-        //    addInsertToMutator(batch, ENTITY_DICTIONARIES,
-        //        key(connection.getConnectingEntityId(), DICTIONARY_CONNECTED_TYPES),
-        // connection.getConnectionType(), null,
-        //        timestamp);
-        //
-        //    // Add connection type to connections set
-        //    addInsertToMutator(batch, ENTITY_DICTIONARIES,
-        //        key(connection.getConnectedEntityId(), DICTIONARY_CONNECTING_TYPES),
-        // connection.getConnectionType(), null,
-        //        timestamp);
-        //
-        //
-        //    Object key = key(connectedEntityId, DICTIONARY_CONNECTED_TYPES);
-
-        Set<String> connections = cast( em.getDictionaryAsSet( new SimpleEntityRef( connectedEntityId ),
-                Schema.DICTIONARY_CONNECTED_TYPES ) );
-
-        return connections;
-
-        //    Set<String> connection_types = new TreeSet<String>(CASE_INSENSITIVE_ORDER);
-        //
-        //    //TODO T.N. get this from the dictionary
-        //    List<ConnectionRefImpl> connections = getConnections(new ConnectionRefImpl(headEntity,
-        // new ConnectedEntityRefImpl(
-        //        NULL_ID), new ConnectedEntityRefImpl(connectedEntityId)), false);
-        //
-        //    for (ConnectionRefImpl connection : connections) {
-        //      if ((connection.getConnectionType() != null) && (connection.getFirstPairedConnectedEntityId() ==
-        // null)) {
-        //        connection_types.add(connection.getConnectionType());
-        //      }
-        //   

<TRUNCATED>