Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/08/17 21:48:42 UTC

[35/38] usergrid git commit: Merge branch 'master' into datastax-cass-driver

Fix issue with UniqueValueSerialization backwards compatibility via CQL and legacy Usergrid data.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
index 5320152,8c1f2d2..0753281
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
@@@ -18,34 -18,40 +18,38 @@@
  package org.apache.usergrid.persistence.collection.serialization.impl;
  
  
++
 +import java.nio.ByteBuffer;
  import java.util.*;
  
 -import com.netflix.astyanax.util.RangeBuilder;
 +import com.datastax.driver.core.*;
 +import com.datastax.driver.core.Row;
 +import com.datastax.driver.core.querybuilder.Clause;
 +import com.datastax.driver.core.querybuilder.QueryBuilder;
 +import com.datastax.driver.core.querybuilder.Using;
 +import org.apache.usergrid.persistence.core.CassandraConfig;
++import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
 +import org.apache.usergrid.persistence.core.datastax.TableDefinition;
 +import org.apache.usergrid.persistence.model.entity.SimpleId;
 +import org.apache.usergrid.persistence.model.field.*;
++
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 -import org.apache.cassandra.db.marshal.BytesType;
--
  import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
  import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
  import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
  import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
 -import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
 -import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
 -import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
 -import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
 -import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily;
 -import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
 -import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
 +import org.apache.usergrid.persistence.core.CassandraFig;
- import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
  import org.apache.usergrid.persistence.core.scope.ApplicationScope;
  import org.apache.usergrid.persistence.core.util.ValidationUtils;
  import org.apache.usergrid.persistence.model.entity.Id;
 -import org.apache.usergrid.persistence.model.field.Field;
  
  import com.google.common.base.Preconditions;
 +
+ import com.netflix.astyanax.ColumnListMutation;
 -import com.netflix.astyanax.Keyspace;
 -import com.netflix.astyanax.MutationBatch;
 -import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 -import com.netflix.astyanax.model.Column;
 -import com.netflix.astyanax.model.ConsistencyLevel;
 -import com.netflix.astyanax.model.Row;
 -import com.netflix.astyanax.query.RowQuery;
++
+ 
  
  /**
   * Reads and writes to UniqueValues column family.
@@@ -53,27 -59,25 +57,29 @@@
  public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
      implements UniqueValueSerializationStrategy {
  
-     private static final Logger log = LoggerFactory.getLogger( UniqueValueSerializationStrategyImpl.class );
+     private static final Logger logger = LoggerFactory.getLogger( UniqueValueSerializationStrategyImpl.class );
  
 +    public static final String UUID_TYPE_REVERSED = "UUIDType(reversed=true)";
  
 -    private final MultiTenantColumnFamily<ScopedRowKey<FieldKey>, EntityVersion>
 -        CF_UNIQUE_VALUES;
  
 +    private final String TABLE_UNIQUE_VALUES;
 +    private final String TABLE_UNIQUE_VALUES_LOG;
 +
 +    private final Map COLUMNS_UNIQUE_VALUES;
 +    private final Map COLUMNS_UNIQUE_VALUES_LOG;
  
 -    private final MultiTenantColumnFamily<ScopedRowKey<EntityKey>, UniqueFieldEntry>
 -        CF_ENTITY_UNIQUE_VALUE_LOG ;
  
      public static final int COL_VALUE = 0x0;
  
+     private final Comparator<UniqueValue> uniqueValueComparator = new UniqueValueComparator();
+ 
  
      private final SerializationFig serializationFig;
 -    protected final Keyspace keyspace;
      private final CassandraFig cassandraFig;
  
 +    private final Session session;
 +    private final CassandraConfig cassandraConfig;
 +
  
      /**
       * Construct serialization strategy for keyspace.
@@@ -186,134 -182,251 +192,285 @@@
          final EntityVersion ev = new EntityVersion( entityId, entityVersion );
          final UniqueFieldEntry uniqueFieldEntry = new UniqueFieldEntry( entityVersion, field );
  
 -        return doWrite( scope, value, new RowOp() {
  
 -            @Override
 -            public void doLookup( final ColumnListMutation<EntityVersion> colMutation ) {
 -                colMutation.deleteColumn( ev );
 -            }
 +        ByteBuffer partitionKey = getPartitionKey( scope.getApplication(), value.getEntityId().getType(),
 +            value.getField().getTypeName().toString(), value.getField().getName(), value.getField().getValue());
  
 +        ByteBuffer columnValue = serializeUniqueValueColumn(ev);
  
 -            @Override
 -            public void doLog( final ColumnListMutation<UniqueFieldEntry> colMutation ) {
 -                colMutation.deleteColumn( uniqueFieldEntry );
 -            }
 -        } );
 -    }
 -
 -
 -    /**
 -     * Do the column update or delete for the given column and row key
 -     *
 -     * @param applicationScope We need to use this when getting the keyspace
 -     * @param uniqueValue The unique value to write
 -     * @param op The operation to write
 -     */
 -    private MutationBatch doWrite( ApplicationScope applicationScope, UniqueValue uniqueValue, RowOp op ) {
 -        final MutationBatch batch = keyspace.prepareMutationBatch();
 +        final Clause uniqueEqKey = QueryBuilder.eq("key", partitionKey );
 +        final Clause uniqueEqColumn = QueryBuilder.eq("column1", columnValue );
 +        Statement uniqueDelete = QueryBuilder.delete().from(TABLE_UNIQUE_VALUES).where(uniqueEqKey).and(uniqueEqColumn);
 +        batch.add(uniqueDelete);
  
 -        final Id applicationId = applicationScope.getApplication();
  
 -        final FieldKey fieldKey = createUniqueValueKey( applicationId, uniqueValue.getEntityId().getType(), uniqueValue.getField() );
  
 +        ByteBuffer logPartitionKey = getLogPartitionKey(scope.getApplication(), entityId);
 +        ByteBuffer logColumnValue = serializeUniqueValueLogColumn(uniqueFieldEntry);
  
 -        op.doLookup( batch.withRow( CF_UNIQUE_VALUES, ScopedRowKey.fromKey( applicationId, fieldKey ) ) );
  
 +        final Clause uniqueLogEqKey = QueryBuilder.eq("key", logPartitionKey );
 +        final Clause uniqueLogEqColumn = QueryBuilder.eq("column1", logColumnValue );
  
 -        final EntityKey entityKey = createEntityUniqueLogKey( applicationId, uniqueValue.getEntityId() );
 +        Statement uniqueLogDelete = QueryBuilder.delete()
 +            .from(TABLE_UNIQUE_VALUES_LOG).where(uniqueLogEqKey).and( uniqueLogEqColumn);
  
 -        op.doLog( batch.withRow( CF_ENTITY_UNIQUE_VALUE_LOG,
 -            ScopedRowKey.fromKey( applicationId, entityKey ) ) );
 +        batch.add(uniqueLogDelete);
  
  
+         if ( logger.isTraceEnabled() ) {
+             logger.trace( "Building batch statement for unique value entity={} version={} name={} value={} ",
 -                uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion(),
 -                uniqueValue.getField().getName(), uniqueValue.getField().getValue() );
++                value.getEntityId().getUuid(), value.getEntityVersion(),
++                value.getField().getName(), value.getField().getValue() );
+         }
+ 
+ 
 +
          return batch;
      }
  
  
      @Override
--    public UniqueValueSet load( final ApplicationScope colScope, final String type, final Collection<Field> fields )
-          {
-         return load( colScope, ConsistencyLevel.valueOf( cassandraFig.getReadCl() ), type, fields );
 -        throws ConnectionException {
 -        return load( colScope, ConsistencyLevel.valueOf( cassandraFig.getReadCL() ), type, fields, false);
++    public UniqueValueSet load( final ApplicationScope colScope, final String type, final Collection<Field> fields ) {
++
++        return load( colScope, ConsistencyLevel.valueOf( cassandraFig.getReadCl() ), type, fields, false );
++
      }
  
+     @Override
+     public UniqueValueSet load( final ApplicationScope colScope, final String type, final Collection<Field> fields,
 -                                boolean useReadRepair)
 -        throws ConnectionException {
 -        return load( colScope, ConsistencyLevel.valueOf( cassandraFig.getReadCL() ), type, fields, useReadRepair);
++                                boolean useReadRepair) {
++
++        return load( colScope, ConsistencyLevel.valueOf( cassandraFig.getReadCl() ), type, fields, useReadRepair);
++
+     }
+ 
+ 
  
      @Override
 -    public UniqueValueSet load(final ApplicationScope appScope, final ConsistencyLevel consistencyLevel,
 -                               final String type, final Collection<Field> fields, boolean useReadRepair) throws ConnectionException {
 +    public UniqueValueSet load( final ApplicationScope appScope,
 +                                final ConsistencyLevel consistencyLevel,
-                                 final String type, final Collection<Field> fields ) {
++                                final String type, final Collection<Field> fields, boolean useReadRepair ) {
++
  
          Preconditions.checkNotNull( fields, "fields are required" );
          Preconditions.checkArgument( fields.size() > 0, "At least 1 field must be specified" );
  
-         return loadCQL(appScope, consistencyLevel, type, fields);
++        return loadCQL(appScope, consistencyLevel, type, fields, useReadRepair);
 +
 +    }
 +
 +
 +    private UniqueValueSet loadCQL( final ApplicationScope appScope,
 +                                    final ConsistencyLevel consistencyLevel,
-                                     final String type, final Collection<Field> fields ) {
++                                    final String type, final Collection<Field> fields, boolean useReadRepair ) {
 +
 +        Preconditions.checkNotNull( fields, "fields are required" );
 +        Preconditions.checkArgument( fields.size() > 0, "At least 1 field must be specified" );
  
 -        final List<ScopedRowKey<FieldKey>> keys = new ArrayList<>( fields.size() );
  
          final Id applicationId = appScope.getApplication();
  
 -        for ( Field field : fields ) {
 +        // row key = app UUID + app type + entityType + field type + field name + field value
  
-         List<ByteBuffer> partitionKeys = new ArrayList<>( fields.size() );
 -            final FieldKey key = createUniqueValueKey( applicationId, type,  field );
+ 
+ 
 -            final ScopedRowKey<FieldKey> rowKey =
 -                ScopedRowKey.fromKey( applicationId, key );
+ 
 -            keys.add( rowKey );
 -        }
++        //List<ByteBuffer> partitionKeys = new ArrayList<>( fields.size() );
+ 
+         final UniqueValueSetImpl uniqueValueSet = new UniqueValueSetImpl( fields.size() );
+ 
 -        Iterator<Row<ScopedRowKey<FieldKey>, EntityVersion>> results =
 -            keyspace.prepareQuery( CF_UNIQUE_VALUES ).setConsistencyLevel( consistencyLevel ).getKeySlice( keys )
 -                .withColumnRange(new RangeBuilder().setLimit(serializationFig.getMaxLoadSize()).build())
 -                .execute().getResult().iterator();
+ 
 -        if( !results.hasNext()){
 -            if(logger.isTraceEnabled()){
 -                logger.trace("No partitions returned for unique value lookup");
 -            }
 -        }
 +        for ( Field field : fields ) {
 +
 +            //log.info(Bytes.toHexString(getPartitionKey(applicationId, type, field.getTypeName().toString(), field.getName(), field.getValue())));
  
-             partitionKeys.add(getPartitionKey(applicationId, type, field.getTypeName().toString(), field.getName(), field.getValue()));
++            //partitionKeys.add(getPartitionKey(applicationId, type, field.getTypeName().toString(), field.getName(), field.getValue()));
  
-         }
 -        while ( results.hasNext() )
++            final Clause inKey = QueryBuilder.in("key", getPartitionKey(applicationId, type,
++                field.getTypeName().toString(), field.getName(), field.getValue()) );
  
-         final UniqueValueSetImpl uniqueValueSet = new UniqueValueSetImpl( fields.size() );
 -        {
++            final Statement statement = QueryBuilder.select().all().from(TABLE_UNIQUE_VALUES)
++                .where(inKey)
++                .setConsistencyLevel(consistencyLevel);
  
-         final Clause inKey = QueryBuilder.in("key", partitionKeys );
 -            final Row<ScopedRowKey<FieldKey>, EntityVersion> unique = results.next();
++            final ResultSet resultSet = session.execute(statement);
  
-         final Statement statement = QueryBuilder.select().all().from(TABLE_UNIQUE_VALUES)
-             .where(inKey)
-             .setConsistencyLevel(consistencyLevel);
 -            final Field field = parseRowKey( unique.getKey() );
  
-         final ResultSet resultSet = session.execute(statement);
 -            final Iterator<Column<EntityVersion>> columnList = unique.getColumns().iterator();
++            Iterator<com.datastax.driver.core.Row> results = resultSet.iterator();
  
 -            //sanity check, nothing to do, skip it
 -            if ( !columnList.hasNext() ) {
++            if( !results.hasNext()){
+                 if(logger.isTraceEnabled()){
 -                    logger.trace("No cells exist in partition for unique value [{}={}]",
 -                        field.getName(), field.getValue().toString());
++                    logger.trace("No rows returned for unique value lookup of field: {}", field);
+                 }
 -                continue;
+             }
  
-         Iterator<com.datastax.driver.core.Row> results = resultSet.iterator();
  
+             List<UniqueValue> candidates = new ArrayList<>();
  
-         while( results.hasNext() ){
 -            /**
 -             *  While iterating the columns, a rule is being enforced to only EVER return the oldest UUID.  This means
 -             *  the UUID with the oldest timestamp ( it was the original entity written for the unique value ).
 -             *
 -             *  We do this to prevent cycling of unique value -> entity UUID mappings as this data is ordered by the
 -             *  entity's version and not the entity's timestamp itself.
 -             *
 -             *  If newer entity UUIDs are encountered, they are removed from the unique value tables, however their
 -             *  backing serialized entity data is left in tact in case a cleanup / audit is later needed.
 -             */
 -            while (columnList.hasNext()) {
++            while( results.hasNext() ){
 +
-             final com.datastax.driver.core.Row unique = results.next();
-             ByteBuffer partitionKey = unique.getBytes("key");
-             ByteBuffer column = unique.getBytesUnsafe("column1");
++                final com.datastax.driver.core.Row unique = results.next();
++                ByteBuffer partitionKey = unique.getBytes("key");
++                ByteBuffer column = unique.getBytesUnsafe("column1");
 +
-             List<Object> keyContents = deserializePartitionKey(partitionKey);
-             List<Object> columnContents = deserializeUniqueValueColumn(column);
++                List<Object> keyContents = deserializePartitionKey(partitionKey);
++                List<Object> columnContents = deserializeUniqueValueColumn(column);
 +
-             FieldTypeName fieldType;
-             String name;
-             String value;
-             if(this instanceof UniqueValueSerializationStrategyV2Impl) {
++                FieldTypeName fieldType;
++                String name;
++                String value;
++                if(this instanceof UniqueValueSerializationStrategyV2Impl) {
 +
-                  fieldType = FieldTypeName.valueOf((String) keyContents.get(3));
-                  name = (String) keyContents.get(4);
-                  value = (String) keyContents.get(5);
 +
-             }else{
++                    fieldType = FieldTypeName.valueOf((String) keyContents.get(3));
++                    name = (String) keyContents.get(4);
++                    value = (String) keyContents.get(5);
 +
-                 fieldType = FieldTypeName.valueOf((String) keyContents.get(5));
-                 name = (String) keyContents.get(6);
-                 value = (String) keyContents.get(7);
++                }else{
 +
-             }
 +
-             Field field = getField(name, value, fieldType);
++                    fieldType = FieldTypeName.valueOf((String) keyContents.get(5));
++                    name = (String) keyContents.get(6);
++                    value = (String) keyContents.get(7);
++
++
++                }
++
++                Field returnedField = getField(name, value, fieldType);
++
++
++                final EntityVersion entityVersion = new EntityVersion(
++                    new SimpleId((UUID)columnContents.get(1), (String)columnContents.get(2)), (UUID)columnContents.get(0));
++//            //sanity check, nothing to do, skip it
++//            if ( !columnList.hasNext() ) {
++//                if(logger.isTraceEnabled()){
++//                    logger.trace("No cells exist in partition for unique value [{}={}]",
++//                        field.getName(), field.getValue().toString());
++//                }
++//                continue;
++//            }
++
++
++
++
++                /**
++                 *  While iterating the rows, a rule is enforced to only EVER return the oldest UUID for the field.
++                 *  This means the UUID with the oldest timestamp ( it was the original entity written for
++                 *  the unique value ).
++                 *
++                 *  We do this to prevent cycling of unique value -> entity UUID mappings as this data is ordered by the
++                 *  entity's version and not the entity's timestamp itself.
++                 *
++                 *  If newer entity UUIDs are encountered, they are removed from the unique value tables; however, their
++                 *  backing serialized entity data is left intact in case a cleanup / audit is later needed.
++                 */
+ 
 -                final EntityVersion entityVersion = columnList.next().getName();
+ 
+                 final UniqueValue uniqueValue =
 -                    new UniqueValueImpl(field, entityVersion.getEntityId(), entityVersion.getEntityVersion());
++                    new UniqueValueImpl(returnedField, entityVersion.getEntityId(), entityVersion.getEntityVersion());
+ 
+                 // set the initial candidate and move on
+                 if (candidates.size() == 0) {
+                     candidates.add(uniqueValue);
  
+                     if (logger.isTraceEnabled()) {
+                         logger.trace("First entry for unique value [{}={}] found for application [{}], adding " +
+                                 "entry with entity id [{}] and entity version [{}] to the candidate list and continuing",
 -                            field.getName(), field.getValue().toString(), applicationId.getType(),
++                            returnedField.getName(), returnedField.getValue().toString(), applicationId.getType(),
+                             uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion());
+                     }
  
-             final EntityVersion entityVersion = new EntityVersion(
-                 new SimpleId((UUID)columnContents.get(1), (String)columnContents.get(2)), (UUID)columnContents.get(0));
+                     continue;
+                 }
  
+                 if(!useReadRepair){
  
-             final UniqueValueImpl uniqueValue =
-               new UniqueValueImpl( field, entityVersion.getEntityId(), entityVersion.getEntityVersion() );
+                     // take only the first
+                     if (logger.isTraceEnabled()) {
+                         logger.trace("Read repair not enabled for this request of unique value [{}={}], breaking out" +
 -                            " of cell loop", field.getName(), field.getValue().toString());
++                            " of row loop", returnedField.getName(), returnedField.getValue().toString());
+                     }
+                     break;
+ 
+                 } else {
+ 
+ 
+                     final int result = uniqueValueComparator.compare(uniqueValue, candidates.get(candidates.size() - 1));
+ 
+                     if (result == 0) {
+ 
+                         // do nothing, only versions can be newer and we're not worried about newer versions of same entity
+                         if (logger.isTraceEnabled()) {
+                             logger.trace("Current unique value [{}={}] entry has UUID [{}] equal to candidate UUID [{}]",
 -                                field.getName(), field.getValue().toString(), uniqueValue.getEntityId().getUuid(),
++                                returnedField.getName(), returnedField.getValue().toString(), uniqueValue.getEntityId().getUuid(),
+                                 candidates.get(candidates.size() -1));
+                         }
+ 
+                         // update candidate w/ latest version
+                         candidates.add(uniqueValue);
+ 
+                     } else if (result < 0) {
+ 
+                         // delete the duplicate from the unique value index
+                         candidates.forEach(candidate -> {
+ 
 -                            try {
 -
 -                                logger.warn("Duplicate unique value [{}={}] found for application [{}], removing newer " +
 -                                        "entry with entity id [{}] and entity version [{}]", field.getName(),
 -                                    field.getValue().toString(), applicationId.getUuid(),
 -                                    candidate.getEntityId().getUuid(), candidate.getEntityVersion());
 -
 -                                delete(appScope, candidate).execute();
++                            logger.warn("Duplicate unique value [{}={}] found for application [{}], removing newer " +
++                                    "entry with entity id [{}] and entity version [{}]", returnedField.getName(),
++                                returnedField.getValue().toString(), applicationId.getUuid(),
++                                candidate.getEntityId().getUuid(), candidate.getEntityVersion());
+ 
 -                            } catch (ConnectionException e) {
 -                               logger.error( "Unable to connect to cassandra during duplicate repair of [{}={}]",
 -                                   field.getName(), field.getValue().toString() );
 -                            }
++                            session.execute(deleteCQL(appScope, candidate));
+ 
+                         });
+ 
+                         // clear the transient candidates list
+                         candidates.clear();
+ 
+                         if (logger.isTraceEnabled()) {
+                             logger.trace("Updating candidate unique value [{}={}] to entity id [{}] and " +
 -                                "entity version [{}]", field.getName(), field.getValue().toString(),
++                                    "entity version [{}]", returnedField.getName(), returnedField.getValue().toString(),
+                                 uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion());
+ 
+                         }
+ 
+                         // add our new candidate to the list
+                         candidates.add(uniqueValue);
+ 
+ 
+                     } else {
+ 
+                         logger.warn("Duplicate unique value [{}={}] found for application [{}], removing newer entry " +
 -                                "with entity id [{}] and entity version [{}].", field.getName(), field.getValue().toString(),
++                                "with entity id [{}] and entity version [{}].", returnedField.getName(), returnedField.getValue().toString(),
+                             applicationId.getUuid(), uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion());
+ 
+                         // delete the duplicate from the unique value index
 -                        delete(appScope, uniqueValue).execute();
++                        session.execute(deleteCQL(appScope, uniqueValue));
+ 
+ 
+                     }
+ 
+                 }
 -            }
+ 
 -            // take the last candidate ( should be the latest version) and add to the result set
++            }
+ 
 -            final UniqueValue returnValue = candidates.get(candidates.size() -1);
 -            if(logger.isTraceEnabled()){
 -                logger.trace("Adding unique value [{}={}] with entity id [{}] and entity version [{}] to response set",
 -                    returnValue.getField().getName(), returnValue.getField().getValue().toString(),
 -                    returnValue.getEntityId().getUuid(), returnValue.getEntityVersion());
++            if ( candidates.size() > 0 ) {
++                // take the last candidate ( should be the latest version) and add to the result set
++                final UniqueValue returnValue = candidates.get(candidates.size() - 1);
++                if (logger.isTraceEnabled()) {
++                    logger.trace("Adding unique value [{}={}] with entity id [{}] and entity version [{}] to response set",
++                        returnValue.getField().getName(), returnValue.getField().getValue().toString(),
++                        returnValue.getEntityId().getUuid(), returnValue.getEntityVersion());
++                }
++                uniqueValueSet.addValue(returnValue);
+             }
 -            uniqueValueSet.addValue(returnValue);
 +
-             uniqueValueSet.addValue(uniqueValue);
  
          }
  
++
          return uniqueValueSet;
 +
      }
  
  
@@@ -366,103 -471,101 +523,127 @@@
  
  
      /**
 -     * Converts raw columns to the expected output
 +     * Get the CQL table definition for the unique values log table
       */
 -    private static final class UniqueEntryParser implements ColumnParser<UniqueFieldEntry, UniqueValue> {
 +    protected abstract TableDefinition getEntityUniqueLogTable();
 +
  
 +    public class AllUniqueFieldsIterator implements Iterable<UniqueValue>, Iterator<UniqueValue> {
 +
 +        private final Session session;
 +        private final Statement query;
          private final Id entityId;
  
 +        private Iterator<Row> sourceIterator;
  
 -        private UniqueEntryParser( final Id entityId ) {this.entityId = entityId;}
  
  
 -        @Override
 -        public UniqueValue parseColumn( final Column<UniqueFieldEntry> column ) {
 -            final UniqueFieldEntry entry = column.getName();
 +        public AllUniqueFieldsIterator( final Session session, final Statement query, final Id entityId){
 +
 +            this.session = session;
 +            this.query = query;
 +            this.entityId = entityId;
  
 -            return new UniqueValueImpl( entry.getField(), entityId, entry.getVersion() );
          }
 -    }
  
  
 -    @Override
 -    public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
 +        @Override
 +        public Iterator<UniqueValue> iterator() {
 +            return this;
 +        }
  
 -        final MultiTenantColumnFamilyDefinition uniqueLookupCF =
 -            new MultiTenantColumnFamilyDefinition( CF_UNIQUE_VALUES, BytesType.class.getSimpleName(),
 -                ColumnTypes.DYNAMIC_COMPOSITE_TYPE, BytesType.class.getSimpleName(),
 -                MultiTenantColumnFamilyDefinition.CacheOption.KEYS );
 +        @Override
 +        public boolean hasNext() {
  
 -        final MultiTenantColumnFamilyDefinition uniqueLogCF =
 -            new MultiTenantColumnFamilyDefinition( CF_ENTITY_UNIQUE_VALUE_LOG, BytesType.class.getSimpleName(),
 -                ColumnTypes.DYNAMIC_COMPOSITE_TYPE, BytesType.class.getSimpleName(),
 -                MultiTenantColumnFamilyDefinition.CacheOption.KEYS );
 +            if ( sourceIterator == null ) {
  
 -        return Arrays.asList( uniqueLookupCF, uniqueLogCF );
 -    }
 +                advanceIterator();
  
 +                return sourceIterator.hasNext();
 +            }
  
 -    /**
 -     * Get the column family for the unique fields
 -     */
 -    protected abstract MultiTenantColumnFamily<ScopedRowKey<FieldKey>, EntityVersion> getUniqueValuesCF();
 +            return sourceIterator.hasNext();
 +        }
  
 +        @Override
 +        public UniqueValue next() {
  
 -    /**
 -     * Generate a key that is compatible with the column family
 -     *
 -     * @param applicationId The applicationId
 -     * @param type The type in the field
 -     * @param field The field we're creating the key for
 -     */
 -    protected abstract FieldKey createUniqueValueKey(final Id applicationId, final String type, final Field field );
 +            com.datastax.driver.core.Row next = sourceIterator.next();
  
 -    /**
 -     * Parse the row key into the field
 -     * @param rowKey
 -     * @return
 -     */
 -    protected abstract Field parseRowKey(final ScopedRowKey<FieldKey> rowKey);
 +            ByteBuffer column = next.getBytesUnsafe("column1");
  
 +            List<Object> columnContents = deserializeUniqueValueLogColumn(column);
  
 -    /**
 -     * Get the column family for the unique field CF
 -     */
 -    protected abstract MultiTenantColumnFamily<ScopedRowKey<EntityKey>, UniqueFieldEntry> getEntityUniqueLogCF();
 +            UUID version = (UUID) columnContents.get(0);
 +            String name = (String) columnContents.get(1);
 +            String value = (String) columnContents.get(2);
 +            FieldTypeName fieldType = FieldTypeName.valueOf((String) columnContents.get(3));
  
 -    /**
 -     * Generate a key that is compatible with the column family
 -     *
 -     * @param applicationId The applicationId
 -     * @param uniqueValueId The uniqueValue
 -     */
 -    protected abstract EntityKey createEntityUniqueLogKey(final Id applicationId,  final Id uniqueValueId );
  
 +            return new UniqueValueImpl(getField(name, value, fieldType), entityId, version);
 +
 +        }
 +
 +        private void advanceIterator() {
 +
 +            sourceIterator = session.execute(query).iterator();
 +        }
 +    }
 +
 +    private Field getField( String name, String value, FieldTypeName fieldType){
 +
 +        Field field = null;
 +
 +        switch ( fieldType ) {
 +            case BOOLEAN:
 +                field = new BooleanField( name, Boolean.parseBoolean( value ) );
 +                break;
 +            case DOUBLE:
 +                field = new DoubleField( name, Double.parseDouble( value ) );
 +                break;
 +            case FLOAT:
 +                field = new FloatField( name, Float.parseFloat( value ) );
 +                break;
 +            case INTEGER:
 +                field =  new IntegerField( name, Integer.parseInt( value ) );
 +                break;
 +            case LONG:
 +                field = new LongField( name, Long.parseLong( value ) );
 +                break;
 +            case STRING:
 +                field = new StringField( name, value );
 +                break;
 +            case UUID:
 +                field = new UUIDField( name, UUID.fromString( value ) );
 +                break;
 +        }
 +
 +        return field;
 +
 +    }
  
+ 
+     private class UniqueValueComparator implements Comparator<UniqueValue> {
+ 
+         @Override
+         public int compare(UniqueValue o1, UniqueValue o2) {
+ 
+             if( o1.getEntityId().getUuid().equals(o2.getEntityId().getUuid())){
+ 
+                 return 0;
+ 
+             }else if( o1.getEntityId().getUuid().timestamp() < o2.getEntityId().getUuid().timestamp()){
+ 
+                 return -1;
+ 
+             }
+ 
+             // if the UUIDs are not equal and o1's timestamp is not less than o2's timestamp,
+             // then o1 must be greater than o2
+             return 1;
+ 
+ 
+         }
+     }
+ 
  }
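
For reference, the read-repair loop above only ever keeps the entity with the oldest time-based UUID and treats newer rows as duplicates to be deleted. A minimal standalone sketch of that selection rule, assuming version 1 (time-based) UUIDs, since java.util.UUID.timestamp() is only defined for those:

    import java.util.List;
    import java.util.UUID;

    // Sketch: among duplicate unique-value rows, the oldest entity UUID wins.
    // Mirrors UniqueValueComparator: equal UUIDs compare as 0; otherwise the
    // UUID with the earlier embedded timestamp is "less than" (older).
    final class OldestEntityWins {

        static int compare(UUID o1, UUID o2) {
            if (o1.equals(o2)) {
                return 0;
            }
            return o1.timestamp() < o2.timestamp() ? -1 : 1;
        }

        static UUID selectWinner(List<UUID> rows) {
            UUID winner = null;
            for (UUID candidate : rows) {
                // in the real strategy a newer duplicate is also deleted from
                // the unique value tables via deleteCQL(...)
                if (winner == null || compare(candidate, winner) < 0) {
                    winner = candidate;
                }
            }
            return winner;
        }
    }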

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
index 4b5653f,f971b23..61f0f80
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
@@@ -91,10 -113,23 +91,24 @@@ public class UniqueValueSerializationSt
          return migration.to.load( applicationScope, type, fields );
      }
  
+     @Override
+     public UniqueValueSet load( final ApplicationScope applicationScope, final String type,
 -                                final Collection<Field> fields, boolean useReadRepair ) throws ConnectionException {
++                                final Collection<Field> fields, boolean useReadRepair ) {
+ 
+         final MigrationRelationship<UniqueValueSerializationStrategy> migration = getMigrationRelationShip();
+ 
+         if ( migration.needsMigration() ) {
+             return migration.from.load( applicationScope, type, fields, useReadRepair );
+         }
+ 
+         return migration.to.load( applicationScope, type, fields, useReadRepair );
+     }
+ 
  
      @Override
 -    public UniqueValueSet load(final ApplicationScope applicationScope, final ConsistencyLevel consistencyLevel,
 -                               final String type, final Collection<Field> fields, boolean useReadRepair) throws ConnectionException {
 +    public UniqueValueSet load( final ApplicationScope applicationScope, final ConsistencyLevel consistencyLevel,
-                                 final String type, final Collection<Field> fields ) {
++                                final String type, final Collection<Field> fields, boolean useReadRepair ) {
++
  
          final MigrationRelationship<UniqueValueSerializationStrategy> migration = getMigrationRelationShip();
  

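The proxy overloads above all reduce to the same read-side migration rule: while a data migration is still in flight, serve reads from the old strategy, otherwise from the new one. A sketch of that shape with simplified, hypothetical stand-in types (the real MigrationRelationship comes from usergrid's migration support code):

    // Sketch of the proxy's delegation rule; "from" is the legacy strategy,
    // "to" the new CQL-based one. Types are simplified stand-ins.
    final class MigrationRelationship<T> {

        final T from;
        final T to;
        private final boolean migrating;

        MigrationRelationship(T from, T to, boolean migrating) {
            this.from = from;
            this.to = to;
            this.migrating = migrating;
        }

        boolean needsMigration() {
            return migrating;
        }

        // reads are served by the legacy implementation until migration completes
        T readTarget() {
            return needsMigration() ? from : to;
        }
    }
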
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
index d305044,6a1cb58..55ba011
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
@@@ -133,280 -114,40 +133,286 @@@ public class UniqueValueSerializationSt
  
  
      @Override
 -    protected CollectionPrefixedKey<Field> createUniqueValueKey( final Id applicationId,
 -                                                                 final String type, final Field field) {
 +    protected List<Object> deserializePartitionKey(ByteBuffer bb){
 +
 +
 +        /**
 +         *  List<Object> keys = new ArrayList<>(8);
 +            keys.add(0, appUUID);
 +            keys.add(1, applicationType);
 +            keys.add(2, appUUID);
 +            keys.add(3, applicationType);
 +            keys.add(4, entityType);
 +            keys.add(5, fieldType);
 +            keys.add(6, fieldName);
 +            keys.add(7, fieldValueString);
 +
 +         */
 +
 +        int count = 0;
 +        List<Object> stuff = new ArrayList<>();
 +        while(bb.hasRemaining()){
 +            ByteBuffer data = CQLUtils.getWithShortLength(bb);
 +            if(count == 0 || count == 2){
 +                stuff.add(DataType.uuid().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
 +            }else{
 +                stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
 +            }
 +            byte equality = bb.get(); // we don't use this but take the equality byte off the buffer
 +            count++;
 +        }
 +
 +        return stuff;
 +
 +    }
 +
 +    @Override
 +    protected ByteBuffer serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry){
 +
 +        /**
 +         *  final UUID version = value.getVersion();
 +            final Field<?> field = value.getField();
 +
 +             final FieldTypeName fieldType = field.getTypeName();
 +             final String fieldValue = field.getValue().toString().toLowerCase();
 +
 +
 +             DynamicComposite composite = new DynamicComposite(  );
 +
 +             //we want to sort ascending to descending by version
 +             composite.addComponent( version,  UUID_SERIALIZER, ColumnTypes.UUID_TYPE_REVERSED);
 +             composite.addComponent( field.getName(), STRING_SERIALIZER );
 +             composite.addComponent( fieldValue, STRING_SERIALIZER );
 +             composite.addComponent( fieldType.name() , STRING_SERIALIZER);
 +         */
 +
 +        // values are serialized as strings, not sure why, and always lower cased
 +        String fieldValueString = fieldEntry.getField().getValue().toString().toLowerCase();
 +
 +
 +        List<Object> keys = new ArrayList<>(4);
 +        keys.add(fieldEntry.getVersion());
 +        keys.add(fieldEntry.getField().getName());
 +        keys.add(fieldValueString);
 +        keys.add(fieldEntry.getField().getTypeName().name());
 +
 +        String comparator = UUID_TYPE_REVERSED;
 +
 +        int size = 16+fieldEntry.getField().getName().length()+fieldEntry.getField().getValue().toString().length()+
 +            fieldEntry.getField().getTypeName().name().length();
 +
 +        // we always need to add length for the 2-byte comparator short, 2-byte length short and 1-byte equality byte
 +        size += keys.size()*5;
 +
 +        // uuid type comparator is longest, ensure we allocate buffer using the max size to avoid overflow
 +        size += keys.size()*comparator.length();
 +
 +        ByteBuffer stuff = ByteBuffer.allocate(size);
 +
 +
 +        for (Object key : keys) {
  
 +            if(key.equals(fieldEntry.getVersion())) {
 +                int p = comparator.indexOf("(reversed=true)");
 +                boolean desc = false;
 +                if (p >= 0) {
 +                    comparator = comparator.substring(0, p);
 +                    desc = true;
 +                }
  
 -        final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( type );
 +                byte a = (byte) 85; // this is the byte value for UUIDType in astyanax used in legacy data
 +                if (desc) {
 +                    a = (byte) Character.toUpperCase((char) a);
 +                }
  
 +                stuff.putShort((short) ('\u8000' | a));
 +            }else{
 +                comparator = "UTF8Type"; // only strings are being serialized other than UUIDs here
 +                stuff.putShort((short)comparator.length());
 +                stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED));
 +            }
  
 -        final CollectionPrefixedKey<Field> uniquePrefixedKey =
 -            new CollectionPrefixedKey<>( collectionName, applicationId, field );
 +            ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
 +            if (kb == null) {
 +                kb = ByteBuffer.allocate(0);
 +            }
 +
 +            // put a short that indicates how big the buffer is for this item
 +            stuff.putShort((short) kb.remaining());
 +
 +            // put the actual item
 +            stuff.put(kb.slice());
 +
 +            // put an equality byte (not used, but part of the legacy thrift/Astyanax schema)
 +            stuff.put((byte) 0);
 +
 +
 +        }
 +
 +        stuff.flip();
 +        return stuff.duplicate();
  
 -        return uniquePrefixedKey;
      }
  
 +    @Override
 +    protected ByteBuffer getPartitionKey(Id applicationId, String entityType, String fieldType, String fieldName, Object fieldValue ){
 +
 +        return serializeKey(applicationId.getUuid(), applicationId.getType(),
 +            entityType, fieldType, fieldName, fieldValue);
 +
 +    }
 +
 +    @Override
 +    protected ByteBuffer getLogPartitionKey(final Id applicationId, final Id uniqueValueId){
 +
 +        return serializeLogKey(applicationId.getUuid(), applicationId.getType(),
 +            uniqueValueId.getUuid(), uniqueValueId.getType());
 +
 +    }
  
      @Override
 -    protected Field parseRowKey( final ScopedRowKey<CollectionPrefixedKey<Field>> rowKey ) {
 -        return rowKey.getKey().getSubKey();
 +    protected ByteBuffer serializeUniqueValueColumn(EntityVersion entityVersion){
 +
 +        /**
 +         *  final Id entityId = ev.getEntityId();
 +            final UUID entityUuid = entityId.getUuid();
 +            final String entityType = entityId.getType();
 +
 +            CompositeBuilder builder = Composites.newDynamicCompositeBuilder();
 +
 +            builder.addUUID( entityVersion );
 +            builder.addUUID( entityUuid );
 +            builder.addString(entityType );
 +         */
 +
 +        String comparator = "UTF8Type";
 +
 +        List<Object> keys = new ArrayList<>(3);
 +        keys.add(entityVersion.getEntityVersion());
 +        keys.add(entityVersion.getEntityId().getUuid());
 +        keys.add(entityVersion.getEntityId().getType());
 +
 +        // UUIDs are 16 bytes
 +        int size = 16+16+entityVersion.getEntityId().getType().length();
 +
 +        // we always need to add length for the 2-byte comparator short, 2-byte length short and 1-byte equality byte
 +        size += keys.size()*5;
 +
 +        // we always add comparator to the buffer as well
 +        size += keys.size()*comparator.length();
 +
 +        ByteBuffer stuff = ByteBuffer.allocate(size);
 +
 +        for (Object key : keys) {
 +
 +            if(key instanceof UUID){
 +                comparator = "UUIDType";
 +            }else{
 +                comparator = "UTF8Type"; // if it's not a UUID, the only other thing we're serializing is text
 +            }
 +
 +            stuff.putShort((short)comparator.length());
 +            stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED));
 +
 +            ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
 +            if (kb == null) {
 +                kb = ByteBuffer.allocate(0);
 +            }
 +
 +            // put a short that indicates how big the buffer is for this item
 +            stuff.putShort((short) kb.remaining());
 +
 +            // put the actual item
 +            stuff.put(kb.slice());
 +
 +            // put an equality byte (not used, but part of the legacy thrift/Astyanax schema)
 +            stuff.put((byte) 0);
 +
 +
 +        }
 +
 +        stuff.flip();
 +        return stuff.duplicate();
 +
      }
  
 +    @Override
 +    protected List<Object> deserializeUniqueValueColumn(ByteBuffer bb){
 +
 +        List<Object> stuff = new ArrayList<>();
 +        int count = 0;
 +        while(bb.hasRemaining()){
 +
-             // custom columns have a short at beginning for comparator (which we don't use here )
-             ByteBuffer comparator = CQLUtils.getWithShortLength(bb);
++            // pull off the custom comparator header (per Astyanax deserialization)
++            int e = CQLUtils.getShortLength(bb);
++            if((e & '\u8000') == 0) {
++                CQLUtils.getBytes(bb, e);
++            } else {
++                // do nothing
++                // high bit set: the short already read encodes a one-byte comparator alias
 +
 +            ByteBuffer data = CQLUtils.getWithShortLength(bb);
 +
 +
 +            // first two composites are UUIDs, rest are strings
 +            if(count == 0) {
 +                stuff.add(new UUID(data.getLong(), data.getLong()));
 +            }else if(count ==1){
 +                stuff.add(new UUID(data.getLong(), data.getLong()));
 +            }else{
 +                stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
 +            }
 +
 +            byte equality = bb.get(); // we don't use this but take the equality byte off the buffer
 +
 +            count++;
 +        }
 +
 +        return stuff;
 +
 +    }
  
      @Override
 -    protected CollectionPrefixedKey<Id> createEntityUniqueLogKey( final Id applicationId,
 -                                                                  final Id uniqueValueId ) {
 +    protected List<Object> deserializeUniqueValueLogColumn(ByteBuffer bb){
 +
 +
 +        /**
 +         *  List<Object> keys = new ArrayList<>(4);
 +            keys.add(fieldEntry.getVersion());
 +            keys.add(fieldEntry.getField().getName());
 +            keys.add(fieldValueString);
 +            keys.add(fieldEntry.getField().getTypeName().name());
 +         */
  
 +        List<Object> stuff = new ArrayList<>();
 +        int count = 0;
 +        while(bb.hasRemaining()){
  
-             // the comparator info is different for the UUID reversed type vs. UTF8 type
-             if(count ==0){
-                 bb.getShort(); // take the reversed comparator byte off
-             }else {
-                 ByteBuffer comparator = CQLUtils.getWithShortLength(bb);
 -        final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( uniqueValueId.getType() );
++            // pull off the custom comparator header (per Astyanax deserialization)
++            int e = CQLUtils.getShortLength(bb);
++            if((e & '\u8000') == 0) {
++                CQLUtils.getBytes(bb, e);
++            } else {
++                // high bit set: the short already read encodes a one-byte comparator alias
 +            }
  
 +            ByteBuffer data = CQLUtils.getWithShortLength(bb);
  
 -        final CollectionPrefixedKey<Id> collectionPrefixedEntityKey =
 -            new CollectionPrefixedKey<>( collectionName, applicationId, uniqueValueId );
  
 +            // first composite is a UUID, rest are strings
 +            if(count == 0) {
 +                stuff.add(new UUID(data.getLong(), data.getLong()));
 +            }else{
 +                stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
 +            }
  
 +            byte equality = bb.get(); // we don't use this but take the equality byte off the buffer
 +
 +            count++;
 +        }
 +
 +        return stuff;
  
 -        return collectionPrefixedEntityKey;
      }
  
  

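All of the hand-rolled serializers and deserializers above reproduce the legacy Astyanax DynamicComposite byte layout: each component is either a 2-byte aliased comparator (high bit set, low byte an alias such as 'U' for UUIDType, upper-cased when reversed) or a 2-byte length followed by the comparator class name, then a 2-byte value length, the value bytes, and a 1-byte equality/end-of-component byte. That layout is also where the "5 bytes per component" in the buffer-size math comes from. A self-contained sketch of a reader for it, with the CQLUtils helpers inlined:

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    // Sketch: walk a legacy DynamicComposite buffer and collect the raw value
    // bytes of each component. Inlines what CQLUtils.getShortLength /
    // getWithShortLength do in the real code.
    final class DynamicCompositeReader {

        static List<ByteBuffer> components(ByteBuffer bb) {
            List<ByteBuffer> parts = new ArrayList<>();
            while (bb.hasRemaining()) {
                int header = bb.getShort() & 0xFFFF;
                if ((header & 0x8000) == 0) {
                    // full comparator name: header is its length, skip past it
                    bb.position(bb.position() + header);
                }
                // else: the low byte of the header is a one-byte comparator alias

                int length = bb.getShort() & 0xFFFF; // value length
                ByteBuffer value = bb.slice();
                value.limit(length);
                parts.add(value);

                bb.position(bb.position() + length);
                bb.get(); // consume the 1-byte equality / end-of-component marker
            }
            return parts;
        }
    }
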
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
index 6ea5c1e,40622a4..92c0a5b
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
@@@ -20,20 -20,21 +20,21 @@@
  package org.apache.usergrid.persistence.collection.serialization.impl;
  
  
 -import java.util.Arrays;
 -import java.util.Collection;
 +import java.nio.ByteBuffer;
 +import java.util.*;
  
 -import org.apache.cassandra.db.marshal.BytesType;
 +import com.datastax.driver.core.DataType;
 +import com.datastax.driver.core.ProtocolVersion;
 +import com.datastax.driver.core.Session;
  
++import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 -import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
 -import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
 -import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
 -import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily;
 +import org.apache.usergrid.persistence.core.CassandraConfig;
 +import org.apache.usergrid.persistence.core.CassandraFig;
  import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
 -import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
 -import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
 +import org.apache.usergrid.persistence.core.datastax.CQLUtils;
 +import org.apache.usergrid.persistence.core.datastax.TableDefinition;
  import org.apache.usergrid.persistence.model.entity.Id;
 -import org.apache.usergrid.persistence.model.field.Field;
  
  import com.google.inject.Inject;
  import com.google.inject.Singleton;
@@@ -124,277 -114,20 +125,284 @@@ public class UniqueValueSerializationSt
  
  
      @Override
 -    protected TypeField createUniqueValueKey( final Id applicationId,  final String type, final Field field) {
 -        return new TypeField(type,field);
 +    protected List<Object> deserializePartitionKey(ByteBuffer bb){
 +
 +
 +        /**
 +         *   List<Object> keys = new ArrayList<>(6);
 +             keys.add(0, appUUID); // UUID
 +             keys.add(1, applicationType); // String
 +             keys.add(2, entityType); // String
 +             keys.add(3, fieldType); // String
 +             keys.add(4, fieldName); // String
 +             keys.add(5, fieldValueString); // String
 +
 +         */
 +
 +        List<Object> stuff = new ArrayList<>();
 +        while(bb.hasRemaining()){
 +            ByteBuffer data = CQLUtils.getWithShortLength(bb);
 +            if(stuff.size() == 0){
 +                stuff.add(DataType.uuid().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
 +            }else{
 +                stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
 +            }
 +            byte equality = bb.get(); // we don't use this but take the equality byte off the buffer
 +
 +        }
 +
 +        return stuff;
 +
      }
  
 +    @Override
 +    protected ByteBuffer serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry){
 +
 +        /**
 +         *   final UUID version = value.getVersion();
 +             final Field<?> field = value.getField();
 +
 +             final FieldTypeName fieldType = field.getTypeName();
 +             final String fieldValue = field.getValue().toString().toLowerCase();
 +
 +
 +             DynamicComposite composite = new DynamicComposite(  );
 +
 +             //we want to sort ascending to descending by version
 +             composite.addComponent( version,  UUID_SERIALIZER, ColumnTypes.UUID_TYPE_REVERSED);
 +             composite.addComponent( field.getName(), STRING_SERIALIZER );
 +             composite.addComponent( fieldValue, STRING_SERIALIZER );
 +             composite.addComponent( fieldType.name() , STRING_SERIALIZER);
 +         */
 +
 +        // values are serialized as strings, not sure why, and always lower cased
 +        String fieldValueString = fieldEntry.getField().getValue().toString().toLowerCase();
 +
 +
 +        List<Object> keys = new ArrayList<>(4);
 +        keys.add(fieldEntry.getVersion());
 +        keys.add(fieldEntry.getField().getName());
 +        keys.add(fieldValueString);
 +        keys.add(fieldEntry.getField().getTypeName().name());
 +
 +        String comparator = UUID_TYPE_REVERSED;
 +
 +        int size = 16+fieldEntry.getField().getName().length()+fieldEntry.getField().getValue().toString().length()+
 +            fieldEntry.getField().getTypeName().name().length();
 +
 +        // we always need to add length for the 2-byte comparator short, 2-byte length short and 1-byte equality byte
 +        size += keys.size()*5;
 +
 +        // uuid type comparator is longest, ensure we allocate buffer using the max size to avoid overflow
 +        size += keys.size()*comparator.length();
 +
 +        ByteBuffer stuff = ByteBuffer.allocate(size);
 +
 +
 +        for (Object key : keys) {
 +
 +            if(key.equals(fieldEntry.getVersion())) {
 +                int p = comparator.indexOf("(reversed=true)");
 +                boolean desc = false;
 +                if (p >= 0) {
 +                    comparator = comparator.substring(0, p);
 +                    desc = true;
 +                }
 +
 +                byte a = (byte) 85; // this is the byte value for UUIDType in astyanax used in legacy data
 +                if (desc) {
 +                    a = (byte) Character.toUpperCase((char) a);
 +                }
 +
 +                stuff.putShort((short) ('\u8000' | a));
 +            }else{
 +                comparator = "UTF8Type"; // only strings are being serialized other than UUIDs here
 +                stuff.putShort((short)comparator.length());
 +                stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED));
 +            }
 +
 +            ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
 +            if (kb == null) {
 +                kb = ByteBuffer.allocate(0);
 +            }
 +
 +            // put a short that indicates how big the buffer is for this item
 +            stuff.putShort((short) kb.remaining());
 +
 +            // put the actual item
 +            stuff.put(kb.slice());
 +
 +            // put an equality byte (not used, but part of the legacy thrift/Astyanax schema)
 +            stuff.put((byte) 0);
 +
 +
 +        }
 +
 +        stuff.flip();
 +        return stuff.duplicate();
 +
 +    }
  
      @Override
 -    protected Field parseRowKey( final ScopedRowKey<TypeField> rowKey ) {
 -        return rowKey.getKey().getField();
 +    protected ByteBuffer getPartitionKey(Id applicationId, String entityType, String fieldType, String fieldName, Object fieldValue ){
 +
 +        return serializeKey(applicationId.getUuid(), applicationId.getType(),
 +            entityType, fieldType, fieldName, fieldValue);
 +
 +    }
 +
 +    @Override
 +    protected ByteBuffer getLogPartitionKey(final Id applicationId, final Id uniqueValueId){
 +
 +        return serializeLogKey(applicationId.getUuid(), applicationId.getType(),
 +            uniqueValueId.getUuid(), uniqueValueId.getType());
 +
 +    }
 +
 +    @Override
 +    protected ByteBuffer serializeUniqueValueColumn(EntityVersion entityVersion){
 +
 +        /**
 +         *   final Id entityId = ev.getEntityId();
 +             final UUID entityUuid = entityId.getUuid();
 +             final String entityType = entityId.getType();
 +
 +             CompositeBuilder builder = Composites.newDynamicCompositeBuilder();
 +
 +             builder.addUUID( entityVersion );
 +             builder.addUUID( entityUuid );
 +             builder.addString(entityType );
 +         */
 +
 +        String comparator = "UTF8Type";
 +
 +        List<Object> keys = new ArrayList<>(3);
 +        keys.add(entityVersion.getEntityVersion());
 +        keys.add(entityVersion.getEntityId().getUuid());
 +        keys.add(entityVersion.getEntityId().getType());
 +
 +        // UUIDs are 16 bytes
 +        int size = 16+16+entityVersion.getEntityId().getType().length();
 +
 +        // we always need to add length for the 2-byte comparator short, the 2-byte length short, and the 1-byte equality byte
 +        size += keys.size()*5;
 +
 +        // each component also carries its comparator name in the buffer
 +        size += keys.size()*comparator.length();
 +
 +        ByteBuffer stuff = ByteBuffer.allocate(size);
 +
 +        for (Object key : keys) {
 +
++            // custom comparator mappings are in CQLUtils.COMPOSITE_TYPE (more leftover from Astyanax)
 +            if(key instanceof UUID){
 +                comparator = "UUIDType";
 +            }else{
 +                comparator = "UTF8Type"; // if it's not a UUID, the only other thing we're serializing is text
 +            }
 +
 +            stuff.putShort((short)comparator.length());
 +            stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED));
 +
 +            ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
 +            if (kb == null) {
 +                kb = ByteBuffer.allocate(0);
 +            }
 +
 +            // put a short that indicates how big the buffer is for this item
 +            stuff.putShort((short) kb.remaining());
 +
 +            // put the actual item
 +            stuff.put(kb.slice());
 +
 +            // put an equality byte (again, not used but part of the legacy thrift/Astyanax schema)
 +            stuff.put((byte) 0);
 +
 +
 +        }
 +
 +        stuff.flip();
 +        return stuff.duplicate();
 +
      }
  
 +    @Override
 +    protected List<Object> deserializeUniqueValueColumn(ByteBuffer bb){
 +
 +        List<Object> stuff = new ArrayList<>();
 +        int count = 0;
 +        while(bb.hasRemaining()){
 +
-             // custom columns have a short at beginning for comparator (which we don't use here )
-             ByteBuffer comparator = CQLUtils.getWithShortLength(bb);
++            // pull off the custom comparator header (mirrors the Astyanax deserializer)
++            int e = CQLUtils.getShortLength(bb);
++            if((e & '\u8000') == 0) {
++                CQLUtils.getBytes(bb, e); // high bit clear: e is the length of a full comparator name, skip it
++            } else {
++                // high bit set: the alias is embedded in the short itself, nothing to skip
++            }
++
 +
 +            ByteBuffer data = CQLUtils.getWithShortLength(bb);
 +
 +
 +            // first two composites are UUIDs, the rest are strings
 +            if(count == 0 || count == 1) {
 +                stuff.add(new UUID(data.getLong(), data.getLong()));
 +            }else{
 +                stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
 +            }
 +
 +            bb.get(); // consume the equality byte; it is part of the format but unused
 +
 +            count++;
 +        }
 +
 +        return stuff;
 +
 +    }
  
      @Override
 -    protected Id createEntityUniqueLogKey( final Id applicationId, final Id uniqueValueId ) {
 -       return uniqueValueId;
 +    protected List<Object> deserializeUniqueValueLogColumn(ByteBuffer bb){
 +
 +
 +        /**
 +         *   Legacy Astyanax log column layout this method must decode:
 +         *
 +         *   List<Object> keys = new ArrayList<>(4);
 +             keys.add(fieldEntry.getVersion());
 +             keys.add(fieldEntry.getField().getName());
 +             keys.add(fieldValueString);
 +             keys.add(fieldEntry.getField().getTypeName().name());
 +         */
 +
 +        List<Object> stuff = new ArrayList<>();
 +        int count = 0;
 +        while(bb.hasRemaining()){
 +
-             // the comparator info is different for the UUID reversed type vs. UTF8 type
-             if(count ==0){
-                 bb.getShort(); // take the reversed comparator byte off
-             }else {
-                 ByteBuffer comparator = CQLUtils.getWithShortLength(bb);
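++            // strip the comparator header, exactly as in deserializeUniqueValueColumn above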
++            int e = CQLUtils.getShortLength(bb);
++            if((e & '\u8000') == 0) {
++                CQLUtils.getBytes(bb, e); // high bit clear: e is the length of a full comparator name, skip it
++            } else {
++                // high bit set: the alias is embedded in the short itself, nothing to skip
 +            }
 +
 +            ByteBuffer data = CQLUtils.getWithShortLength(bb);
 +
 +
 +            // first composite is a UUID, rest are strings
 +            if(count == 0) {
 +                stuff.add(new UUID(data.getLong(), data.getLong()));
 +            }else{
 +                stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
 +            }
 +
 +            bb.get(); // consume the equality byte; it is part of the format but unused
 +
 +            count++;
 +        }
 +
 +        return stuff;
 +
      }
  
  

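For context on the byte layout the serialize/deserialize methods above reproduce: each DynamicComposite component is a 2-byte comparator header (high bit set means the low byte is a one-character alias such as 'U'; high bit clear means it is the length of a full comparator class name, e.g. "UTF8Type", which follows), then a 2-byte value length, the value bytes, and a 1-byte end-of-component/equality marker. A minimal, self-contained sketch of that round trip, assuming Cassandra's documented DynamicCompositeType encoding (the class and helper names below are illustrative, not part of this commit):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class DynamicCompositeSketch {

        // Encode one component with a full comparator name (high bit of the header clear).
        static ByteBuffer encodeNamed(String comparator, byte[] value) {
            ByteBuffer bb = ByteBuffer.allocate(2 + comparator.length() + 2 + value.length + 1);
            bb.putShort((short) comparator.length());                 // header: comparator name length
            bb.put(comparator.getBytes(StandardCharsets.US_ASCII));   // comparator class name
            bb.putShort((short) value.length);                        // 2-byte value length
            bb.put(value);                                            // the value bytes
            bb.put((byte) 0);                                         // end-of-component / equality byte
            bb.flip();
            return bb;
        }

        // Header for an aliased comparator (high bit set; low byte is the alias character).
        static short aliasHeader(char alias) {
            return (short) (0x8000 | alias);                          // e.g. 'U' for the reversed UUID type
        }

        public static void main(String[] args) {
            ByteBuffer bb = encodeNamed("UTF8Type", "name".getBytes(StandardCharsets.UTF_8));
            int header = bb.getShort() & 0xFFFF;
            if ((header & 0x8000) == 0) {
                bb.position(bb.position() + header);                  // skip the comparator name, as above
            }
            byte[] value = new byte[bb.getShort()];
            bb.get(value);
            bb.get();                                                 // consume the equality byte
            System.out.println(new String(value, StandardCharsets.UTF_8)); // prints "name"
        }
    }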
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java
index 0000000,2cad32c..ed88ba6
mode 000000,100644..100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java
@@@ -1,0 -1,94 +1,101 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.usergrid.persistence.collection.uniquevalues;
+ 
++import com.datastax.driver.core.BatchStatement;
++import com.datastax.driver.core.Session;
++import com.datastax.driver.core.querybuilder.Batch;
+ import com.google.inject.Inject;
+ import com.google.inject.Singleton;
+ import com.netflix.astyanax.MutationBatch;
+ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+ import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+ import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+ import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+ import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+ import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+ import org.apache.usergrid.persistence.model.entity.Id;
+ import org.apache.usergrid.persistence.model.field.Field;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Iterator;
+ import java.util.UUID;
+ 
+ 
+ @Singleton
+ public class UniqueValuesTableImpl implements UniqueValuesTable {
+     private static final Logger logger = LoggerFactory.getLogger( UniqueValuesTableImpl.class );
+ 
 -    final UniqueValueSerializationStrategy strat;
 -    final UniqueValuesFig uniqueValuesFig;
++    private final UniqueValueSerializationStrategy strat;
++    private final UniqueValuesFig uniqueValuesFig;
++    private final Session session;
+ 
+     @Inject
 -    public UniqueValuesTableImpl( final UniqueValueSerializationStrategy strat, UniqueValuesFig uniqueValuesFig) {
++    public UniqueValuesTableImpl( final UniqueValueSerializationStrategy strat,
++                                  final UniqueValuesFig uniqueValuesFig,
++                                  final Session session ) {
+         this.strat = strat;
+         this.uniqueValuesFig = uniqueValuesFig;
++        this.session = session;
+     }
+ 
+ 
+     @Override
+     public Id lookupOwner( ApplicationScope scope, String type, Field field) throws ConnectionException {
+ 
+         UniqueValueSet set = strat.load( scope, type, Collections.singletonList( field ) );
+         UniqueValue uv  = set.getValue( field.getName() );
+         return uv == null ? null : uv.getEntityId();
+     }
+ 
+     @Override
+     public void reserve( ApplicationScope scope, Id owner, UUID version, Field field ) throws ConnectionException {
+ 
+         UniqueValue uv = new UniqueValueImpl( field, owner, version);
 -        final MutationBatch write = strat.write( scope, uv, uniqueValuesFig.getUniqueValueReservationTtl() );
 -        write.execute();
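++        // write with the reservation TTL so an unconfirmed reservation expires on its own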
++        final BatchStatement statement = strat.writeCQL( scope, uv, uniqueValuesFig.getUniqueValueReservationTtl() );
++        session.execute(statement);
+     }
+ 
+     @Override
+     public void confirm( ApplicationScope scope, Id owner, UUID version, Field field) throws ConnectionException {
+ 
+         UniqueValue uv = new UniqueValueImpl( field, owner, version);
 -        final MutationBatch write = strat.write( scope, uv );
 -        write.execute();
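++        // a TTL of -1 here presumably means no expiration: the confirmed value becomes permanent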
++        final BatchStatement statement = strat.writeCQL( scope, uv, -1 );
++        session.execute(statement);
+ 
+     }
+ 
+     @Override
+     public void cancel( ApplicationScope scope, Id owner, UUID version, Field field) throws ConnectionException {
+ 
+         UniqueValue uv = new UniqueValueImpl( field, owner, version );
 -        final MutationBatch write = strat.delete( scope, uv );
 -        write.execute();
++        final BatchStatement statement = strat.deleteCQL( scope, uv );
++        session.execute(statement);
+     }
+ 
+     @Override
+     public Iterator<UniqueValue> getUniqueValues(ApplicationScope scope, Id entityId) {
+         return strat.getAllUniqueFields( scope, entityId );
+     }
+ 
+ }

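The change above swaps Astyanax's MutationBatch.execute() for a DataStax BatchStatement executed by the injected Session. A rough, self-contained illustration of the DataStax 3.x calls involved, assuming a hypothetical my_ks.unique_values table and a local contact point (neither is part of this commit):

    import com.datastax.driver.core.BatchStatement;
    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.Session;
    import com.datastax.driver.core.Statement;
    import com.datastax.driver.core.querybuilder.QueryBuilder;

    public class BatchWriteSketch {
        public static void main(String[] args) {
            Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
            Session session = cluster.connect();

            // An insert with a TTL, analogous to reserve(): the row expires unless confirmed.
            Statement reserve = QueryBuilder.insertInto("my_ks", "unique_values")
                .value("key", "name:porsche-911-gt3")
                .value("column1", "placeholder")
                .using(QueryBuilder.ttl(30));

            BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED);
            batch.add(reserve);
            session.execute(batch);   // same call shape as session.execute(strat.writeCQL(...))

            cluster.close();
        }
    }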
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
index b18b095,89169ac..f98a3ea
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
@@@ -82,13 -71,12 +82,14 @@@ public class MarkCommitTest extends Abs
  
  
          //run the stage
-         WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, session );
 -        WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null);
++        WriteCommit newStage
++            = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null, session);
 +
  
-         //verify the observable is correct
-         Entity result  = newStage.call( new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get();
- 
  
+         //verify the observable is correct
+         Entity result  = newStage.call(
+             new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get();
  
  
          //verify the log entry is correct

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
index 60281d4,dcc473c..df0fc9e
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
@@@ -92,10 -84,12 +92,13 @@@ public class WriteCommitTest extends Ab
  
  
          //run the stage
-         WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, session );
+         WriteCommit newStage =
 -            new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null );
++            new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null, session );
 +
  
-         Entity result = newStage.call( new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get();
+ 
+         Entity result = newStage.call(
+             new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get();
  
  
          //verify the log entry is correct
@@@ -142,7 -133,7 +145,8 @@@
          when( mvccEntityStrategy.write( any( ApplicationScope.class ), any( MvccEntity.class ) ) )
                  .thenReturn( entityMutation );
  
-         new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, session ).call( event );
 -        new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null ).call( event );
++        new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null, session ).call( event );
++
      }
  }
  

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java
index 9d0cd20,401d23e..87226be
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java
@@@ -18,12 -18,9 +18,10 @@@
  package org.apache.usergrid.persistence.collection.mvcc.stage.write;
  
  
- import org.junit.Rule;
- import org.junit.Test;
- import org.junit.runner.RunWith;
- 
- import org.apache.usergrid.persistence.collection.EntityCollectionManager;
- import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
++import com.datastax.driver.core.Session;
+ import com.google.inject.Inject;
+ import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
+ import org.apache.usergrid.persistence.collection.*;
  import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
  import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
  import org.apache.usergrid.persistence.collection.mvcc.stage.TestEntityGenerator;
@@@ -60,8 -70,25 +71,28 @@@ public class WriteUniqueVerifyIT extend
      public MigrationManagerRule migrationManagerRule;
  
      @Inject
+     public UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
+ 
+     @Inject
      public EntityCollectionManagerFactory cmf;
  
+     @Inject
+     ActorSystemManager actorSystemManager;
+ 
+     @Inject
+     UniqueValuesService uniqueValuesService;
+ 
++    @Inject
++    Session session;
++
+ 
+     @Before
+     public void initAkka() {
+         // each test class needs unique port number
+         initAkka( 2553, actorSystemManager, uniqueValuesService );
+     }
+ 
+ 
      @Test
      public void testConflict() {
  
@@@ -137,9 -162,69 +166,69 @@@
          entity.setField(new StringField("name", "Alfa Romeo 8C Competizione", true));
          entity.setField(new StringField("identifier", "ar8c", true));
          entity.setField(new IntegerField("top_speed_mph", 182));
-         entityManager.write( entity ).toBlocking().last();
+         entityManager.write( entity, null ).toBlocking().last();
  
          entity.setField( new StringField("foo", "bar"));
-         entityManager.write( entity ).toBlocking().last();
+         entityManager.write( entity, null ).toBlocking().last();
+     }
+ 
+     @Test
+     public void testConflictReadRepair() throws Exception {
+ 
+         final Id appId = new SimpleId("testNoConflict");
+ 
+ 
+ 
+         final ApplicationScope scope = new ApplicationScopeImpl( appId);
+ 
+         final EntityCollectionManager entityManager = cmf.createCollectionManager( scope );
+ 
+         final Entity entity = TestEntityGenerator.generateEntity();
+         entity.setField(new StringField("name", "Porsche 911 GT3", true));
+         entity.setField(new StringField("identifier", "911gt3", true));
+         entity.setField(new IntegerField("top_speed_mph", 194));
+         entityManager.write( entity, null ).toBlocking().last();
+ 
+ 
+         FieldSet fieldSet =
+             entityManager.getEntitiesFromFields("test", Collections.singletonList(entity.getField("name")), true)
+             .toBlocking().last();
+ 
+         MvccEntity entityFetched = fieldSet.getEntity( entity.getField("name") );
+ 
+ 
+         final Entity entityDuplicate = TestEntityGenerator.generateEntity();
+         UniqueValue uniqueValue = new UniqueValueImpl(new StringField("name", "Porsche 911 GT3", true),
+             entityDuplicate.getId(), UUIDGenerator.newTimeUUID());
+ 
+         // manually insert a record to simulate a 'duplicate' trying to be inserted
 -        uniqueValueSerializationStrategy.
 -            write(scope, uniqueValue).execute();
++        session.execute(uniqueValueSerializationStrategy.
++            writeCQL(scope, uniqueValue, -1));
+ 
+ 
+ 
+         FieldSet fieldSetAgain =
+             entityManager.getEntitiesFromFields("test", Collections.singletonList(entity.getField("name")), true)
+                 .toBlocking().last();
+ 
+         MvccEntity entityFetchedAgain = fieldSetAgain.getEntity( entity.getField("name") );
+ 
+         assertEquals(entityFetched, entityFetchedAgain);
+ 
+ 
+         // now test writing the original entity again ( simulates a PUT )
+         // this should read repair and work
+         entityManager.write( entity, null ).toBlocking().last();
+ 
+         FieldSet fieldSetAgainAgain =
+             entityManager.getEntitiesFromFields("test", Collections.singletonList(entity.getField("name")), true)
+                 .toBlocking().last();
+ 
+         MvccEntity entityFetchedAgainAgain = fieldSetAgainAgain.getEntity( entity.getField("name") );
+ 
+         assertEquals(entityFetched, entityFetchedAgainAgain);
+ 
+ 
+ 
      }
  }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
index 3ddc14d,7afba05..1290a5c
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
@@@ -18,17 -18,20 +18,27 @@@
  package org.apache.usergrid.persistence.collection.mvcc.stage.write;
  
  
++
 +import com.datastax.driver.core.Session;
- import org.junit.Rule;
- import org.junit.Test;
- import org.junit.runner.RunWith;
++
+ import com.google.inject.Inject;
+ import com.netflix.astyanax.Keyspace;
+ import com.netflix.astyanax.MutationBatch;
+ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+ import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
+ import org.apache.usergrid.persistence.collection.AbstractUniqueValueTest;
+ import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 +
  import org.apache.usergrid.persistence.collection.MvccEntity;
  import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
  import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
  import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
  import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
++
 +import org.apache.usergrid.persistence.core.CassandraConfig;
++
+ import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService;
 -import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
++
  import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
  import org.apache.usergrid.persistence.core.scope.ApplicationScope;
  import org.apache.usergrid.persistence.core.test.ITRunner;
@@@ -84,12 -96,11 +106,12 @@@ public class WriteUniqueVerifyTest exte
          final MvccEntity mvccEntity = fromEntity( entity );
  
          // run the stage
-         WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace,cassandraConfig, session );
 -        WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace, cassandraConfig, null, null, null );
++        WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace, cassandraConfig, null, null, null, session );
++
  
-        newStage.call(
-             new CollectionIoEvent<>( collectionScope, mvccEntity ) ) ;
+        newStage.call( new CollectionIoEvent<>( collectionScope, mvccEntity ) ) ;
  
-        //if we get here, it's a success.  We want to test no exceptions are thrown
+        // if we get here, it's a success.  We want to test no exceptions are thrown
  
          verify(batch, never()).execute();
      }