You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/01/28 23:21:23 UTC

[13/96] [abbrv] [partial] Change package namespace to org.apache.usergrid

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/usergrid/persistence/cassandra/EntityManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/usergrid/persistence/cassandra/EntityManagerImpl.java b/stack/core/src/main/java/org/usergrid/persistence/cassandra/EntityManagerImpl.java
deleted file mode 100644
index f7d4069..0000000
--- a/stack/core/src/main/java/org/usergrid/persistence/cassandra/EntityManagerImpl.java
+++ /dev/null
@@ -1,2858 +0,0 @@
-/*******************************************************************************
- * Copyright 2012 Apigee Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.usergrid.persistence.cassandra;
-
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.UUID;
-
-import javax.annotation.Resource;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.context.ApplicationContext;
-import org.springframework.util.Assert;
-import org.usergrid.locking.Lock;
-import org.usergrid.mq.Message;
-import org.usergrid.mq.QueueManager;
-import org.usergrid.mq.cassandra.QueueManagerFactoryImpl;
-import org.usergrid.persistence.AggregateCounter;
-import org.usergrid.persistence.AggregateCounterSet;
-import org.usergrid.persistence.CollectionRef;
-import org.usergrid.persistence.ConnectedEntityRef;
-import org.usergrid.persistence.ConnectionRef;
-import org.usergrid.persistence.CounterResolution;
-import org.usergrid.persistence.DynamicEntity;
-import org.usergrid.persistence.Entity;
-import org.usergrid.persistence.EntityFactory;
-import org.usergrid.persistence.EntityManager;
-import org.usergrid.persistence.EntityRef;
-import org.usergrid.persistence.Identifier;
-import org.usergrid.persistence.IndexBucketLocator;
-import org.usergrid.persistence.IndexBucketLocator.IndexType;
-import org.usergrid.persistence.Query;
-import org.usergrid.persistence.Query.CounterFilterPredicate;
-import org.usergrid.persistence.Results;
-import org.usergrid.persistence.Results.Level;
-import org.usergrid.persistence.RoleRef;
-import org.usergrid.persistence.Schema;
-import org.usergrid.persistence.SimpleCollectionRef;
-import org.usergrid.persistence.SimpleEntityRef;
-import org.usergrid.persistence.SimpleRoleRef;
-import org.usergrid.persistence.TypedEntity;
-import org.usergrid.persistence.cassandra.CounterUtils.AggregateCounterSelection;
-import org.usergrid.persistence.cassandra.util.TraceParticipant;
-import org.usergrid.persistence.entities.Application;
-import org.usergrid.persistence.entities.Event;
-import org.usergrid.persistence.entities.Group;
-import org.usergrid.persistence.entities.Role;
-import org.usergrid.persistence.entities.User;
-import org.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException;
-import org.usergrid.persistence.exceptions.EntityNotFoundException;
-import org.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
-import org.usergrid.persistence.exceptions.UnexpectedEntityTypeException;
-import org.usergrid.persistence.schema.CollectionInfo;
-import org.usergrid.utils.ClassUtils;
-import org.usergrid.utils.CompositeUtils;
-import org.usergrid.utils.UUIDUtils;
-
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import com.yammer.metrics.annotation.Metered;
-
-import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
-import me.prettyprint.cassandra.serializers.LongSerializer;
-import me.prettyprint.cassandra.serializers.StringSerializer;
-import me.prettyprint.cassandra.serializers.UUIDSerializer;
-import me.prettyprint.hector.api.Keyspace;
-import me.prettyprint.hector.api.beans.ColumnSlice;
-import me.prettyprint.hector.api.beans.CounterRow;
-import me.prettyprint.hector.api.beans.CounterRows;
-import me.prettyprint.hector.api.beans.CounterSlice;
-import me.prettyprint.hector.api.beans.DynamicComposite;
-import me.prettyprint.hector.api.beans.HColumn;
-import me.prettyprint.hector.api.beans.HCounterColumn;
-import me.prettyprint.hector.api.beans.Row;
-import me.prettyprint.hector.api.beans.Rows;
-import me.prettyprint.hector.api.factory.HFactory;
-import me.prettyprint.hector.api.mutation.Mutator;
-import me.prettyprint.hector.api.query.MultigetSliceCounterQuery;
-import me.prettyprint.hector.api.query.QueryResult;
-import me.prettyprint.hector.api.query.SliceCounterQuery;
-
-import static java.lang.String.CASE_INSENSITIVE_ORDER;
-import static java.util.Arrays.asList;
-
-import static me.prettyprint.hector.api.factory.HFactory.createCounterSliceQuery;
-import static me.prettyprint.hector.api.factory.HFactory.createMutator;
-import static org.apache.commons.lang.StringUtils.capitalize;
-import static org.apache.commons.lang.StringUtils.isBlank;
-import static org.usergrid.locking.LockHelper.getUniqueUpdateLock;
-import static org.usergrid.persistence.Results.Level.REFS;
-import static org.usergrid.persistence.Results.fromEntities;
-import static org.usergrid.persistence.Schema.COLLECTION_ROLES;
-import static org.usergrid.persistence.Schema.COLLECTION_USERS;
-import static org.usergrid.persistence.Schema.DICTIONARY_COLLECTIONS;
-import static org.usergrid.persistence.Schema.DICTIONARY_PERMISSIONS;
-import static org.usergrid.persistence.Schema.DICTIONARY_PROPERTIES;
-import static org.usergrid.persistence.Schema.DICTIONARY_ROLENAMES;
-import static org.usergrid.persistence.Schema.DICTIONARY_ROLETIMES;
-import static org.usergrid.persistence.Schema.DICTIONARY_SETS;
-import static org.usergrid.persistence.Schema.PROPERTY_ASSOCIATED;
-import static org.usergrid.persistence.Schema.PROPERTY_CREATED;
-import static org.usergrid.persistence.Schema.PROPERTY_INACTIVITY;
-import static org.usergrid.persistence.Schema.PROPERTY_MODIFIED;
-import static org.usergrid.persistence.Schema.PROPERTY_NAME;
-import static org.usergrid.persistence.Schema.PROPERTY_TIMESTAMP;
-import static org.usergrid.persistence.Schema.PROPERTY_TYPE;
-import static org.usergrid.persistence.Schema.PROPERTY_UUID;
-import static org.usergrid.persistence.Schema.TYPE_APPLICATION;
-import static org.usergrid.persistence.Schema.TYPE_CONNECTION;
-import static org.usergrid.persistence.Schema.TYPE_ENTITY;
-import static org.usergrid.persistence.Schema.TYPE_MEMBER;
-import static org.usergrid.persistence.Schema.TYPE_ROLE;
-import static org.usergrid.persistence.Schema.defaultCollectionName;
-import static org.usergrid.persistence.Schema.deserializeEntityProperties;
-import static org.usergrid.persistence.Schema.getDefaultSchema;
-import static org.usergrid.persistence.SimpleEntityRef.getUuid;
-import static org.usergrid.persistence.SimpleEntityRef.ref;
-import static org.usergrid.persistence.SimpleRoleRef.getIdForGroupIdAndRoleName;
-import static org.usergrid.persistence.SimpleRoleRef.getIdForRoleName;
-import static org.usergrid.persistence.cassandra.ApplicationCF.APPLICATION_AGGREGATE_COUNTERS;
-import static org.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COMPOSITE_DICTIONARIES;
-import static org.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COUNTERS;
-import static org.usergrid.persistence.cassandra.ApplicationCF.ENTITY_DICTIONARIES;
-import static org.usergrid.persistence.cassandra.ApplicationCF.ENTITY_ID_SETS;
-import static org.usergrid.persistence.cassandra.ApplicationCF.ENTITY_PROPERTIES;
-import static org.usergrid.persistence.cassandra.ApplicationCF.ENTITY_UNIQUE;
-import static org.usergrid.persistence.cassandra.CassandraPersistenceUtils.addDeleteToMutator;
-import static org.usergrid.persistence.cassandra.CassandraPersistenceUtils.addInsertToMutator;
-import static org.usergrid.persistence.cassandra.CassandraPersistenceUtils.addPropertyToMutator;
-import static org.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute;
-import static org.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
-import static org.usergrid.persistence.cassandra.CassandraPersistenceUtils.toStorableBinaryValue;
-import static org.usergrid.persistence.cassandra.CassandraService.ALL_COUNT;
-import static org.usergrid.utils.ClassUtils.cast;
-import static org.usergrid.utils.ConversionUtils.bytebuffer;
-import static org.usergrid.utils.ConversionUtils.getLong;
-import static org.usergrid.utils.ConversionUtils.object;
-import static org.usergrid.utils.ConversionUtils.string;
-import static org.usergrid.utils.ConversionUtils.uuid;
-import static org.usergrid.utils.InflectionUtils.singularize;
-import static org.usergrid.utils.UUIDUtils.getTimestampInMicros;
-import static org.usergrid.utils.UUIDUtils.getTimestampInMillis;
-import static org.usergrid.utils.UUIDUtils.isTimeBased;
-import static org.usergrid.utils.UUIDUtils.newTimeUUID;
-
-
-/**
- * Cassandra-specific implementation of Datastore
- *
- * @author edanuff
- * @author tnine
- */
-public class EntityManagerImpl implements EntityManager {
-
-    /** The log4j logger. */
-    private static final Logger logger = LoggerFactory.getLogger( EntityManagerImpl.class );
-    public static final String APPLICATION_COLLECTION = "application.collection.";
-    public static final String APPLICATION_ENTITIES = "application.entities";
-    public static final long ONE_COUNT = 1L;
-    @Resource
-    private EntityManagerFactoryImpl emf;
-    @Resource
-    private QueueManagerFactoryImpl qmf;
-    @Resource
-    private IndexBucketLocator indexBucketLocator;
-
-    private UUID applicationId;
-
-    private Application application;
-    @Resource
-    private CassandraService cass;
-    @Resource
-    private CounterUtils counterUtils;
-
-    private boolean skipAggregateCounters;
-
-    public static final StringSerializer se = new StringSerializer();
-    public static final ByteBufferSerializer be = new ByteBufferSerializer();
-    public static final UUIDSerializer ue = new UUIDSerializer();
-    public static final LongSerializer le = new LongSerializer();
-
-
-    public EntityManagerImpl() {
-    }
-
-
-    public EntityManagerImpl init( EntityManagerFactoryImpl emf, CassandraService cass, CounterUtils counterUtils,
-                                   UUID applicationId, boolean skipAggregateCounters ) {
-        this.emf = emf;
-        this.cass = cass;
-        this.counterUtils = counterUtils;
-        this.applicationId = applicationId;
-        this.skipAggregateCounters = skipAggregateCounters;
-        qmf = ( QueueManagerFactoryImpl ) getApplicationContext().getBean( "queueManagerFactory" );
-        indexBucketLocator = ( IndexBucketLocator ) getApplicationContext().getBean( "indexBucketLocator" );
-        // prime the application entity for the EM
-        try {
-            getApplication();
-        }
-        catch ( Exception ex ) {
-            ex.printStackTrace();
-        }
-        return this;
-    }
-
-
-    public void setApplicationId( UUID applicationId ) {
-        this.applicationId = applicationId;
-    }
-
-
-    public ApplicationContext getApplicationContext() {
-        return emf.applicationContext;
-    }
-
-
-    @Override
-    public EntityRef getApplicationRef() {
-        return ref( TYPE_APPLICATION, applicationId );
-    }
-
-
-    @Override
-    public Application getApplication() throws Exception {
-        if ( application == null ) {
-            application = get( applicationId, Application.class );
-        }
-        return application;
-    }
-
-
-    @Override
-    public void updateApplication( Application app ) throws Exception {
-        update( app );
-        this.application = app;
-    }
-
-
-    @Override
-    public void updateApplication( Map<String, Object> properties ) throws Exception {
-        this.updateProperties( applicationId, properties );
-        this.application = get( applicationId, Application.class );
-    }
-
-
-    @Override
-    public RelationManagerImpl getRelationManager( EntityRef entityRef ) {
-        //RelationManagerImpl rmi = applicationContext.getBean(RelationManagerImpl.class);
-        RelationManagerImpl rmi = new RelationManagerImpl();
-        rmi.init( this, cass, applicationId, entityRef, indexBucketLocator );
-        return rmi;
-    }
-
-
-    /**
-     * Batch dictionary property.
-     *
-     * @param batch The batch to set the property into
-     * @param entity The entity that owns the property
-     * @param propertyName the property name
-     * @param propertyValue the property value
-     * @param timestampUuid The update timestamp as a uuid
-     *
-     * @return batch
-     *
-     * @throws Exception the exception
-     */
-    public Mutator<ByteBuffer> batchSetProperty( Mutator<ByteBuffer> batch, EntityRef entity, String propertyName,
-                                                 Object propertyValue, UUID timestampUuid ) throws Exception {
-        return this.batchSetProperty( batch, entity, propertyName, propertyValue, false, false, timestampUuid );
-    }
-
-
-    public Mutator<ByteBuffer> batchSetProperty( Mutator<ByteBuffer> batch, EntityRef entity, String propertyName,
-                                                 Object propertyValue, boolean force, boolean noRead,
-                                                 UUID timestampUuid ) throws Exception {
-
-        long timestamp = getTimestampInMicros( timestampUuid );
-
-        // propertyName = propertyName.toLowerCase();
-
-        boolean entitySchemaHasProperty = getDefaultSchema().hasProperty( entity.getType(), propertyName );
-
-        propertyValue = getDefaultSchema().validateEntityPropertyValue( entity.getType(), propertyName, propertyValue );
-
-        Schema defaultSchema = Schema.getDefaultSchema();
-
-        if ( PROPERTY_TYPE.equalsIgnoreCase( propertyName ) && ( propertyValue != null ) ) {
-            if ( "entity".equalsIgnoreCase( propertyValue.toString() ) || "dynamicentity"
-                    .equalsIgnoreCase( propertyValue.toString() ) ) {
-                String errorMsg =
-                        "Unable to dictionary entity type to " + propertyValue + " because that is not a valid type.";
-                logger.error( errorMsg );
-                throw new IllegalArgumentException( errorMsg );
-            }
-        }
-
-        if ( entitySchemaHasProperty ) {
-
-            if ( !force ) {
-                if ( !defaultSchema.isPropertyMutable( entity.getType(), propertyName ) ) {
-                    return batch;
-                }
-
-                // Passing null for propertyValue indicates delete the property
-                // so if required property, exit
-                if ( ( propertyValue == null ) && defaultSchema.isRequiredProperty( entity.getType(), propertyName ) ) {
-                    return batch;
-                }
-            }
-
-
-            /**
-             * Unique property, load the old value and remove it, check if it's not a duplicate
-             */
-            if ( defaultSchema.getEntityInfo( entity.getType() ).isPropertyUnique( propertyName ) ) {
-
-                Lock lock = getUniqueUpdateLock( cass.getLockManager(), applicationId, propertyValue, entity.getType(),
-                        propertyName );
-
-                try {
-                    lock.lock();
-
-                    if ( !isPropertyValueUniqueForEntity( entity.getUuid(), entity.getType(), propertyName,
-                            propertyValue ) ) {
-                        throw new DuplicateUniquePropertyExistsException( entity.getType(), propertyName,
-                                propertyValue );
-                    }
-
-
-                    String collectionName = Schema.defaultCollectionName( entity.getType() );
-
-                    uniquePropertyDelete( batch, collectionName, entity.getType(), propertyName, propertyValue,
-                            entity.getUuid(), timestamp - 1 );
-                    uniquePropertyWrite( batch, collectionName, propertyName, propertyValue, entity.getUuid(),
-                            timestamp );
-                }
-                finally {
-                    lock.unlock();
-                }
-            }
-        }
-
-        if ( getDefaultSchema().isPropertyIndexed( entity.getType(), propertyName ) ) {
-            //this call is incorrect.  The current entity is NOT the head entity
-            getRelationManager( entity )
-                    .batchUpdatePropertyIndexes( batch, propertyName, propertyValue, entitySchemaHasProperty, noRead,
-                            timestampUuid );
-        }
-
-
-        if ( propertyValue != null ) {
-            // Set the new value
-            addPropertyToMutator( batch, key( entity.getUuid() ), entity.getType(), propertyName, propertyValue,
-                    timestamp );
-
-            if ( !entitySchemaHasProperty ) {
-                // Make a list of all the properties ever dictionary on this
-                // entity
-                addInsertToMutator( batch, ENTITY_DICTIONARIES, key( entity.getUuid(), DICTIONARY_PROPERTIES ),
-                        propertyName, null, timestamp );
-            }
-        }
-        else {
-            addDeleteToMutator( batch, ENTITY_PROPERTIES, key( entity.getUuid() ), propertyName, timestamp );
-        }
-
-        return batch;
-    }
-
-
-    /**
-     * Batch update properties.
-     *
-     * @param batch the batch
-     * @param entity The owning entity reference
-     * @param properties the properties to set
-     * @param timestampUuid the timestamp of the update operation as a time uuid
-     *
-     * @return batch
-     *
-     * @throws Exception the exception
-     */
-    public Mutator<ByteBuffer> batchUpdateProperties( Mutator<ByteBuffer> batch, EntityRef entity,
-                                                      Map<String, Object> properties, UUID timestampUuid )
-            throws Exception {
-
-        for ( String propertyName : properties.keySet() ) {
-            Object propertyValue = properties.get( propertyName );
-
-            batch = batchSetProperty( batch, entity, propertyName, propertyValue, timestampUuid );
-        }
-
-        return batch;
-    }
-
-
-    /**
-     * Batch update set.
-     *
-     * @param batch the batch
-     * @param entity The owning entity
-     * @param dictionaryName the dictionary name
-     * @param elementValue the dictionary value
-     * @param removeFromDictionary True to delete from the dictionary
-     * @param timestampUuid the timestamp
-     *
-     * @return batch
-     *
-     * @throws Exception the exception
-     */
-    public Mutator<ByteBuffer> batchUpdateDictionary( Mutator<ByteBuffer> batch, EntityRef entity,
-                                                      String dictionaryName, Object elementValue,
-                                                      boolean removeFromDictionary, UUID timestampUuid )
-            throws Exception {
-        return batchUpdateDictionary( batch, entity, dictionaryName, elementValue, null, removeFromDictionary,
-                timestampUuid );
-    }
-
-
-    public Mutator<ByteBuffer> batchUpdateDictionary( Mutator<ByteBuffer> batch, EntityRef entity,
-                                                      String dictionaryName, Object elementValue, Object elementCoValue,
-                                                      boolean removeFromDictionary, UUID timestampUuid )
-            throws Exception {
-
-        long timestamp = getTimestampInMicros( timestampUuid );
-
-        // dictionaryName = dictionaryName.toLowerCase();
-        if ( elementCoValue == null ) {
-            elementCoValue = ByteBuffer.allocate( 0 );
-        }
-
-        boolean entityHasDictionary = getDefaultSchema().hasDictionary( entity.getType(), dictionaryName );
-
-        // Don't index dynamic dictionaries not defined by the schema
-        if ( entityHasDictionary ) {
-            getRelationManager( entity )
-                    .batchUpdateSetIndexes( batch, dictionaryName, elementValue, removeFromDictionary, timestampUuid );
-        }
-
-        ApplicationCF dictionary_cf = entityHasDictionary ? ENTITY_DICTIONARIES : ENTITY_COMPOSITE_DICTIONARIES;
-
-        if ( elementValue != null ) {
-            if ( !removeFromDictionary ) {
-                // Set the new value
-
-                elementCoValue = toStorableBinaryValue( elementCoValue, !entityHasDictionary );
-
-                addInsertToMutator( batch, dictionary_cf, key( entity.getUuid(), dictionaryName ),
-                        entityHasDictionary ? elementValue : asList( elementValue ), elementCoValue, timestamp );
-
-                if ( !entityHasDictionary ) {
-                    addInsertToMutator( batch, ENTITY_DICTIONARIES, key( entity.getUuid(), DICTIONARY_SETS ),
-                            dictionaryName, null, timestamp );
-                }
-            }
-            else {
-                addDeleteToMutator( batch, dictionary_cf, key( entity.getUuid(), dictionaryName ),
-                        entityHasDictionary ? elementValue : asList( elementValue ), timestamp );
-            }
-        }
-
-        return batch;
-    }
-
-
-    /**
-     * Returns true if the property is unique, and the entity can be saved.  If it's not unique, false is returned
-     *
-     * @return True if this entity can safely "own" this property name and value unique combination
-     */
-    @Metered( group = "core", name = "EntityManager_isPropertyValueUniqueForEntity" )
-    public boolean isPropertyValueUniqueForEntity( UUID ownerEntityId, String entityType, String propertyName,
-                                                   Object propertyValue ) throws Exception {
-
-        if ( !getDefaultSchema().isPropertyUnique( entityType, propertyName ) ) {
-            return true;
-        }
-
-        if ( propertyValue == null ) {
-            return true;
-        }
-
-        /**
-         * Doing this in a loop sucks, but we need to account for possibly having more than 1 entry in the index due
-         * to corruption.  We need to allow them to update, otherwise
-         * both entities will be unable to update and must be deleted
-         */
-
-        Set<UUID> ownerEntityIds = getUUIDsForUniqueProperty( applicationId, entityType, propertyName, propertyValue );
-
-        //if there are no entities for this property, we know it's unique.  If there are,
-        // we have to make sure the one we were passed is in the set.  otherwise it belongs
-        //to a different entity
-        return ownerEntityIds.size() == 0 || ownerEntityIds.contains( ownerEntityId );
-    }
-
-
-    /**
-     * Return all UUIDs that have this unique value
-     *
-     * @param ownerEntityId The entity id that owns this entity collection
-     * @param collectionName The entity collection name
-     * @param propertyName The name of the unique property
-     * @param propertyValue The value of the unique property
-     */
-    private Set<UUID> getUUIDsForUniqueProperty( UUID ownerEntityId, String collectionName, String propertyName,
-                                                 Object propertyValue ) throws Exception {
-
-
-        String collectionNameInternal = defaultCollectionName( collectionName );
-
-        Object key = createUniqueIndexKey( ownerEntityId, collectionNameInternal, propertyName, propertyValue );
-
-        List<HColumn<ByteBuffer, ByteBuffer>> cols =
-                cass.getColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_UNIQUE, key, null, null, 2,
-                        false );
-
-
-        //No columns at all, it's unique
-        if ( cols.size() == 0 ) {
-            return Collections.emptySet();
-        }
-
-        //shouldn't happen, but it's an error case
-        if ( cols.size() > 1 ) {
-            logger.error( "INDEX CORRUPTION: More than 1 unique value exists for entities in ownerId {} of type {} on "
-                    + "property {} with value {}",
-                    new Object[] { ownerEntityId, collectionNameInternal, propertyName, propertyValue } );
-        }
-
-        /**
-         * Doing this in a loop sucks, but we need to account for possibly having more than 1 entry in the index due
-         * to corruption.  We need to allow them to update, otherwise
-         * both entities will be unable to update and must be deleted
-         */
-
-        Set<UUID> results = new HashSet<UUID>( cols.size() );
-
-        for ( HColumn<ByteBuffer, ByteBuffer> col : cols ) {
-            results.add( ue.fromByteBuffer( col.getName() ) );
-        }
-
-        return results;
-    }
-
-
-    /** Add this unique index to the delete */
-    private void uniquePropertyDelete( Mutator<ByteBuffer> m, String collectionName, String entityType,
-                                       String propertyName, Object propertyValue, UUID entityId, long timestamp )
-            throws Exception {
-        //read the old value and delete it
-
-        Object oldValue = getProperty( new SimpleEntityRef( entityType, entityId ), propertyName );
-
-        //we have an old value.  If the new value is empty, we want to delete the old value.  If the new value is
-        // different we want to delete, otherwise we don't issue the delete
-        if ( oldValue != null && ( propertyValue == null || !oldValue.equals( propertyValue ) ) ) {
-            Object key = createUniqueIndexKey( applicationId, collectionName, propertyName, oldValue );
-
-            addDeleteToMutator( m, ENTITY_UNIQUE, key, timestamp, entityId );
-        }
-    }
-
-
-    /** Add this unique index to the delete */
-    private void uniquePropertyWrite( Mutator<ByteBuffer> m, String collectionName, String propertyName,
-                                      Object propertyValue, UUID entityId, long timestamp ) throws Exception {
-        Object key = createUniqueIndexKey( applicationId, collectionName, propertyName, propertyValue );
-
-        addInsertToMutator( m, ENTITY_UNIQUE, key, entityId, null, timestamp );
-    }
-
-
-    /**
-     * Create a row key for the entity of the given type with the name and value in the property.  Used for fast unique
-     * index lookups
-     */
-    private Object createUniqueIndexKey( UUID ownerId, String collectionName, String propertyName, Object value ) {
-        return key( ownerId, collectionName, propertyName, value );
-    }
-
-
-    @Override
-    @Metered( group = "core", name = "EntityManager_getAlias_single" )
-    public EntityRef getAlias( UUID ownerId, String collectionType, String aliasValue ) throws Exception {
-
-        Assert.notNull( ownerId, "ownerId is required" );
-        Assert.notNull( collectionType, "collectionType is required" );
-        Assert.notNull( aliasValue, "aliasValue is required" );
-
-        Map<String, EntityRef> results = getAlias( ownerId, collectionType, Collections.singletonList( aliasValue ) );
-
-        if ( results == null || results.size() == 0 ) {
-            return null;
-        }
-
-        //add a warn statement so we can see if we have data migration issues.
-        //TODO When we get an event system, trigger a repair if this is detected
-        if ( results.size() > 1 ) {
-            logger.warn(
-                    "More than 1 entity with Owner id '{}' of type '{}' and alias '{}' exists.  This is a duplicate "
-                            + "alias, and needs audited", new Object[] { ownerId, collectionType, aliasValue } );
-        }
-
-        return results.get( aliasValue );
-    }
-
-
-    @Override
-    public Map<String, EntityRef> getAlias( String aliasType, List<String> aliases ) throws Exception {
-        return getAlias( applicationId, aliasType, aliases );
-    }
-
-
-    @Override
-    @Metered( group = "core", name = "EntityManager_getAlias_multi" )
-    public Map<String, EntityRef> getAlias( UUID ownerId, String collectionName, List<String> aliases )
-            throws Exception {
-
-        Assert.notNull( ownerId, "ownerId is required" );
-        Assert.notNull( collectionName, "collectionName is required" );
-        Assert.notEmpty( aliases, "aliases are required" );
-
-
-        String propertyName = Schema.getDefaultSchema().aliasProperty( collectionName );
-
-        Map<String, EntityRef> results = new HashMap<String, EntityRef>();
-
-        for ( String alias : aliases ) {
-            for ( UUID id : getUUIDsForUniqueProperty( ownerId, collectionName, propertyName, alias ) ) {
-                results.put( alias, new SimpleEntityRef( collectionName, id ) );
-            }
-        }
-
-        return results;
-    }
-
-
-    @SuppressWarnings( "unchecked" )
-    @Override
-    public <A extends Entity> A create( String entityType, Class<A> entityClass, Map<String, Object> properties )
-            throws Exception {
-        if ( ( entityType != null ) && ( entityType.startsWith( TYPE_ENTITY ) || entityType
-                .startsWith( "entities" ) ) ) {
-            throw new IllegalArgumentException( "Invalid entity type" );
-        }
-        A e = null;
-        try {
-            e = ( A ) create( entityType, ( Class<Entity> ) entityClass, properties, null );
-        }
-        catch ( ClassCastException e1 ) {
-            logger.error( "Unable to create typed entity", e1 );
-        }
-        return e;
-    }
-
-
-    @Override
-    public Entity create( UUID importId, String entityType, Map<String, Object> properties ) throws Exception {
-        return create( entityType, null, properties, importId );
-    }
-
-
-    @SuppressWarnings( "unchecked" )
-    @Override
-    public <A extends TypedEntity> A create( A entity ) throws Exception {
-        return ( A ) create( entity.getType(), entity.getClass(), entity.getProperties() );
-    }
-
-
-    @Override
-    @TraceParticipant
-    public Entity create( String entityType, Map<String, Object> properties ) throws Exception {
-        return create( entityType, null, properties );
-    }
-
-
-    /**
-     * Creates a new entity.
-     *
-     * @param entityType the entity type
-     * @param entityClass the entity class
-     * @param properties the properties
-     * @param importId an existing external uuid to use as the id for the new entity
-     *
-     * @return new entity
-     *
-     * @throws Exception the exception
-     */
-    @Metered( group = "core", name = "EntityManager_create" )
-    @TraceParticipant
-    public <A extends Entity> A create( String entityType, Class<A> entityClass, Map<String, Object> properties,
-                                        UUID importId ) throws Exception {
-
-        UUID timestampUuid = newTimeUUID();
-
-        Keyspace ko = cass.getApplicationKeyspace( applicationId );
-        Mutator<ByteBuffer> m = createMutator( ko, be );
-        A entity = batchCreate( m, entityType, entityClass, properties, importId, timestampUuid );
-
-        batchExecute( m, CassandraService.RETRY_COUNT );
-
-        return entity;
-    }
-
-
-    @SuppressWarnings( "unchecked" )
-    @Metered( group = "core", name = "EntityManager_batchCreate" )
-    public <A extends Entity> A batchCreate( Mutator<ByteBuffer> m, String entityType, Class<A> entityClass,
-                                             Map<String, Object> properties, UUID importId, UUID timestampUuid )
-            throws Exception {
-
-        String eType = Schema.normalizeEntityType( entityType );
-
-        Schema schema = getDefaultSchema();
-
-        boolean is_application = TYPE_APPLICATION.equals( eType );
-
-        if ( ( ( applicationId == null ) || applicationId.equals( UUIDUtils.ZERO_UUID ) ) && !is_application ) {
-            return null;
-        }
-
-        long timestamp = getTimestampInMicros( timestampUuid );
-
-        UUID itemId = UUIDUtils.newTimeUUID();
-
-        if ( is_application ) {
-            itemId = applicationId;
-        }
-        if ( importId != null ) {
-            itemId = importId;
-        }
-        boolean emptyPropertyMap = false;
-        if ( properties == null ) {
-            properties = new TreeMap<String, Object>( CASE_INSENSITIVE_ORDER );
-        }
-        if ( properties.isEmpty() ) {
-            emptyPropertyMap = true;
-        }
-
-        if ( importId != null ) {
-            if ( isTimeBased( importId ) ) {
-                timestamp = UUIDUtils.getTimestampInMicros( importId );
-            }
-            else if ( properties.get( PROPERTY_CREATED ) != null ) {
-                timestamp = getLong( properties.get( PROPERTY_CREATED ) ) * 1000;
-            }
-        }
-
-        if ( entityClass == null ) {
-            entityClass = ( Class<A> ) Schema.getDefaultSchema().getEntityClass( entityType );
-        }
-
-        Set<String> required = schema.getRequiredProperties( entityType );
-
-        if ( required != null ) {
-            for ( String p : required ) {
-                if ( !PROPERTY_UUID.equals( p ) && !PROPERTY_TYPE.equals( p ) && !PROPERTY_CREATED.equals( p )
-                        && !PROPERTY_MODIFIED.equals( p ) ) {
-                    Object v = properties.get( p );
-                    if ( schema.isPropertyTimestamp( entityType, p ) ) {
-                        if ( v == null ) {
-                            properties.put( p, timestamp / 1000 );
-                        }
-                        else {
-                            long ts = getLong( v );
-                            if ( ts <= 0 ) {
-                                properties.put( p, timestamp / 1000 );
-                            }
-                        }
-                        continue;
-                    }
-                    if ( v == null ) {
-                        throw new RequiredPropertyNotFoundException( entityType, p );
-                    }
-                    else if ( ( v instanceof String ) && isBlank( ( String ) v ) ) {
-                        throw new RequiredPropertyNotFoundException( entityType, p );
-                    }
-                }
-            }
-        }
-
-        // Create collection name based on entity: i.e. "users"
-        String collection_name = Schema.defaultCollectionName( eType );
-        // Create collection key based collection name
-        String bucketId = indexBucketLocator.getBucket( applicationId, IndexType.COLLECTION, itemId, collection_name );
-
-        Object collection_key = key( applicationId, Schema.DICTIONARY_COLLECTIONS, collection_name, bucketId );
-
-        CollectionInfo collection = null;
-
-        if ( !is_application ) {
-            // Add entity to collection
-
-
-            if ( !emptyPropertyMap ) {
-                addInsertToMutator( m, ENTITY_ID_SETS, collection_key, itemId, null, timestamp );
-            }
-
-            // Add name of collection to dictionary property
-            // Application.collections
-            addInsertToMutator( m, ENTITY_DICTIONARIES, key( applicationId, Schema.DICTIONARY_COLLECTIONS ),
-                    collection_name, null, timestamp );
-
-            addInsertToMutator( m, ENTITY_COMPOSITE_DICTIONARIES, key( itemId, Schema.DICTIONARY_CONTAINER_ENTITIES ),
-                    asList( TYPE_APPLICATION, collection_name, applicationId ), null, timestamp );
-        }
-
-        if ( emptyPropertyMap ) {
-            return null;
-        }
-        properties.put( PROPERTY_UUID, itemId );
-        properties.put( PROPERTY_TYPE, Schema.normalizeEntityType( entityType, false ) );
-
-        if ( importId != null ) {
-            if ( properties.get( PROPERTY_CREATED ) == null ) {
-                properties.put( PROPERTY_CREATED, timestamp / 1000 );
-            }
-
-            if ( properties.get( PROPERTY_MODIFIED ) == null ) {
-                properties.put( PROPERTY_MODIFIED, timestamp / 1000 );
-            }
-        }
-        else {
-            properties.put( PROPERTY_CREATED, timestamp / 1000 );
-            properties.put( PROPERTY_MODIFIED, timestamp / 1000 );
-        }
-
-        // special case timestamp and published properties
-        // and dictionary their timestamp values if not set
-        // this is sure to break something for someone someday
-
-        if ( properties.containsKey( PROPERTY_TIMESTAMP ) ) {
-            long ts = getLong( properties.get( PROPERTY_TIMESTAMP ) );
-            if ( ts <= 0 ) {
-                properties.put( PROPERTY_TIMESTAMP, timestamp / 1000 );
-            }
-        }
-
-        A entity = EntityFactory.newEntity( itemId, eType, entityClass );
-        logger.info( "Entity created of type {}", entity.getClass().getName() );
-
-        if ( Event.ENTITY_TYPE.equals( eType ) ) {
-            Event event = ( Event ) entity.toTypedEntity();
-            for ( String prop_name : properties.keySet() ) {
-                Object propertyValue = properties.get( prop_name );
-                if ( propertyValue != null ) {
-                    event.setProperty( prop_name, propertyValue );
-                }
-            }
-            Message message = storeEventAsMessage( m, event, timestamp );
-            incrementEntityCollection( "events", timestamp );
-
-            entity.setUuid( message.getUuid() );
-            return entity;
-        }
-
-        for ( String prop_name : properties.keySet() ) {
-
-            Object propertyValue = properties.get( prop_name );
-
-            if ( propertyValue == null ) {
-                continue;
-            }
-
-
-            if ( User.ENTITY_TYPE.equals( entityType ) && "me".equals( prop_name ) ) {
-                throw new DuplicateUniquePropertyExistsException( entityType, prop_name, propertyValue );
-            }
-
-            entity.setProperty( prop_name, propertyValue );
-
-            batchSetProperty( m, entity, prop_name, propertyValue, true, true, timestampUuid );
-        }
-
-        if ( !is_application ) {
-            incrementEntityCollection( collection_name, timestamp );
-        }
-
-        return entity;
-    }
-
-
-    private void incrementEntityCollection( String collection_name, long cassandraTimestamp ) {
-        try {
-            incrementAggregateCounters( null, null, null, new String( APPLICATION_COLLECTION + collection_name ),
-                    ONE_COUNT, cassandraTimestamp );
-        }
-        catch ( Exception e ) {
-            logger.error( "Unable to increment counter application.collection: {}.", new Object[]{ collection_name, e} );
-        }
-        try {
-            incrementAggregateCounters( null, null, null, APPLICATION_ENTITIES, ONE_COUNT, cassandraTimestamp );
-        }
-        catch ( Exception e ) {
-            logger.error( "Unable to increment counter application.entities for collection: {} with timestamp: {}", new Object[]{collection_name, cassandraTimestamp,e} );
-        }
-    }
-
-
-    public void decrementEntityCollection( String collection_name ) {
-
-        long cassandraTimestamp = cass.createTimestamp();
-        decrementEntityCollection( collection_name, cassandraTimestamp );
-    }
-
-
-    public void decrementEntityCollection( String collection_name, long cassandraTimestamp ) {
-        try {
-            incrementAggregateCounters( null, null, null, APPLICATION_COLLECTION + collection_name, -ONE_COUNT,
-                    cassandraTimestamp );
-        }
-        catch ( Exception e ) {
-            logger.error( "Unable to decrement counter application.collection: {}.", new Object[]{collection_name, e} );
-        }
-        try {
-            incrementAggregateCounters( null, null, null, APPLICATION_ENTITIES, -ONE_COUNT, cassandraTimestamp );
-        }
-        catch ( Exception e ) {
-        	logger.error( "Unable to decrement counter application.entities for collection: {} with timestamp: {}", new Object[]{collection_name, cassandraTimestamp,e} );
-        }
-    }
-
-
-    @Metered( group = "core", name = "EntityManager_insertEntity" )
-    public void insertEntity( String type, UUID entityId ) throws Exception {
-
-        Keyspace ko = cass.getApplicationKeyspace( applicationId );
-        Mutator<ByteBuffer> m = createMutator( ko, be );
-
-        Object itemKey = key( entityId );
-
-        long timestamp = cass.createTimestamp();
-
-        addPropertyToMutator( m, itemKey, type, PROPERTY_UUID, entityId, timestamp );
-        addPropertyToMutator( m, itemKey, type, PROPERTY_TYPE, type, timestamp );
-
-        batchExecute( m, CassandraService.RETRY_COUNT );
-    }
-
-
-    public Message storeEventAsMessage( Mutator<ByteBuffer> m, Event event, long timestamp ) {
-
-        counterUtils.addEventCounterMutations( m, applicationId, event, timestamp );
-
-        QueueManager q = qmf.getQueueManager( applicationId );
-
-        Message message = new Message();
-        message.setType( "event" );
-        message.setCategory( event.getCategory() );
-        message.setStringProperty( "message", event.getMessage() );
-        message.setTimestamp( timestamp );
-        q.postToQueue( "events", message );
-
-        return message;
-    }
-
-
-    /**
-     * Gets the type.
-     *
-     * @param entityId the entity id
-     *
-     * @return entity type
-     *
-     * @throws Exception the exception
-     */
-    @Metered( group = "core", name = "EntityManager_getEntityType" )
-    public String getEntityType( UUID entityId ) throws Exception {
-
-        HColumn<String, String> column =
-                cass.getColumn( cass.getApplicationKeyspace( applicationId ), ENTITY_PROPERTIES, key( entityId ),
-                        PROPERTY_TYPE, se, se );
-        if ( column != null ) {
-            return column.getValue();
-        }
-        return null;
-    }
-
-
-    /**
-     * Gets the entity info. If no propertyNames are passed it loads the ENTIRE entity!
-     *
-     * @param entityId the entity id
-     * @param propertyNames the property names
-     *
-     * @return DynamicEntity object holding properties
-     *
-     * @throws Exception the exception
-     */
-    @Metered( group = "core", name = "EntityManager_loadPartialEntity" )
-    public DynamicEntity loadPartialEntity( UUID entityId, String... propertyNames ) throws Exception {
-
-        List<HColumn<String, ByteBuffer>> results = null;
-        if ( ( propertyNames != null ) && ( propertyNames.length > 0 ) ) {
-            Set<String> column_names = new TreeSet<String>( CASE_INSENSITIVE_ORDER );
-
-            column_names.add( PROPERTY_TYPE );
-            column_names.add( PROPERTY_UUID );
-
-            for ( String propertyName : propertyNames ) {
-                column_names.add( propertyName );
-            }
-
-            results = cass.getColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_PROPERTIES, key( entityId ),
-                    column_names, se, be );
-        }
-        else {
-            results = cass.getAllColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_PROPERTIES,
-                    key( entityId ) );
-        }
-
-        Map<String, Object> entityProperties = deserializeEntityProperties( results );
-        if ( entityProperties == null ) {
-            return null;
-        }
-
-        String entityType = ( String ) entityProperties.get( PROPERTY_TYPE );
-        UUID id = ( UUID ) entityProperties.get( PROPERTY_UUID );
-
-        return new DynamicEntity( entityType, id, entityProperties );
-    }
-
-
-    /**
-     * Gets the specified entity.
-     *
-     * @param entityId the entity id
-     * @param entityClass the entity class
-     *
-     * @return entity
-     *
-     * @throws Exception the exception
-     */
-    public <A extends Entity> A getEntity( UUID entityId, Class<A> entityClass ) throws Exception {
-
-        Object entity_key = key( entityId );
-        Map<String, Object> results = null;
-
-        // if (entityType == null) {
-        results = deserializeEntityProperties(
-                cass.getAllColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_PROPERTIES, entity_key ) );
-        // } else {
-        // Set<String> columnNames = Schema.getPropertyNames(entityType);
-        // results = getColumns(getApplicationKeyspace(applicationId),
-        // EntityCF.PROPERTIES, entity_key, columnNames, se, be);
-        // }
-
-        if ( results == null ) {
-            logger.warn( "getEntity(): No properties found for entity {}, probably doesn't exist...", entityId );
-            return null;
-        }
-
-        UUID id = uuid( results.get( PROPERTY_UUID ) );
-        String type = string( results.get( PROPERTY_TYPE ) );
-
-        if ( !entityId.equals( id ) ) {
-
-            logger.error( "Expected entity id {}, found {}. Returning null entity", new Object[]{entityId, id, new Throwable()} );
-            return null;
-        }
-
-        A entity = EntityFactory.newEntity( id, type, entityClass );
-        entity.setProperties( results );
-
-        return entity;
-    }
-
-
-    /**
-     * Gets the specified list of entities.
-     *
-     * @param entityIds the entity ids
-     * @param entityClass the entity class
-     *
-     * @return entity
-     *
-     * @throws Exception the exception
-     */
-    @Metered( group = "core", name = "EntityManager_getEntities" )
-    public <A extends Entity> List<A> getEntities( Collection<UUID> entityIds, Class<A> entityClass ) throws Exception {
-
-        List<A> entities = new ArrayList<A>();
-
-        if ( ( entityIds == null ) || ( entityIds.size() == 0 ) ) {
-            return entities;
-        }
-
-        Map<UUID, A> resultSet = new LinkedHashMap<UUID, A>();
-
-        Rows<UUID, String, ByteBuffer> results = null;
-
-        // if (entityType == null) {
-        results =
-                cass.getRows( cass.getApplicationKeyspace( applicationId ), ENTITY_PROPERTIES, entityIds, ue, se, be );
-        // } else {
-        // Set<String> columnNames = Schema.getPropertyNames(entityType);
-        // results = getRows(getApplicationKeyspace(applicationId),
-        // EntityCF.PROPERTIES,
-        // entityIds, columnNames, ue, se, be);
-        // }
-
-        if ( results != null ) {
-            for ( UUID key : entityIds ) {
-                Map<String, Object> properties = deserializeEntityProperties( results.getByKey( key ) );
-
-                if ( properties == null ) {
-                    logger.error( "Error deserializing entity with key {} entity probaby doesn't exist, where did this key come from?", key );
-                    continue;
-                }
-
-                UUID id = uuid( properties.get( PROPERTY_UUID ) );
-                String type = string( properties.get( PROPERTY_TYPE ) );
-
-                if ( ( id == null ) || ( type == null ) ) {
-                    logger.error( "Error retrieving entity with key {}, no type or id deseriazable, where did this key come from?", key );
-                    continue;
-                }
-                A entity = EntityFactory.newEntity( id, type, entityClass );
-                entity.setProperties( properties );
-
-                resultSet.put( id, entity );
-            }
-
-            for ( UUID entityId : entityIds ) {
-                A entity = resultSet.get( entityId );
-                if ( entity != null ) {
-                    entities.add( entity );
-                }
-            }
-        }
-
-        return entities;
-    }
-
-
-    @Metered( group = "core", name = "EntityManager_getPropertyNames" )
-    public Set<String> getPropertyNames( EntityRef entity ) throws Exception {
-
-        Set<String> propertyNames = new TreeSet<String>( CASE_INSENSITIVE_ORDER );
-        List<HColumn<String, ByteBuffer>> results =
-                cass.getAllColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_DICTIONARIES,
-                        key( entity.getUuid(), DICTIONARY_PROPERTIES ) );
-        for ( HColumn<String, ByteBuffer> result : results ) {
-            String str = string( result.getName() );
-            if ( str != null ) {
-                propertyNames.add( str );
-            }
-        }
-
-        Set<String> schemaProperties = getDefaultSchema().getPropertyNames( entity.getType() );
-        if ( ( schemaProperties != null ) && !schemaProperties.isEmpty() ) {
-            propertyNames.addAll( schemaProperties );
-        }
-
-        return propertyNames;
-    }
-
-
-    @Metered( group = "core", name = "EntityManager_getDictionaryNames" )
-    public Set<String> getDictionaryNames( EntityRef entity ) throws Exception {
-
-        Set<String> dictionaryNames = new TreeSet<String>( CASE_INSENSITIVE_ORDER );
-        List<HColumn<String, ByteBuffer>> results =
-                cass.getAllColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_DICTIONARIES,
-                        key( entity.getUuid(), DICTIONARY_SETS ) );
-        for ( HColumn<String, ByteBuffer> result : results ) {
-            String str = string( result.getName() );
-            if ( str != null ) {
-                dictionaryNames.add( str );
-            }
-        }
-
-        Set<String> schemaSets = getDefaultSchema().getDictionaryNames( entity.getType() );
-        if ( ( schemaSets != null ) && !schemaSets.isEmpty() ) {
-            dictionaryNames.addAll( schemaSets );
-        }
-
-        return dictionaryNames;
-    }
-
-
-    @Override
-    @Metered( group = "core", name = "EntityManager_getDictionaryElementValue" )
-    public Object getDictionaryElementValue( EntityRef entity, String dictionaryName, String elementName )
-            throws Exception {
-
-        Object value = null;
-
-        ApplicationCF dictionaryCf = null;
-
-        boolean entityHasDictionary = getDefaultSchema().hasDictionary( entity.getType(), dictionaryName );
-
-        if ( entityHasDictionary ) {
-            dictionaryCf = ENTITY_DICTIONARIES;
-        }
-        else {
-            dictionaryCf = ENTITY_COMPOSITE_DICTIONARIES;
-        }
-
-        Class<?> dictionaryCoType = getDefaultSchema().getDictionaryValueType( entity.getType(), dictionaryName );
-        boolean coTypeIsBasic = ClassUtils.isBasicType( dictionaryCoType );
-
-        HColumn<ByteBuffer, ByteBuffer> result =
-                cass.getColumn( cass.getApplicationKeyspace( applicationId ), dictionaryCf,
-                        key( entity.getUuid(), dictionaryName ),
-                        entityHasDictionary ? bytebuffer( elementName ) : DynamicComposite.toByteBuffer( elementName ),
-                        be, be );
-        if ( result != null ) {
-            if ( entityHasDictionary && coTypeIsBasic ) {
-                value = object( dictionaryCoType, result.getValue() );
-            }
-            else if ( result.getValue().remaining() > 0 ) {
-                value = Schema.deserializePropertyValueFromJsonBinary( result.getValue().slice(), dictionaryCoType );
-            }
-        }
-        else {
-            logger.info( "Results of EntityManagerImpl.getDictionaryElementValue is null" );
-        }
-
-        return value;
-    }
-
-
-    @Metered( group = "core", name = "EntityManager_getDictionaryElementValues" )
-    public Map<String, Object> getDictionaryElementValues( EntityRef entity, String dictionaryName,
-                                                           String... elementNames ) throws Exception {
-
-        Map<String, Object> values = null;
-
-        ApplicationCF dictionaryCf = null;
-
-        boolean entityHasDictionary = getDefaultSchema().hasDictionary( entity.getType(), dictionaryName );
-
-        if ( entityHasDictionary ) {
-            dictionaryCf = ENTITY_DICTIONARIES;
-        }
-        else {
-            dictionaryCf = ENTITY_COMPOSITE_DICTIONARIES;
-        }
-
-        Class<?> dictionaryCoType = getDefaultSchema().getDictionaryValueType( entity.getType(), dictionaryName );
-        boolean coTypeIsBasic = ClassUtils.isBasicType( dictionaryCoType );
-
-        ByteBuffer[] columnNames = new ByteBuffer[elementNames.length];
-        for ( int i = 0; i < elementNames.length; i++ ) {
-            columnNames[i] = entityHasDictionary ? bytebuffer( elementNames[i] ) :
-                             DynamicComposite.toByteBuffer( elementNames[i] );
-        }
-
-        ColumnSlice<ByteBuffer, ByteBuffer> results =
-                cass.getColumns( cass.getApplicationKeyspace( applicationId ), dictionaryCf,
-                        key( entity.getUuid(), dictionaryName ), columnNames, be, be );
-        if ( results != null ) {
-            values = new HashMap<String, Object>();
-            for ( HColumn<ByteBuffer, ByteBuffer> result : results.getColumns() ) {
-                String name = entityHasDictionary ? string( result.getName() ) :
-                              DynamicComposite.fromByteBuffer( result.getName() ).get( 0, se );
-                if ( entityHasDictionary && coTypeIsBasic ) {
-                    values.put( name, object( dictionaryCoType, result.getValue() ) );
-                }
-                else if ( result.getValue().remaining() > 0 ) {
-                    values.put( name, Schema.deserializePropertyValueFromJsonBinary( result.getValue().slice(),
-                            dictionaryCoType ) );
-                }
-            }
-        }
-        else {
-            logger.error( "Results of EntityManagerImpl.getDictionaryElementValues is null" );
-        }
-
-        return values;
-    }
-
-
-    /**
-     * Gets the set.
-     *
-     * @param entity The owning entity
-     * @param dictionaryName the dictionary name
-     *
-     * @return contents of dictionary property
-     *
-     * @throws Exception the exception
-     */
-    @Override
-    @Metered( group = "core", name = "EntityManager_getDictionaryAsMap" )
-    public Map<Object, Object> getDictionaryAsMap( EntityRef entity, String dictionaryName ) throws Exception {
-
-        entity = validate( entity );
-
-        Map<Object, Object> dictionary = new LinkedHashMap<Object, Object>();
-
-        ApplicationCF dictionaryCf = null;
-
-        boolean entityHasDictionary = getDefaultSchema().hasDictionary( entity.getType(), dictionaryName );
-
-        if ( entityHasDictionary ) {
-            dictionaryCf = ENTITY_DICTIONARIES;
-        }
-        else {
-            dictionaryCf = ENTITY_COMPOSITE_DICTIONARIES;
-        }
-
-        Class<?> setType = getDefaultSchema().getDictionaryKeyType( entity.getType(), dictionaryName );
-        Class<?> setCoType = getDefaultSchema().getDictionaryValueType( entity.getType(), dictionaryName );
-        boolean coTypeIsBasic = ClassUtils.isBasicType( setCoType );
-
-        List<HColumn<ByteBuffer, ByteBuffer>> results =
-                cass.getAllColumns( cass.getApplicationKeyspace( applicationId ), dictionaryCf,
-                        key( entity.getUuid(), dictionaryName ), be, be );
-        for ( HColumn<ByteBuffer, ByteBuffer> result : results ) {
-            Object name = null;
-            if ( entityHasDictionary ) {
-                name = object( setType, result.getName() );
-            }
-            else {
-                name = CompositeUtils.deserialize( result.getName() );
-            }
-            Object value = null;
-            if ( entityHasDictionary && coTypeIsBasic ) {
-                value = object( setCoType, result.getValue() );
-            }
-            else if ( result.getValue().remaining() > 0 ) {
-                value = Schema.deserializePropertyValueFromJsonBinary( result.getValue().slice(), setCoType );
-            }
-            if ( name != null ) {
-                dictionary.put( name, value );
-            }
-        }
-
-        return dictionary;
-    }
-
-
-    @Override
-    public Set<Object> getDictionaryAsSet( EntityRef entity, String dictionaryName ) throws Exception {
-        return new LinkedHashSet<Object>( getDictionaryAsMap( entity, dictionaryName ).keySet() );
-    }
-
-
-    /**
-     * Update properties.
-     *
-     * @param entityId the entity id
-     * @param properties the properties
-     *
-     * @throws Exception the exception
-     */
-    @Metered( group = "core", name = "EntityManager_updateProperties" )
-    public void updateProperties( UUID entityId, Map<String, Object> properties ) throws Exception {
-
-        EntityRef entity = getRef( entityId );
-        if ( entity == null ) {
-            return;
-        }
-
-        Keyspace ko = cass.getApplicationKeyspace( applicationId );
-        Mutator<ByteBuffer> m = createMutator( ko, be );
-
-        UUID timestampUuid = newTimeUUID();
-        properties.put( PROPERTY_MODIFIED, getTimestampInMillis( timestampUuid ) );
-
-        batchUpdateProperties( m, entity, properties, timestampUuid );
-
-        batchExecute( m, CassandraService.RETRY_COUNT );
-    }
-
-
-    @Metered( group = "core", name = "EntityManager_deleteEntity" )
-    public void deleteEntity( UUID entityId ) throws Exception {
-
-        logger.info( "deleteEntity {} of application {}", entityId, applicationId );
-
-        EntityRef entity = getRef( entityId );
-        if ( entity == null ) {
-            return;
-        }
-
-        logger.info( "deleteEntity: {} is of type {}", entityId, entity.getType() );
-
-        Keyspace ko = cass.getApplicationKeyspace( applicationId );
-        Mutator<ByteBuffer> m = createMutator( ko, be );
-
-        UUID timestampUuid = newTimeUUID();
-        long timestamp = getTimestampInMicros( timestampUuid );
-
-        // get all connections and disconnect them
-        getRelationManager( ref( entityId ) ).batchDisconnect( m, timestampUuid );
-
-        // delete all core properties and any dynamic property that's ever been
-        // dictionary for this entity
-        Set<String> properties = getPropertyNames( entity );
-        if ( properties != null ) {
-            for ( String propertyName : properties ) {
-                m = batchSetProperty( m, entity, propertyName, null, true, false, timestampUuid );
-            }
-        }
-
-        // delete any core dictionaries and dynamic dictionaries associated with
-        // this entity
-        Set<String> dictionaries = getDictionaryNames( entity );
-        if ( dictionaries != null ) {
-            for ( String dictionary : dictionaries ) {
-                Set<Object> values = getDictionaryAsSet( entity, dictionary );
-                if ( values != null ) {
-                    for ( Object value : values ) {
-                        batchUpdateDictionary( m, entity, dictionary, value, true, timestampUuid );
-                    }
-                }
-            }
-        }
-
-        // find all the containing collections
-        getRelationManager( entity ).batchRemoveFromContainers( m, timestampUuid );
-
-        //decrease entity count
-        if ( !TYPE_APPLICATION.equals( entity.getType() ) ) {
-            String collection_name = Schema.defaultCollectionName( entity.getType() );
-            decrementEntityCollection( collection_name );
-        }
-
-
-        timestamp += 1;
-
-        if ( dictionaries != null ) {
-            for ( String dictionary : dictionaries ) {
-
-                ApplicationCF cf =
-                        getDefaultSchema().hasDictionary( entity.getType(), dictionary ) ? ENTITY_DICTIONARIES :
-                        ENTITY_COMPOSITE_DICTIONARIES;
-
-                addDeleteToMutator( m, cf, key( entity.getUuid(), dictionary ), timestamp );
-            }
-        }
-
-        addDeleteToMutator( m, ENTITY_PROPERTIES, key( entityId ), timestamp );
-
-        batchExecute( m, CassandraService.RETRY_COUNT );
-    }
-
-
-    @Override
-    public void delete( EntityRef entityRef ) throws Exception {
-        deleteEntity( entityRef.getUuid() );
-    }
-
-
-    public void batchCreateRole( Mutator<ByteBuffer> batch, UUID groupId, String roleName, String roleTitle,
-                                 long inactivity, RoleRef roleRef, UUID timestampUuid ) throws Exception {
-
-        long timestamp = getTimestampInMicros( timestampUuid );
-
-        if ( roleRef == null ) {
-            roleRef = new SimpleRoleRef( groupId, roleName );
-        }
-        if ( roleTitle == null ) {
-            roleTitle = roleRef.getRoleName();
-        }
-
-        EntityRef ownerRef = null;
-        if ( roleRef.getGroupId() != null ) {
-            ownerRef = new SimpleEntityRef( Group.ENTITY_TYPE, roleRef.getGroupId() );
-        }
-        else {
-            ownerRef = new SimpleEntityRef( Application.ENTITY_TYPE, applicationId );
-        }
-
-        Map<String, Object> properties = new TreeMap<String, Object>( CASE_INSENSITIVE_ORDER );
-        properties.put( PROPERTY_TYPE, Role.ENTITY_TYPE );
-        properties.put( "group", roleRef.getGroupId() );
-        properties.put( PROPERTY_NAME, roleRef.getApplicationRoleName() );
-        properties.put( "roleName", roleRef.getRoleName() );
-        properties.put( "title", roleTitle );
-        properties.put( PROPERTY_INACTIVITY, inactivity );
-
-        Entity role = batchCreate( batch, Role.ENTITY_TYPE, null, properties, roleRef.getUuid(), timestampUuid );
-
-        addInsertToMutator( batch, ENTITY_DICTIONARIES, key( ownerRef.getUuid(), Schema.DICTIONARY_ROLENAMES ),
-                roleRef.getRoleName(), roleTitle, timestamp );
-
-        addInsertToMutator( batch, ENTITY_DICTIONARIES, key( ownerRef.getUuid(), Schema.DICTIONARY_ROLETIMES ),
-                roleRef.getRoleName(), inactivity, timestamp );
-
-        addInsertToMutator( batch, ENTITY_DICTIONARIES, key( ownerRef.getUuid(), DICTIONARY_SETS ),
-                Schema.DICTIONARY_ROLENAMES, null, timestamp );
-
-        if ( roleRef.getGroupId() != null ) {
-            getRelationManager( ownerRef ).batchAddToCollection( batch, COLLECTION_ROLES, role, timestampUuid );
-        }
-    }
-
-
-    @Override
-    public EntityRef getUserByIdentifier( Identifier identifier ) throws Exception {
-        if ( identifier == null ) {
-            return null;
-        }
-        if ( identifier.isUUID() ) {
-            return new SimpleEntityRef( "user", identifier.getUUID() );
-        }
-        if ( identifier.isName() ) {
-            return this.getAlias( applicationId, "user", identifier.getName() );
-        }
-        if ( identifier.isEmail() ) {
-
-            Query query = new Query();
-            query.setEntityType( "user" );
-            query.addEqualityFilter( "email", identifier.getEmail() );
-            query.setLimit( 1 );
-            query.setResultsLevel( REFS );
-
-            Results r = getRelationManager( ref( applicationId ) ).searchCollection( "users", query );
-            if ( r != null && r.getRef() != null ) {
-                return r.getRef();
-            }
-            else {
-                // look-aside as it might be an email in the name field
-                return this.getAlias( applicationId, "user", identifier.getEmail() );
-            }
-        }
-        return null;
-    }
-
-
-    @Override
-    public EntityRef getGroupByIdentifier( Identifier identifier ) throws Exception {
-        if ( identifier == null ) {
-            return null;
-        }
-        if ( identifier.isUUID() ) {
-            return new SimpleEntityRef( "group", identifier.getUUID() );
-        }
-        if ( identifier.isName() ) {
-            return this.getAlias( applicationId, "group", identifier.getName() );
-        }
-        return null;
-    }
-
-
-    @Override
-    public Results getAggregateCounters( UUID userId, UUID groupId, String category, String counterName,
-                                         CounterResolution resolution, long start, long finish, boolean pad ) {
-        return this
-                .getAggregateCounters( userId, groupId, null, category, counterName, resolution, start, finish, pad );
-    }
-
-
-    @Override
-    @Metered( group = "core", name = "EntityManager_getAggregateCounters" )
-    public Results getAggregateCounters( UUID userId, UUID groupId, UUID queueId, String category, String counterName,
-                                         CounterResolution resolution, long start, long finish, boolean pad ) {
-        start = resolution.round( start );
-        finish = resolution.round( finish );
-        long expected_time = start;
-        Keyspace ko = cass.getApplicationKeyspace( applicationId );
-        SliceCounterQuery<String, Long> q = createCounterSliceQuery( ko, se, le );
-        q.setColumnFamily( APPLICATION_AGGREGATE_COUNTERS.toString() );
-        q.setRange( start, finish, false, ALL_COUNT );
-        QueryResult<CounterSlice<Long>> r = q.setKey(
-                counterUtils.getAggregateCounterRow( counterName, userId, groupId, queueId, category, resolution ) )
-                                             .execute();
-        List<AggregateCounter> counters = new ArrayList<AggregateCounter>();
-        for ( HCounterColumn<Long> column : r.get().getColumns() ) {
-            AggregateCounter count = new AggregateCounter( column.getName(), column.getValue() );
-            if ( pad && !( resolution == CounterResolution.ALL ) ) {
-                while ( count.getTimestamp() != expected_time ) {
-                    counters.add( new AggregateCounter( expected_time, 0 ) );
-                    expected_time = resolution.next( expected_time );
-                }
-                expected_time = resolution.next( expected_time );
-            }
-            counters.add( count );
-        }
-        if ( pad && !( resolution == CounterResolution.ALL ) ) {
-            while ( expected_time <= finish ) {
-                counters.add( new AggregateCounter( expected_time, 0 ) );
-                expected_time = resolution.next( expected_time );
-            }
-        }
-        return Results.fromCounters( new AggregateCounterSet( counterName, userId, groupId, category, counters ) );
-    }
-
-
-    @Override
-    @Metered( group = "core", name = "EntityManager_getAggregateCounters_fromQueryObj" )
-    public Results getAggregateCounters( Query query ) throws Exception {
-        CounterResolution resolution = query.getResolution();
-        if ( resolution == null ) {
-            resolution = CounterResolution.ALL;
-        }
-        long start = query.getStartTime() != null ? query.getStartTime() : 0;
-        long finish = query.getFinishTime() != null ? query.getFinishTime() : 0;
-        boolean pad = query.isPad();
-        if ( start <= 0 ) {
-            start = 0;
-        }
-        if ( ( finish <= 0 ) || ( finish < start ) ) {
-            finish = System.currentTimeMillis();
-        }
-        start = resolution.round( start );
-        finish = resolution.round( finish );
-        long expected_time = start;
-
-        if ( pad && ( resolution != CounterResolution.ALL ) ) {
-            long max_counters = ( finish - start ) / resolution.interval();
-            if ( max_counters > 1000 ) {
-                finish = resolution.round( start + ( resolution.interval() * 1000 ) );
-            }
-        }
-
-        List<CounterFilterPredicate> filters = query.getCounterFilters();
-        if ( filters == null ) {
-            return null;
-        }
-        Map<String, AggregateCounterSelection> selections = new HashMap<String, AggregateCounterSelection>();
-        Keyspace ko = cass.getApplicationKeyspace( applicationId );
-
-        for ( CounterFilterPredicate filter : filters ) {
-            AggregateCounterSelection selection =
-                    new AggregateCounterSelection( filter.getName(), getUuid( getUserByIdentifier( filter.getUser() ) ),
-                            getUuid( getGroupByIdentifier( filter.getGroup() ) ),
-                            org.usergrid.mq.Queue.getQueueId( filter.getQueue() ), filter.getCategory() );
-            selections.put( selection.getRow( resolution ), selection );
-        }
-
-        MultigetSliceCounterQuery<String, Long> q = HFactory.createMultigetSliceCounterQuery( ko, se, le );
-        q.setColumnFamily( APPLICATION_AGGREGATE_COUNTERS.toString() );
-        q.setRange( start, finish, false, ALL_COUNT );
-        QueryResult<CounterRows<String, Long>> rows = q.setKeys( selections.keySet() ).execute();
-
-        List<AggregateCounterSet> countSets = new ArrayList<AggregateCounterSet>();
-        for ( CounterRow<String, Long> r : rows.get() ) {
-            expected_time = start;
-            List<AggregateCounter> counters = new ArrayList<AggregateCounter>();
-            for ( HCounterColumn<Long> column : r.getColumnSlice().getColumns() ) {
-                AggregateCounter count = new AggregateCounter( column.getName(), column.getValue() );
-                if ( pad && ( resolution != CounterResolution.ALL ) ) {
-                    while ( count.getTimestamp() != expected_time ) {
-                        counters.add( new AggregateCounter( expected_time, 0 ) );
-                        expected_time = resolution.next( expected_time );
-                    }
-                    expected_time = resolution.next( expected_time );
-                }
-                counters.add( count );
-            }
-            if ( pad && ( resolution != CounterResolution.ALL ) ) {
-                while ( expected_time <= finish ) {
-                    counters.add( new AggregateCounter( expected_time, 0 ) );
-                    expected_time = resolution.next( expected_time );
-                }
-            }
-            AggregateCounterSelection selection = selections.get( r.getKey() );
-            countSets.add( new AggregateCounterSet( selection.getName(), selection.getUserId(), selection.getGroupId(),
-                    selection.getCategory(), counters ) );
-        }
-
-        Collections.sort( countSets, new Comparator<AggregateCounterSet>() {
-            @Override
-            public int compare( AggregateCounterSet o1, AggregateCounterSet o2 ) {
-                String s1 = o1.getName();
-                String s2 = o2.getName();
-                return s1.compareTo( s2 );
-            }
-        } );
-        return Results.fromCounters( countSets );
-    }
-
-
-    @Override
-    @Metered( group = "core", name = "EntityManager_getEntityCounters" )
-    public Map<String, Long> getEntityCounters( UUID entityId ) throws Exception {
-
-        Map<String, Long> counters = new HashMap<String, Long>();
-        Keyspace ko = cass.getApplicationKeyspace( applicationId );
-        SliceCounterQuery<UUID, String> q = createCounterSliceQuery( ko, ue, se );
-        q.setColumnFamily( ENTITY_COUNTERS.toString() );
-        q.setRange( null, null, false, ALL_COUNT );
-        QueryResult<CounterSlice<String>> r = q.setKey( entityId ).execute();
-        for ( HCounterColumn<String> column : r.get().getColumns() ) {
-            counters.put( column.getName(), column.getValue() );
-        }
-        return counters;
-    }
-
-
-    @Override
-    public Map<String, Long> getApplicationCounters() throws Exception {
-        return getEntityCounters( applicationId );
-    }
-
-
-    @Override
-    @Metered( group = "core", name = "EntityManager_createApplicationCollection" )
-    public void createApplicationCollection( String entityType ) throws Exception {
-
-        Keyspace ko = cass.getApplicationKeyspace( applicationId );
-        Mutator<ByteBuffer> m = createMutator( ko, be );
-
-        long timestamp = cass.createTimestamp();
-
-        String collection_name = Schema.defaultCollectionName( entityType );
-        // Add name of collection to dictionary property Application.collections
-        addInsertToMutator( m, ENTITY_DICTIONARIES, key( applicationId, Schema.DICTIONARY_COLLECTIONS ),
-                collection_name, null, timestamp );
-
-        batchExecute( m, CassandraService.RETRY_COUNT );
-    }
-
-
-    @Override
-    public EntityRef getAlias( String aliasType, String alias ) throws Exception {
-        return getAlias( applicationId, aliasType, alias );
-    }
-
-
-    @Override
-    public EntityRef validate( EntityRef entityRef ) throws Exception {
-        return validate( entityRef, true );
-    }
-
-
-    public EntityRef validate( EntityRef entityRef, boolean verify ) throws Exception {
-        if ( ( entityRef == null ) || ( entityRef.getUuid() == null ) ) {
-            return null;
-        }
-        if ( ( entityRef.getType() == null ) || verify ) {
-            UUID entityId = entityRef.getUuid();
-            String entityType = entityRef.getType();
-            try {
-                entityRef = getRef( entityRef.getUuid() );
-            }
-            catch ( Exception e ) {
-                logger.error( "Unable to load entity: {}", new Object[] {entityRef.getUuid(), e} );
-            }
-            if ( entityRef == null ) {
-                throw new EntityNotFoundException( "Entity " + entityId.toString() + " cannot be verified" );
-            }
-            if ( ( entityType != null ) && !entityType.equalsIgnoreCase( entityRef.getType() ) ) {
-                throw new UnexpectedEntityTypeException(
-                        "Entity " + entityId + " is not the expected type, expected " + entityType + ", found "
-                                + entityRef.getType() );
-            }
-        }
-        return entityRef;
-    }
-
-
-    @Override
-    public String getType( UUID entityid ) throws Exception {
-        return getEntityType( entityid );
-    }
-
-
-    public String getType( EntityRef entity ) throws Exception {
-        if ( entity.getType() != null ) {
-            return entity.getType();
-        }
-        return getEntityType( entity.getUuid() );
-    }
-
-
-    @Override
-    public Entity get( UUID entityid ) throws Exception {
-        return getEntity( entityid, null );
-    }
-
-
-    @Override
-    public EntityRef getRef( UUID entityId ) throws Exception {
-        String entityType = getEntityType( entityId );
-        if ( entityType == null ) {
-            logger.warn( "Unable to get type for entity: {} ", new Object[] { entityId, new Exception()} );
-            return null;
-        }
-        return ref( entityType, entityId );
-    }
-
-
-    @Override
-    public Entity get( EntityRef entityRef ) throws Exception {
-        if ( entityRef == null ) {
-            return null;
-        }
-        return getEntity( entityRef.getUuid(), null );
-    }
-
-
-    @Override
-    public <A extends Entity> A get( EntityRef entityRef, Class<A> entityClass ) throws Exception {
-        if ( entityRef == null ) {
-            return null;
-        }
-        return get( entityRef.getUuid(), entityClass );
-    }
-
-
-    @SuppressWarnings( "unchecked" )
-    @Override
-    public <A extends Entity> A get( UUID entityId, Class<A> entityClass ) throws Exception {
-        A e = null;
-        try {
-            e = ( A ) getEntity( entityId, ( Class<Entity> ) entityClass );
-        }
-        catch ( ClassCastException e1 ) {
-            logger.error( "Unable to get typed entity: {} of class {}", new Object[] {entityId, entityClass.getCanonicalName(), e1} );
-        }
-        return e;
-    }
-
-
-    @Override
-    public Results get( Collection<UUID> entityIds, Results.Level resultsLevel ) throws Exception {
-        List<? extends Entity> results = getEntities( entityIds, null );
-        return Results.fromEntities( results );
-    }
-
-
-    /* (non-Javadoc)
-   * @see org.usergrid.persistence.EntityManager#get(java.util.Collection)
-   */
-    @Override
-    public Results get( Collection<UUID> entityIds ) throws Exception {
-        return fromEntities( getEntities( entityIds, null ) );
-    }
-
-
-    @Override
-    public Results get( Collection<UUID> entityIds, Class<? extends Entity> entityClass, Results.Level resultsLevel )
-            throws Exception {
-        return fromEntities( getEntities( entityIds, entityClass ) );
-    }
-
-
-    @Override
-    public Results get( Collection<UUID> entityIds, String entityType, Class<? extends Entity> entityClass,
-                        Results.Level resultsLevel ) throws Exception {
-        return fromEntities( getEntities( entityIds, entityClass ) );
-    }
-
-
-    public Results loadEntities( Results results, Results.Level resultsLevel, int count ) throws Exception {
-        return loadEntities( results, resultsLevel, null, count );
-    }
-
-
-    public Results loadEntities( Results results, Results.Level resultsLevel, Map<UUID, UUID> associatedMap, int count )
-            throws Exception {
-
-        results = results.trim( count );
-        if ( resultsLevel.ordinal() <= results.getLevel().ordinal() ) {
-            return results;
-        }
-
-        results.setEntities( getEntities( results.getIds(), null ) );
-
-        if ( resultsLevel == Results.Level.LINKED_PROPERTIES ) {
-            List<Entity> entities = results.getEntities();
-            BiMap<UUID, UUID> associatedIds = null;
-
-            if ( associatedMap != null ) {
-                associatedIds = HashBiMap.create( associatedMap );
-            }
-            else {
-                associatedIds = HashBiMap.create( entities.size() );
-                for ( Entity entity : entities ) {
-                    Object id = entity.getMetadata( PROPERTY_ASSOCIATED );
-                    if ( id instanceof UUID ) {
-                        associatedIds.put( entity.getUuid(), ( UUID ) id );
-                    }
-                }
-            }
-            List<DynamicEntity> linked = getEntities( new ArrayList<UUID>( associatedIds.values() ), null );
-            for ( DynamicEntity l : linked ) {
-                Map<String, Object> p = l.getDynamicProperties();
-                if ( ( p != null ) && ( p.size() > 0 ) ) {
-                    Entity e = results.getEntitiesMap().get( associatedIds.inverse().get( l.getUuid() ) );
-                    if ( l.getType().endsWith( TYPE_MEMBER ) ) {
-                        e.setProperty( TYPE_MEMBER, p );
-                    }
-                    else if ( l.getType().endsWith( TYPE_CONNECTION ) ) {
-                        e.setProperty( TYPE_CONNECTION, p );
-                    }
-                }
-            }
-        }
-
-        return results;
-    }
-
-
-    @Override
-    public void update( Entity entity ) throws Exception {
-        updateProperties( entity.getUuid(), entity.getProperties() );
-    }
-
-
-    @Override
-    public Object getProperty( EntityRef entityRef, String propertyName ) throws Exception {
-        Entity entity = loadPartialEntity( entityRef.getUuid(), propertyName );
-
-        if ( entity == null ) {
-            return null;
-        }
-
-        return entity.getProperty( propertyName );
-    }
-
-
-    @Override
-    public Map<String, Object> getProperties( EntityRef entityRef ) throws Exception {
-        Entity entity = loadPartialEntity( entityRef.getUuid() );
-        Map<String, Object> props = entity.getProperties();
-        return props;
-    }
-
-
-    @Override
-    public List<Entity> getPartialEntities( Collection<UUID> ids, Collection<String> fields ) throws Exception {
-
-        List<Entity> entities = new ArrayList<Entity>( ids.size() );
-
-        if ( ids == null || ids.size() == 0 ) {
-            return entities;
-        }
-
-        fields.add( PROPERTY_UUID );
-        fields.add( PROPERTY_TYPE );
-
-        Rows<UUID, String, ByteBuffer> results = null;
-
-        results = cass.getRows( cass.getApplicationKeyspace( applicationId ), ENTITY_PROPERTIES, ids, fields, ue, se,
-                be );
-
-        if ( results == null ) {
-            return entities;
-        }
-
-        for ( Row<UUID, String, ByteBuffer> row : results ) {
-
-
-            Map<String, Object> properties =
-                    deserializeEntityProperties( results.getByKey( row.getKey() ).getColumnSlice().getColumns(), true,
-                            false );
-
-            //Could get a tombstoned row if the index is behind, just ignore it
-            if ( properties == null ) {
-                logger.warn( "Received row key {} with no type or properties, skipping", row.getKey() );
-                continue;
-            }
-
-            UUID id = uuid( properties.get( PROPERTY_UUID ) );
-            String type = string( properties.get( PROPERTY_TYPE ) );
-
-            if ( id == null || type == null ) {
-                logger.warn( "Error retrieving entity with key {} no type or id deseriazable, skipping", row.getKey() );
-                continue;
-            }
-
-            Entity entity = EntityFactory.newEntity( id, type );
-            entity.setProperties( properties );
-
-            entities.add( entity );
-        }
-
-
-        return entities;
-    }
-
-
-    @Override
-    public void setProperty( EntityRef entityRef, String propertyName, Object propertyValue ) throws Exception {
-
-        setProperty( entityRef, propertyName, propertyValue, false );
-    }
-
-
-    @Override
-    public void setProperty( EntityRef entityRef, String propertyName, Object propertyValue, boolean override )
-            throws Exception {
-
-        if ( ( propertyValue instanceof String ) && ( ( String ) propertyValue ).equals( "" ) ) {
-            propertyValue = null;
-        }
-
-        // todo: would this ever need to load more?
-        DynamicEntity entity = loadPartialEntity( entityRef.getUuid(), propertyName );
-
-        UUID timestampUuid = newTimeUUID();
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
-
-        propertyValue = getDefaultSchema().validateEntityPropertyValue( entity.getType(), propertyName, propertyValue );
-
-        entity.setProperty( propertyName, propertyValue );
-        batch = batchSetProperty( batch, entity, propertyName, propertyValue, override, false, timestampUuid );
-        batchExecute( batch, CassandraService.RETRY_COUNT );
-    }
-
-
-    @Override
-    public void updateProperties( EntityRef entityRef, Map<String, Object> properties ) throws Exception {
-        entityRef = validate( entityRef );
-        properties = getDefaultSchema().cleanUpdatedProperties( entityRef.getType(), properties, false );
-        updateProperties( entityRef.getUuid(), properties );
-    }
-
-
-    @Override
-    public void addToDictionary( EntityRef entityRef, String dictionaryName, Object elementValue ) throws Exception {
-        addToDictionary( entityRef, dictionaryName, elementValue, null );
-    }
-
-
-    @Override
-    public void addToDictionary( EntityRef entityRef, String dictionaryName, Object elementValue,
-                                 Object elementCoValue ) throws Exception {
-
-        if ( elementValue == null ) {
-            return;
-        }
-
-        EntityRef entity = getRef( entityRef.getUuid() );
-
-        UUID timestampUuid = newTimeUUID();
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
-
-        batch = batchUpdateDictionary( batch, entity, dictionaryName, elementValue, elementCoValue, false,
-                timestampUuid );
-
-        batchExecute( batch, CassandraService.RETRY_COUNT );
-    }
-
-
-    @Override
-    public void addSetToDictionary( EntityRef entityRef, String dictionaryName, Set<?> elementValues )
-            throws Exception {
-
-        if ( ( elementValues == null ) || elementValues.isEmpty() ) {
-            return;
-        }
-
-        EntityRef entity = getRef( entityRef.getUuid() );
-
-        UUID timestampUuid = newTimeUUID();
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
-
-        for ( Object elementValue : elementValues ) {
-            batch = batchUpdateDictionary( batch, entity, dictionaryName, elementValue, null, false, timestampUuid );
-        }
-
-        batchExecute( batch, CassandraService.RETRY_COUNT );
-    }
-
-
-    @Override
-    public void addMapToDictionary( EntityRef entityRef, String dictionaryName, Map<?, ?> elementValues )
-            throws Exception {
-
-        if ( ( elementValues == null ) || elementValues.isEmpty() || entityRef == null ) {
-            return;
-        }
-
-        EntityRef entity = getRef( entityRef.getUuid() );
-
-        UUID timestampUuid = newTimeUUID();
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
-
-        for ( Map.Entry<?, ?> elementValue : elementValues.entrySet() ) {
-            batch = batchUpdateDictionary( batch, entity, dictionaryName, elementValue.getKey(),
-                    elementValue.getValue(), false, timestampUuid );
-        }
-
-        batchExecute( batch, CassandraService.RETRY_COUNT );
-    }
-
-
-    @Override
-    public void removeFromDictionary( EntityRef entityRef, String dictionaryName, Object elementValue )
-            throws Exception {
-
-        if ( elementValue == null ) {
-            return;
-        }
-
-        EntityRef entity = getRef( entityRef.getUuid() );
-
-        UUID timestampUuid = newTimeUUID();
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
-
-        batch = batchUpdateDictionary( batch, entity, dictionaryName, elementValue, true, timestampUuid );
-
-        batchExecute( batch, CassandraService.RETRY_COUNT );
-    }
-
-
-    @Override
-    public Set<String> getDictionaries( EntityRef entity ) throws Exception {
-        return getDictionaryNames( entity );
-    }
-
-
-    @Override
-    public void deleteProperty( EntityRef entityRef, String propertyName ) throws Exception {
-        setProperty( entityRef, propertyName, null );
-    }
-
-
-    @Override
-    public Set<String> getApplicationCollections() throws Exception {
-        Set<String> collections = new TreeSet<String>( CASE_INSENSITIVE_ORDER );
-        Set<String> dynamic_collections = cast( getDictionaryAsSet( getApplicationRef(), DICTIONARY_COLLECTIONS ) );
-        if ( dynamic_collections != null ) {
-            for ( String collection : dynamic_collections ) {
-                if ( !Schema.isAssociatedEntityType( collection ) ) {
-                    collections.add( collection );
-                }
-            }
-        }
-        Set<String> system_collections = getDefaultSchema().getCollectionNames( Application.ENTITY_TYPE );
-        if ( system_collections != null ) {
-            for ( String collection : system_collections ) {
-                if ( !Schema.isAssociatedEntityType( collection ) ) {
-                    collections.add( collection );
-                }
-            }
-        }
-        return collections;
-    }
-
-
-    @Override
-    public long getApplicationCollectionSize( String collectionName ) throws Exception {
-        Long count = null;
-        if ( !Schema.isAssociatedEntityType( collectionName ) ) {
-            Map<String, Long> counts = getApplicationCounters();
-            count = counts.get( new String( APPLICATION_COLLECTION + collectionName ) );
-        }
-        return count != null ? count : 0;
-    }
-
-
-    @Override
-    public Map<String, Object> getApplicationCollectionMetadata() throws Exception {
-        Set<String> collections = getApplicationCollections();
-        Map<String, Long> counts = getApplicationCounters();
-        Map<String, Object> metadata = new HashMap<String, Object>();
-        if ( collections != null ) {
-            for ( String collectionName : collections ) {
-                if ( !Schema.isAssociatedEntityType( collectionName ) ) {
-                    Long count = counts.get( APPLICATION_COLLECTION + collectionName );
-                    /*
-                     * int count = emf .countColumns(
-					 * getApplicationKeyspace(applicationId),
-					 * ApplicationCF.ENTITY_ID_SETS, key(applicationId,
-					 * DICTIONARY_COLLECTIONS, collectionName));
-					 */
-                    Map<String, Object> entry = new HashMap<String, Object>();
-                    entry.put( "count", count != null ? count : 0 );
-                    entry.put( "type", singularize( collectionName ) );
-                    entry.put( "name", collectionName );
-                    entry.put( "title", capitalize( collectionName ) );
-                    metadata.put( collectionName, entry );
-                }
-            }
-        }
-		/*
-		 * if ((counts != null) && !counts.isEmpty()) { metadata.put("counters",
-		 * counts); }
-		 */
-        return metadata;
-    }
-
-
-    public Object getRolePermissionsKey( String roleName ) {
-        return key( getIdFo

<TRUNCATED>