Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/08/17 21:48:35 UTC

[28/38] usergrid git commit: Initial UniqueValueSerialization conversion to CQL.

Initial UniqueValueSerialization conversion to CQL.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0c609878
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0c609878
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0c609878

Branch: refs/heads/master
Commit: 0c609878e1eccd35f31bc4fdc86bd3fe9da21593
Parents: ff3f7e8
Author: Michael Russo <mr...@apigee.com>
Authored: Fri May 6 22:36:37 2016 +0800
Committer: Michael Russo <mr...@apigee.com>
Committed: Fri May 6 22:36:37 2016 +0800

----------------------------------------------------------------------
 .../mvcc/stage/write/WriteCommit.java           |  18 +-
 .../mvcc/stage/write/WriteUniqueVerify.java     |  23 +-
 .../UniqueValueSerializationStrategy.java       |   6 +-
 .../UniqueValueSerializationStrategyImpl.java   | 287 ++++++++++++-
 ...iqueValueSerializationStrategyProxyImpl.java |  31 +-
 .../UniqueValueSerializationStrategyV1Impl.java | 410 ++++++++++++++++++-
 .../UniqueValueSerializationStrategyV2Impl.java | 379 ++++++++++++++++-
 .../migration/MvccEntityDataMigrationImpl.java  |  26 +-
 .../mvcc/stage/delete/MarkCommitTest.java       |  13 +-
 .../mvcc/stage/write/WriteCommitTest.java       |  15 +-
 .../mvcc/stage/write/WriteUniqueVerifyTest.java |   6 +-
 ...niqueValueSerializationStrategyImplTest.java |  41 +-
 ...ctMvccEntityDataMigrationV1ToV3ImplTest.java |   5 +-
 .../core/datastax/impl/DataStaxClusterImpl.java |   3 +
 14 files changed, 1169 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index 7eb96e7..cfac8e4 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -20,6 +20,8 @@ package org.apache.usergrid.persistence.collection.mvcc.stage.write;
 
 import java.util.UUID;
 
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,11 +69,14 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect
 
     private final MvccEntitySerializationStrategy entityStrat;
 
+    private final Session session;
+
 
     @Inject
     public WriteCommit( final MvccLogEntrySerializationStrategy logStrat,
                         final MvccEntitySerializationStrategy entryStrat,
-                        final UniqueValueSerializationStrategy uniqueValueStrat) {
+                        final UniqueValueSerializationStrategy uniqueValueStrat,
+                        final Session session) {
 
         Preconditions.checkNotNull( logStrat, "MvccLogEntrySerializationStrategy is required" );
         Preconditions.checkNotNull( entryStrat, "MvccEntitySerializationStrategy is required" );
@@ -80,6 +85,7 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect
         this.logEntryStrat = logStrat;
         this.entityStrat = entryStrat;
         this.uniqueValueStrat = uniqueValueStrat;
+        this.session = session;
     }
 
 
@@ -103,6 +109,8 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect
 
         final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, Stage.COMMITTED, MvccLogEntry.State.COMPLETE );
 
+
+
         MutationBatch logMutation = logEntryStrat.write( applicationScope, startEntry );
 
         // now get our actual insert into the entity data
@@ -112,21 +120,23 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect
         logMutation.mergeShallow( entityMutation );
 
         // re-write the unique values but this time with no TTL
+        final BatchStatement uniqueBatch = new BatchStatement();
+
         for ( Field field : EntityUtils.getUniqueFields(mvccEntity.getEntity().get()) ) {
 
                 UniqueValue written  = new UniqueValueImpl( field,
                     entityId,version);
 
-                MutationBatch mb = uniqueValueStrat.write(applicationScope,  written );
+                uniqueBatch.add(uniqueValueStrat.writeCQL(applicationScope,  written, -1 ));
 
                 logger.debug("Finalizing {} unique value {}", field.getName(), field.getValue().toString());
 
-                // merge into our existing mutation batch
-                logMutation.mergeShallow( mb );
+
         }
 
         try {
             logMutation.execute();
+            session.execute(uniqueBatch);
         }
         catch ( ConnectionException e ) {
             logger.error( "Failed to execute write asynchronously ", e );

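Note on the WriteCommit change: the commit path now spans both drivers. The MVCC log entry and entity data still go through an Astyanax MutationBatch, while the unique-value re-writes are collected into a DataStax BatchStatement and executed on the injected Session. A minimal sketch of the resulting flow, using the names from the diff above (error handling elided):

    // re-write the unique values with no TTL (-1) now that the entity is committed
    BatchStatement uniqueBatch = new BatchStatement();
    for (Field field : EntityUtils.getUniqueFields(mvccEntity.getEntity().get())) {
        UniqueValue written = new UniqueValueImpl(field, entityId, version);
        uniqueBatch.add(uniqueValueStrat.writeCQL(applicationScope, written, -1));
    }

    logMutation.execute();        // Astyanax: log entry + entity data
    session.execute(uniqueBatch); // CQL: unique values, now permanent

Note that the surrounding catch only handles Astyanax's checked ConnectionException; a failure in session.execute(...) surfaces as an unchecked DriverException instead.
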
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index 585c26e..8e0b202 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -23,6 +23,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,14 +73,20 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
     protected final SerializationFig serializationFig;
 
     protected final Keyspace keyspace;
+
+    protected final Session session;
+
     private final CassandraConfig cassandraFig;
 
 
     @Inject
     public WriteUniqueVerify( final UniqueValueSerializationStrategy uniqueValueSerializiationStrategy,
-                              final SerializationFig serializationFig, final Keyspace keyspace, final CassandraConfig cassandraFig ) {
+                              final SerializationFig serializationFig, final Keyspace keyspace,
+                              final CassandraConfig cassandraFig, final Session session ) {
+
         this.keyspace = keyspace;
         this.cassandraFig = cassandraFig;
+        this.session = session;
 
         Preconditions.checkNotNull( uniqueValueSerializiationStrategy, "uniqueValueSerializationStrategy is required" );
         Preconditions.checkNotNull( serializationFig, "serializationFig is required" );
@@ -101,7 +109,7 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
 
         final ApplicationScope scope = ioevent.getEntityCollection();
 
-        final MutationBatch batch = keyspace.prepareMutationBatch();
+        final BatchStatement batch = new BatchStatement();
         //allocate our max size, worst case
         final List<Field> uniqueFields = new ArrayList<>( entity.getFields().size() );
 
@@ -119,9 +127,8 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
             final UniqueValue written = new UniqueValueImpl( field, mvccEntity.getId(), mvccEntity.getVersion() );
 
             // use TTL in case something goes wrong before entity is finally committed
-            final MutationBatch mb = uniqueValueStrat.write( scope, written, serializationFig.getTimeout() );
+            batch.add(uniqueValueStrat.writeCQL( scope, written, serializationFig.getTimeout() ));
 
-            batch.mergeShallow( mb );
             uniqueFields.add(field);
         }
 
@@ -131,12 +138,8 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
         }
 
         //perform the write
-        try {
-            batch.execute();
-        }
-        catch ( ConnectionException ex ) {
-            throw new RuntimeException( "Unable to write to cassandra", ex );
-        }
+        session.execute(batch);
+
 
         // use simple thread pool to verify fields in parallel
         ConsistentReplayCommand cmd = new ConsistentReplayCommand(uniqueValueStrat,cassandraFig,scope, entity.getId().getType(), uniqueFields,entity);

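Note on the WriteUniqueVerify change: the tentative unique-value reservations are now written through a BatchStatement with a TTL, so they expire on their own if the entity is never committed, and the old try/catch disappears because the DataStax Session throws unchecked DriverExceptions rather than Astyanax's checked ConnectionException. The shape of the new write, using the names from the diff (iteration over the entity's unique fields elided):

    BatchStatement batch = new BatchStatement();

    // TTL'd reservation: disappears automatically if the commit never lands
    UniqueValue written = new UniqueValueImpl(field, mvccEntity.getId(), mvccEntity.getVersion());
    batch.add(uniqueValueStrat.writeCQL(scope, written, serializationFig.getTimeout()));

    session.execute(batch); // failures propagate as unchecked DriverExceptions
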
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
index 3645107..56e8b87 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
@@ -21,6 +21,8 @@ package org.apache.usergrid.persistence.collection.serialization;
 import java.util.Collection;
 import java.util.Iterator;
 
+import com.datastax.driver.core.BatchStatement;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueSerializationStrategyImpl;
 import org.apache.usergrid.persistence.core.migration.data.VersionedData;
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -46,7 +48,6 @@ public interface UniqueValueSerializationStrategy extends Migration, VersionedDa
      *
      * @return MutationBatch that encapsulates the operation; the caller may or may not execute it.
      */
-    MutationBatch write( ApplicationScope applicationScope, UniqueValue uniqueValue );
 
     /**
      * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds.
@@ -56,7 +57,8 @@ public interface UniqueValueSerializationStrategy extends Migration, VersionedDa
      * @param timeToLive How long the object should live, in seconds; -1 implies store forever
      * @return MutationBatch that encapsulates the operation; the caller may or may not execute it.
      */
-    MutationBatch write( ApplicationScope applicationScope, UniqueValue uniqueValue, int timeToLive );
+
+    BatchStatement writeCQL(ApplicationScope applicationScope, UniqueValue uniqueValue, int timeToLive );
 
     /**
      * Load UniqueValue that matches field from collection or null if that value does not exist.

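The interface now exposes a single writeCQL entry point and pushes the TTL decision to the caller: a positive value produces an expiring reservation, while -1 stores the value forever. A hedged usage sketch (strategy, scope and value are illustrative names, injected/obtained as in the classes above):

    // tentative write, expires after the serialization timeout
    session.execute(strategy.writeCQL(scope, value, serializationFig.getTimeout()));

    // permanent write once the entity commits (-1 disables the TTL)
    session.execute(strategy.writeCQL(scope, value, -1));

As with the old MutationBatch contract, the strategy never executes the returned BatchStatement itself; the caller decides when (and whether) to execute it.
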
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
index 0f27167..27a8609 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
@@ -18,9 +18,19 @@
 package org.apache.usergrid.persistence.collection.serialization.impl;
 
 
+import java.nio.ByteBuffer;
 import java.util.*;
 
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Using;
+import com.netflix.astyanax.model.*;
+import com.netflix.astyanax.util.RangeBuilder;
+import org.apache.usergrid.persistence.core.CassandraConfig;
 import org.apache.usergrid.persistence.core.datastax.TableDefinition;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.field.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,18 +50,13 @@ import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.field.Field;
 
 import com.google.common.base.Preconditions;
 import com.netflix.astyanax.ColumnListMutation;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.astyanax.model.Column;
-import com.netflix.astyanax.model.ConsistencyLevel;
-import com.netflix.astyanax.model.Row;
 import com.netflix.astyanax.query.RowQuery;
-import com.netflix.astyanax.util.RangeBuilder;
 
 
 /**
@@ -62,6 +67,9 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
 
     private static final Logger log = LoggerFactory.getLogger( UniqueValueSerializationStrategyImpl.class );
 
+    public static final String UUID_TYPE_REVERSED = "UUIDType(reversed=true)";
+
+
 
     private final MultiTenantColumnFamily<ScopedRowKey<FieldKey>, EntityVersion>
         CF_UNIQUE_VALUES;
@@ -70,6 +78,15 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
     private final MultiTenantColumnFamily<ScopedRowKey<EntityKey>, UniqueFieldEntry>
         CF_ENTITY_UNIQUE_VALUE_LOG ;
 
+    private final String TABLE_UNIQUE_VALUES;
+    private final String TABLE_UNIQUE_VALUES_LOG;
+
+
+    private final Map COLUMNS_UNIQUE_VALUES;
+    private final Map COLUMNS_UNIQUE_VALUES_LOG;
+
+
+
     public static final int COL_VALUE = 0x0;
 
 
@@ -77,6 +94,9 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
     protected final Keyspace keyspace;
     private final CassandraFig cassandraFig;
 
+    private final Session session;
+    private final CassandraConfig cassandraConfig;
+
 
     /**
      * Construct serialization strategy for keyspace.
@@ -86,13 +106,24 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
      * @param serializationFig The serialization configuration
      */
     public UniqueValueSerializationStrategyImpl( final Keyspace keyspace, final CassandraFig cassandraFig,
-                                                 final SerializationFig serializationFig ) {
+                                                 final SerializationFig serializationFig,
+                                                 final Session session, final CassandraConfig cassandraConfig) {
         this.keyspace = keyspace;
         this.cassandraFig = cassandraFig;
         this.serializationFig = serializationFig;
 
+        this.session = session;
+        this.cassandraConfig = cassandraConfig;
+
         CF_UNIQUE_VALUES = getUniqueValuesCF();
         CF_ENTITY_UNIQUE_VALUE_LOG = getEntityUniqueLogCF();
+
+        TABLE_UNIQUE_VALUES = getUniqueValuesTable().getTableName();
+        TABLE_UNIQUE_VALUES_LOG = getEntityUniqueLogTable().getTableName();
+
+        COLUMNS_UNIQUE_VALUES = getUniqueValuesTable().getColumns();
+        COLUMNS_UNIQUE_VALUES_LOG = getEntityUniqueLogTable().getColumns();
+
     }
 
 
@@ -129,7 +160,6 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
     }
 
 
-    @Override
     public MutationBatch write( final ApplicationScope collectionScope, final UniqueValue value,
                                 final int timeToLive ) {
 
@@ -163,6 +193,86 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
         } );
     }
 
+    @Override
+    public BatchStatement writeCQL( final ApplicationScope collectionScope, final UniqueValue value,
+                           final int timeToLive  ){
+
+
+        Preconditions.checkNotNull( value, "value is required" );
+
+        BatchStatement batch = new BatchStatement();
+
+        Using ttl = null;
+        if(timeToLive > 0){
+
+            ttl = QueryBuilder.ttl(timeToLive);
+
+        }
+
+        final Id entityId = value.getEntityId();
+        final UUID entityVersion = value.getEntityVersion();
+        final Field<?> field = value.getField();
+
+        ValidationUtils.verifyIdentity( entityId );
+        ValidationUtils.verifyVersion( entityVersion );
+
+        final EntityVersion ev = new EntityVersion( entityId, entityVersion );
+        final UniqueFieldEntry uniqueFieldEntry = new UniqueFieldEntry( entityVersion, field );
+
+        ByteBuffer partitionKey = getPartitionKey(collectionScope.getApplication(), value.getEntityId().getType(),
+            field.getTypeName().toString(), field.getName(), field.getValue());
+
+        ByteBuffer logPartitionKey = getLogPartitionKey(collectionScope.getApplication(), value.getEntityId());
+
+
+        if(ttl != null) {
+
+            Statement uniqueValueStatement = QueryBuilder.insertInto(TABLE_UNIQUE_VALUES)
+                .value("key", partitionKey)
+                .value("column1", serializeUniqueValueColumn(ev))
+                .value("value", DataType.serializeValue(COL_VALUE, ProtocolVersion.NEWEST_SUPPORTED))
+                .using(ttl);
+
+            batch.add(uniqueValueStatement);
+
+
+        }else{
+
+            Statement uniqueValueStatement = QueryBuilder.insertInto(TABLE_UNIQUE_VALUES)
+                .value("key", partitionKey)
+                .value("column1", serializeUniqueValueColumn(ev))
+                .value("value", DataType.serializeValue(COL_VALUE, ProtocolVersion.NEWEST_SUPPORTED));
+
+            batch.add(uniqueValueStatement);
+
+        }
+
+        // we always want to retain the log entry, so never write with the TTL
+        Statement uniqueValueLogStatement = QueryBuilder.insertInto(TABLE_UNIQUE_VALUES_LOG)
+            .value("key", logPartitionKey)
+            .value("column1", serializeUniqueValueLogColumn(uniqueFieldEntry))
+            .value("value", DataType.serializeValue(COL_VALUE, ProtocolVersion.NEWEST_SUPPORTED));
+
+        batch.add(uniqueValueLogStatement);
+
+
+
+        return batch;
+
+        /**
+         *  @Override
+        public void doLookup( final ColumnListMutation<EntityVersion> colMutation ) {
+        colMutation.putColumn( ev, COL_VALUE );
+        }
+
+
+         @Override
+         public void doLog( final ColumnListMutation<UniqueFieldEntry> colMutation ) {
+         colMutation.putColumn( uniqueFieldEntry, COL_VALUE );
+         }
+         */
+    }
+
 
     @Override
     public MutationBatch delete( final ApplicationScope scope, UniqueValue value ) {
@@ -236,18 +346,26 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
     @Override
     public UniqueValueSet load( final ApplicationScope colScope, final String type, final Collection<Field> fields )
         throws ConnectionException {
-        return load( colScope, ConsistencyLevel.valueOf( cassandraFig.getAstyanaxReadCL() ), type, fields );
+        return load( colScope, com.netflix.astyanax.model.ConsistencyLevel.valueOf( cassandraFig.getAstyanaxReadCL() ), type, fields );
     }
 
 
     @Override
-    public UniqueValueSet load( final ApplicationScope appScope, final ConsistencyLevel consistencyLevel,
+    public UniqueValueSet load( final ApplicationScope appScope, final com.netflix.astyanax.model.ConsistencyLevel consistencyLevel,
                                 final String type, final Collection<Field> fields ) throws ConnectionException {
 
         Preconditions.checkNotNull( fields, "fields are required" );
         Preconditions.checkArgument( fields.size() > 0, "More than 1 field must be specified" );
 
+        return loadCQL(appScope, com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM, type, fields);
 
+        //return loadLegacy( appScope, type, fields);
+
+    }
+
+
+    private UniqueValueSet loadLegacy(final ApplicationScope appScope,
+                                      final String type, final Collection<Field> fields) throws ConnectionException {
         final List<ScopedRowKey<FieldKey>> keys = new ArrayList<>( fields.size() );
 
         final Id applicationId = appScope.getApplication();
@@ -265,16 +383,16 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
 
         final UniqueValueSetImpl uniqueValueSet = new UniqueValueSetImpl( fields.size() );
 
-        Iterator<Row<ScopedRowKey<FieldKey>, EntityVersion>> results =
-            keyspace.prepareQuery( CF_UNIQUE_VALUES ).setConsistencyLevel( consistencyLevel ).getKeySlice( keys )
-                    .withColumnRange( new RangeBuilder().setLimit( 1 ).build() ).execute().getResult().iterator();
+        Iterator<com.netflix.astyanax.model.Row<ScopedRowKey<FieldKey>, EntityVersion>> results =
+            keyspace.prepareQuery( CF_UNIQUE_VALUES ).setConsistencyLevel(com.netflix.astyanax.model.ConsistencyLevel.CL_LOCAL_QUORUM ).getKeySlice( keys )
+                .withColumnRange( new RangeBuilder().setLimit( 1 ).build() ).execute().getResult().iterator();
 
 
         while ( results.hasNext() )
 
         {
 
-            final Row<ScopedRowKey<FieldKey>, EntityVersion> unique = results.next();
+            final com.netflix.astyanax.model.Row<ScopedRowKey<FieldKey>, EntityVersion> unique = results.next();
 
 
             final Field field = parseRowKey( unique.getKey() );
@@ -296,9 +414,112 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
         }
 
         return uniqueValueSet;
+
+    }
+
+    private UniqueValueSet loadCQL( final ApplicationScope appScope, final com.datastax.driver.core.ConsistencyLevel consistencyLevel,
+                                final String type, final Collection<Field> fields ) throws ConnectionException {
+
+        Preconditions.checkNotNull( fields, "fields are required" );
+        Preconditions.checkArgument( fields.size() > 0, "More than 1 field must be specified" );
+
+
+        final Id applicationId = appScope.getApplication();
+
+        // row key = app UUID + app type + entityType + field type + field name + field value
+
+        List<ByteBuffer> partitionKeys = new ArrayList<>( fields.size() );
+        for ( Field field : fields ) {
+
+            //log.info(Bytes.toHexString(getPartitionKey(applicationId, type, field.getTypeName().toString(), field.getName(), field.getValue())));
+
+            partitionKeys.add(getPartitionKey(applicationId, type, field.getTypeName().toString(), field.getName(), field.getValue()));
+
+        }
+
+        final UniqueValueSetImpl uniqueValueSet = new UniqueValueSetImpl( fields.size() );
+
+        final Clause inKey = QueryBuilder.in("key", partitionKeys );
+
+        final Statement statement = QueryBuilder.select().all().from(TABLE_UNIQUE_VALUES)
+            .where(inKey)
+            .setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM);
+
+        final ResultSet resultSet = session.execute(statement);
+
+
+        Iterator<com.datastax.driver.core.Row> results = resultSet.iterator();
+
+
+        while( results.hasNext() ){
+
+            final com.datastax.driver.core.Row unique = results.next();
+            ByteBuffer partitionKey = unique.getBytes("key");
+            ByteBuffer column = unique.getBytesUnsafe("column1");
+
+            List<Object> keyContents = deserializePartitionKey(partitionKey);
+            List<Object> columnContents = deserializeUniqueValueColumn(column);
+
+            Field field = null;
+            FieldTypeName fieldType;
+            String name;
+            String value;
+            if(this instanceof UniqueValueSerializationStrategyV2Impl) {
+
+                 fieldType = FieldTypeName.valueOf((String) keyContents.get(3));
+                 name = (String) keyContents.get(4);
+                 value = (String) keyContents.get(5);
+
+            }else{
+
+                fieldType = FieldTypeName.valueOf((String) keyContents.get(5));
+                name = (String) keyContents.get(6);
+                value = (String) keyContents.get(7);
+
+            }
+
+            switch ( fieldType ) {
+                case BOOLEAN:
+                    field = new BooleanField( name, Boolean.parseBoolean( value ) );
+                    break;
+                case DOUBLE:
+                    field = new DoubleField( name, Double.parseDouble( value ) );
+                    break;
+                case FLOAT:
+                    field = new FloatField( name, Float.parseFloat( value ) );
+                    break;
+                case INTEGER:
+                    field =  new IntegerField( name, Integer.parseInt( value ) );
+                    break;
+                case LONG:
+                    field = new LongField( name, Long.parseLong( value ) );
+                    break;
+                case STRING:
+                    field = new StringField( name, value );
+                    break;
+                case UUID:
+                    field = new UUIDField( name, UUID.fromString( value ) );
+                    break;
+            }
+
+            final EntityVersion entityVersion = new EntityVersion(
+                new SimpleId((UUID)columnContents.get(1), (String)columnContents.get(2)), (UUID)columnContents.get(0));
+
+
+            final UniqueValueImpl uniqueValue =
+              new UniqueValueImpl( field, entityVersion.getEntityId(), entityVersion.getEntityVersion() );
+
+            uniqueValueSet.addValue(uniqueValue);
+
+        }
+
+        return uniqueValueSet;
+
     }
 
 
+
+
     @Override
     public Iterator<UniqueValue> getAllUniqueFields( final ApplicationScope collectionScope, final Id entityId ) {
         Preconditions.checkNotNull( collectionScope, "collectionScope is required" );
@@ -378,7 +599,13 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
     @Override
     public Collection<TableDefinition> getTables() {
 
-        return Collections.emptyList();
+        final TableDefinition uniqueValues = getUniqueValuesTable();
+
+        final TableDefinition uniqueValuesLog = getEntityUniqueLogTable();
+
+
+        return Arrays.asList( uniqueValues, uniqueValuesLog );
+
     }
 
 
@@ -389,6 +616,12 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
 
 
     /**
+     * Get the CQL table definition for the unique values table
+     */
+    protected abstract TableDefinition getUniqueValuesTable();
+
+
+    /**
      * Generate a key that is compatible with the column family
      *
      * @param applicationId The applicationId
@@ -405,10 +638,32 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
     protected abstract Field parseRowKey(final ScopedRowKey<FieldKey> rowKey);
 
 
+    protected abstract List<Object> deserializePartitionKey(ByteBuffer bb);
+
+
+    protected abstract Object serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry);
+
+    protected abstract ByteBuffer getPartitionKey(Id applicationId, String entityType, String fieldType, String fieldName, Object fieldValue );
+
+    protected abstract ByteBuffer getLogPartitionKey(final Id applicationId, final Id uniqueValueId);
+
+    protected abstract ByteBuffer serializeUniqueValueColumn(EntityVersion entityVersion);
+
+    protected abstract List<Object> deserializeUniqueValueColumn(ByteBuffer bb);
+
+
+
+
+
+    /**
+     * Get the column family for the unique field CF
+     */
+    protected abstract MultiTenantColumnFamily<ScopedRowKey<EntityKey>, UniqueFieldEntry> getEntityUniqueLogCF();
+
     /**
-     * Get the column family for the unique field CF
+     * Get the CQL table definition for the unique values log table
      */
-    protected abstract MultiTenantColumnFamily<ScopedRowKey<EntityKey>, UniqueFieldEntry> getEntityUniqueLogCF();
+    protected abstract TableDefinition getEntityUniqueLogTable();
 
     /**
      * Generate a key that is compatible with the column family

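Note on loadCQL: the Astyanax getKeySlice multiget becomes a single SELECT ... WHERE key IN (...) over the serialized partition keys. Also worth noting: the consistency level passed into load(...) is currently ignored in favor of a hard-coded LOCAL_QUORUM. The query shape, reduced to a sketch using the names from the diff:

    Clause inKey = QueryBuilder.in("key", partitionKeys);  // one serialized blob key per field

    Statement select = QueryBuilder.select().all()
        .from(TABLE_UNIQUE_VALUES)
        .where(inKey)
        .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);

    for (Row row : session.execute(select)) {              // ResultSet is Iterable<Row>
        ByteBuffer key = row.getBytes("key");               // composite row key
        ByteBuffer column = row.getBytesUnsafe("column1");  // composite column name
        // deserializePartitionKey(key) recovers the field type/name/value;
        // deserializeUniqueValueColumn(column) recovers the entity id + version
    }
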
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
index 87b1641..bbfaa2d 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 
+import com.datastax.driver.core.BatchStatement;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
@@ -67,40 +68,22 @@ public class UniqueValueSerializationStrategyProxyImpl implements UniqueValueSer
 
 
     @Override
-    public MutationBatch write( final ApplicationScope applicationScope, final UniqueValue uniqueValue ) {
-        final MigrationRelationship<UniqueValueSerializationStrategy> migration = getMigrationRelationShip();
-
-        if ( migration.needsMigration() ) {
-            final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
-
-            aggregateBatch.mergeShallow( migration.from.write( applicationScope, uniqueValue ) );
-            aggregateBatch.mergeShallow( migration.to.write( applicationScope, uniqueValue ) );
-
-            return aggregateBatch;
-        }
+    public BatchStatement writeCQL(final ApplicationScope applicationScope, final UniqueValue uniqueValue,
+                                   final int timeToLive ){
 
-        return migration.to.write( applicationScope, uniqueValue );
-    }
-
-
-    @Override
-    public MutationBatch write( final ApplicationScope applicationScope, final UniqueValue uniqueValue,
-                                final int timeToLive ) {
         final MigrationRelationship<UniqueValueSerializationStrategy> migration = getMigrationRelationShip();
 
         if ( migration.needsMigration() ) {
-            final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
-
-            aggregateBatch.mergeShallow( migration.from.write( applicationScope, uniqueValue, timeToLive ) );
-            aggregateBatch.mergeShallow( migration.to.write( applicationScope, uniqueValue, timeToLive ) );
+            migration.from.writeCQL( applicationScope, uniqueValue, timeToLive );
+            migration.to.writeCQL( applicationScope, uniqueValue, timeToLive );
 
-            return aggregateBatch;
         }
 
-        return migration.to.write( applicationScope, uniqueValue, timeToLive );
+        return migration.to.writeCQL( applicationScope, uniqueValue, timeToLive );
     }
 
 
+
     @Override
     public UniqueValueSet load( final ApplicationScope applicationScope, final String type,
                                 final Collection<Field> fields ) throws ConnectionException {

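One behavioral difference from the removed Astyanax code: the old proxy merged the from and to writes into a single aggregate MutationBatch, whereas writeCQL above builds the from statement during a migration and then discards it, so only the to write is ever returned (and executed). If dual writes are still intended during migration, an aggregated equivalent could look like the following sketch (an assumption about intent, not part of the commit; nesting works because BatchStatement.add flattens a nested batch into its child statements, which the WriteCommit/WriteUniqueVerify changes above already rely on):

    BatchStatement aggregate = new BatchStatement();

    if (migration.needsMigration()) {
        // keep the old table in sync while the migration is in flight
        aggregate.add(migration.from.writeCQL(applicationScope, uniqueValue, timeToLive));
    }
    aggregate.add(migration.to.writeCQL(applicationScope, uniqueValue, timeToLive));

    return aggregate;
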
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
index 2235f63..75666fa 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
@@ -20,20 +20,24 @@
 package org.apache.usergrid.persistence.collection.serialization.impl;
 
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
+import java.nio.ByteBuffer;
+import java.util.*;
 
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.Session;
 import org.apache.cassandra.db.marshal.BytesType;
 
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.serialization.impl.util.LegacyScopeUtils;
+import org.apache.usergrid.persistence.core.CassandraConfig;
 import org.apache.usergrid.persistence.core.CassandraFig;
 import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
 import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
 import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily;
 import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.datastax.CQLUtils;
 import org.apache.usergrid.persistence.core.datastax.TableDefinition;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.field.Field;
@@ -50,6 +54,40 @@ import com.netflix.astyanax.Keyspace;
 public class UniqueValueSerializationStrategyV1Impl  extends UniqueValueSerializationStrategyImpl<CollectionPrefixedKey<Field>, CollectionPrefixedKey<Id>> {
 
 
+
+    private static final String UNIQUE_VALUES_TABLE = CQLUtils.quote("Unique_Values");
+    private static final Collection<String> UNIQUE_VALUES_PARTITION_KEYS = Collections.singletonList("key");
+    private static final Collection<String> UNIQUE_VALUES_COLUMN_KEYS = Collections.singletonList("column1");
+    private static final Map<String, DataType.Name> UNIQUE_VALUES_COLUMNS =
+        new HashMap<String, DataType.Name>() {{
+            put( "key", DataType.Name.BLOB );
+            put( "column1", DataType.Name.BLOB );
+            put( "value", DataType.Name.BLOB ); }};
+    private static final Map<String, String> UNIQUE_VALUES_CLUSTERING_ORDER =
+        new HashMap<String, String>(){{ put( "column1", "ASC" ); }};
+
+
+    private static final String UNIQUE_VALUES_LOG_TABLE = CQLUtils.quote("Entity_Unique_Values");
+    private static final Collection<String> UNIQUE_VALUES_LOG_PARTITION_KEYS = Collections.singletonList("key");
+    private static final Collection<String> UNIQUE_VALUES_LOG_COLUMN_KEYS = Collections.singletonList("column1");
+    private static final Map<String, DataType.Name> UNIQUE_VALUES_LOG_COLUMNS =
+        new HashMap<String, DataType.Name>() {{
+            put( "key", DataType.Name.BLOB );
+            put( "column1", DataType.Name.BLOB );
+            put( "value", DataType.Name.BLOB ); }};
+    private static final Map<String, String> UNIQUE_VALUES_LOG_CLUSTERING_ORDER =
+        new HashMap<String, String>(){{ put( "column1", "ASC" ); }};
+
+
+    private final static TableDefinition uniqueValues =
+        new TableDefinition( UNIQUE_VALUES_TABLE, UNIQUE_VALUES_PARTITION_KEYS, UNIQUE_VALUES_COLUMN_KEYS,
+            UNIQUE_VALUES_COLUMNS, TableDefinition.CacheOption.KEYS, UNIQUE_VALUES_CLUSTERING_ORDER);
+
+    private final static TableDefinition uniqueValuesLog =
+        new TableDefinition( UNIQUE_VALUES_LOG_TABLE, UNIQUE_VALUES_LOG_PARTITION_KEYS, UNIQUE_VALUES_LOG_COLUMN_KEYS,
+            UNIQUE_VALUES_LOG_COLUMNS, TableDefinition.CacheOption.KEYS, UNIQUE_VALUES_LOG_CLUSTERING_ORDER);
+
+
     private static final CollectionScopedRowKeySerializer<Field> ROW_KEY_SER =
         new CollectionScopedRowKeySerializer<>( UniqueFieldRowKeySerializer.get() );
 
@@ -79,9 +117,11 @@ public class UniqueValueSerializationStrategyV1Impl  extends UniqueValueSerializ
      * @param serializationFig The serialization configuration
      */
     @Inject
-    public UniqueValueSerializationStrategyV1Impl( final Keyspace keyspace, final CassandraFig cassandraFig,
-                                                   final SerializationFig serializationFig ) {
-        super( keyspace, cassandraFig, serializationFig );
+    public UniqueValueSerializationStrategyV1Impl(final Keyspace keyspace, final CassandraFig cassandraFig,
+                                                  final SerializationFig serializationFig,
+                                                  final Session session,
+                                                  final CassandraConfig cassandraConfig) {
+        super( keyspace, cassandraFig, serializationFig, session, cassandraConfig );
     }
 
 
@@ -113,6 +153,12 @@ public class UniqueValueSerializationStrategyV1Impl  extends UniqueValueSerializ
         return CF_UNIQUE_VALUES;
     }
 
+    @Override
+    protected TableDefinition getUniqueValuesTable(){
+
+        return uniqueValues;
+    }
+
 
     @Override
     protected MultiTenantColumnFamily<ScopedRowKey<CollectionPrefixedKey<Id>>, UniqueFieldEntry>
@@ -122,6 +168,14 @@ public class UniqueValueSerializationStrategyV1Impl  extends UniqueValueSerializ
 
 
     @Override
+    protected TableDefinition getEntityUniqueLogTable(){
+
+        return uniqueValuesLog;
+
+    }
+
+
+    @Override
     protected CollectionPrefixedKey<Field> createUniqueValueKey( final Id applicationId,
                                                                  final String type, final Field field) {
 
@@ -141,6 +195,242 @@ public class UniqueValueSerializationStrategyV1Impl  extends UniqueValueSerializ
         return rowKey.getKey().getSubKey();
     }
 
+    @Override
+    protected List<Object> deserializePartitionKey(ByteBuffer bb){
+
+
+        /**
+         *  List<Object> keys = new ArrayList<>(8);
+            keys.add(0, appUUID);
+            keys.add(1, applicationType);
+            keys.add(2, appUUID);
+            keys.add(3, applicationType);
+            keys.add(4, entityType);
+            keys.add(5, fieldType);
+            keys.add(6, fieldName);
+            keys.add(7, fieldValueString);
+
+         */
+
+        int count = 0;
+        List<Object> stuff = new ArrayList<>();
+        while(bb.hasRemaining()){
+            ByteBuffer data = CQLUtils.getWithShortLength(bb);
+            if(count == 0 || count == 2){
+                stuff.add(DataType.uuid().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+            }else{
+                stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+            }
+            byte equality = bb.get(); // we don't use this but take the equality byte off the buffer
+            count++;
+        }
+
+        return stuff;
+
+    }
+
+    @Override
+    protected Object serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry){
+
+        /**
+         *         final UUID version = value.getVersion();
+         final Field<?> field = value.getField();
+
+         final FieldTypeName fieldType = field.getTypeName();
+         final String fieldValue = field.getValue().toString().toLowerCase();
+
+
+         DynamicComposite composite = new DynamicComposite(  );
+
+         //we want to sort ascending to descending by version
+         composite.addComponent( version,  UUID_SERIALIZER, ColumnTypes.UUID_TYPE_REVERSED);
+         composite.addComponent( field.getName(), STRING_SERIALIZER );
+         composite.addComponent( fieldValue, STRING_SERIALIZER );
+         composite.addComponent( fieldType.name() , STRING_SERIALIZER);
+         */
+
+        // values are serialized as strings, not sure why, and always lower cased
+        String fieldValueString = fieldEntry.getField().getValue().toString().toLowerCase();
+
+
+        List<Object> keys = new ArrayList<>(4);
+        keys.add(fieldEntry.getVersion());
+        keys.add(fieldEntry.getField().getName());
+        keys.add(fieldValueString);
+        keys.add(fieldEntry.getField().getTypeName().name());
+
+        String comparator = UUID_TYPE_REVERSED;
+
+        int size = 16+fieldEntry.getField().getName().length()+fieldEntry.getField().getValue().toString().length()+
+            fieldEntry.getField().getTypeName().name().length();
+
+        // we always need to add length for the 2 byte comparator short,  2 byte length short and 1 byte equality
+        size += keys.size()*65;
+
+        // uuid type comparator is longest, ensure we allocate buffer using the max size to avoid overflow
+        size += keys.size()*comparator.length();
+
+        ByteBuffer stuff = ByteBuffer.allocate(size);
+
+
+        for (Object key : keys) {
+
+            if(key.equals(fieldEntry.getVersion())) {
+                int p = comparator.indexOf("(reversed=true)");
+                boolean desc = false;
+                if (p >= 0) {
+                    comparator = comparator.substring(0, p);
+                    desc = true;
+                }
+
+                byte a = (byte) 85; // this is the byte value for UUIDType in astyanax used in legacy data
+                if (desc) {
+                    a = (byte) Character.toUpperCase((char) a);
+                }
+
+                stuff.putShort((short) ('\u8000' | a));
+            }else{
+                comparator = "UTF8Type"; // only strings are being serialized other than UUIDs here
+                stuff.putShort((short)comparator.length());
+                stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED));
+            }
+
+            ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+            if (kb == null) {
+                kb = ByteBuffer.allocate(0);
+            }
+
+            // put a short that indicates how big the buffer is for this item
+            stuff.putShort((short) kb.remaining());
+
+            // put the actual item
+            stuff.put(kb.slice());
+
+            // put an equality byte (not used, but part of the legacy thrift Astyanax schema)
+            stuff.put((byte) 0);
+
+
+        }
+
+        stuff.flip();
+        return stuff.duplicate();
+
+    }
+
+    @Override
+    protected ByteBuffer getPartitionKey(Id applicationId, String entityType, String fieldType, String fieldName, Object fieldValue ){
+
+        return serializeKey(applicationId.getUuid(), applicationId.getType(),
+            entityType, fieldType, fieldName, fieldValue);
+
+    }
+
+    @Override
+    protected ByteBuffer getLogPartitionKey(final Id applicationId, final Id uniqueValueId){
+
+        return serializeLogKey(applicationId.getUuid(), applicationId.getType(),
+            uniqueValueId.getUuid(), uniqueValueId.getType());
+
+    }
+
+    @Override
+    protected ByteBuffer serializeUniqueValueColumn(EntityVersion entityVersion){
+
+        /**
+         *         final Id entityId = ev.getEntityId();
+         final UUID entityUuid = entityId.getUuid();
+         final String entityType = entityId.getType();
+
+         CompositeBuilder builder = Composites.newDynamicCompositeBuilder();
+
+         builder.addUUID( entityVersion );
+         builder.addUUID( entityUuid );
+         builder.addString(entityType );
+         */
+
+        String comparator = "UTF8Type";
+
+        List<Object> keys = new ArrayList<>(3);
+        keys.add(entityVersion.getEntityVersion());
+        keys.add(entityVersion.getEntityId().getUuid());
+        keys.add(entityVersion.getEntityId().getType());
+
+        // UUIDs are 16 bytes
+        int size = 16+16+entityVersion.getEntityId().getType().length();
+
+        // we always need to add length for the 2 byte comparator short,  2 byte length short and 1 byte equality
+        size += keys.size()*5;
+
+        // we always add comparator to the buffer as well
+        size += keys.size()*comparator.length();
+
+        ByteBuffer stuff = ByteBuffer.allocate(size);
+
+        for (Object key : keys) {
+
+            if(key instanceof UUID){
+                comparator = "UUIDType";
+            }else{
+                comparator = "UTF8Type"; // if it's not a UUID, the only other thing we're serializing is text
+            }
+
+            stuff.putShort((short)comparator.length());
+            stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED));
+
+            ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+            if (kb == null) {
+                kb = ByteBuffer.allocate(0);
+            }
+
+            // put a short that indicates how big the buffer is for this item
+            stuff.putShort((short) kb.remaining());
+
+            // put the actual item
+            stuff.put(kb.slice());
+
+            // put an equality byte (not used, but part of the legacy thrift Astyanax schema)
+            stuff.put((byte) 0);
+
+
+        }
+
+        stuff.flip();
+        return stuff.duplicate();
+
+    }
+
+    @Override
+    protected List<Object> deserializeUniqueValueColumn(ByteBuffer bb){
+
+        List<Object> stuff = new ArrayList<>();
+        int count = 0;
+        while(bb.hasRemaining()){
+
+            // custom columns have a short at beginning for comparator (which we don't use here )
+            ByteBuffer comparator = CQLUtils.getWithShortLength(bb);
+
+            ByteBuffer data = CQLUtils.getWithShortLength(bb);
+
+
+            // first two composites are UUIDs, rest are strings
+            if(count == 0) {
+                stuff.add(new UUID(data.getLong(), data.getLong()));
+            }else if(count ==1){
+                stuff.add(new UUID(data.getLong(), data.getLong()));
+            }else{
+                stuff.add(DataType.text().deserialize(data.duplicate(), ProtocolVersion.NEWEST_SUPPORTED));
+            }
+
+            byte equality = bb.get(); // we don't use this but take the equality byte off the buffer
+
+            count++;
+        }
+
+        return stuff;
+
+    }
+
+
 
     @Override
     protected CollectionPrefixedKey<Id> createEntityUniqueLogKey( final Id applicationId,
@@ -163,4 +453,112 @@ public class UniqueValueSerializationStrategyV1Impl  extends UniqueValueSerializ
     public int getImplementationVersion() {
         return CollectionDataVersions.INITIAL.getVersion();
     }
+
+
+
+    private ByteBuffer serializeKey( UUID appUUID,
+                                     String applicationType,
+                                     String entityType,
+                                     String fieldType,
+                                     String fieldName,
+                                     Object fieldValue  ){
+
+        final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( entityType );
+
+//        final CollectionPrefixedKey<Field> uniquePrefixedKey =
+//            new CollectionPrefixedKey<>( collectionName, applicationId, field );
+
+//        //read back the id
+//        final Id orgId = ID_SER.fromComposite( parser );
+//        final Id scopeId = ID_SER.fromComposite( parser );
+//        final String scopeName = parser.readString();
+//        final K value = keySerializer.fromComposite( parser );
+
+
+        // values are serialized as strings, not sure why, and always lower cased
+        String fieldValueString = fieldValue.toString().toLowerCase();
+
+        List<Object> keys = new ArrayList<>(8);
+        keys.add(0, appUUID);
+        keys.add(1, applicationType);
+        keys.add(2, appUUID);
+        keys.add(3, applicationType);
+        keys.add(4, collectionName);
+        keys.add(5, fieldType);
+        keys.add(6, fieldName);
+        keys.add(7, fieldValueString);
+
+
+        // UUIDs are 16 bytes, allocate the buffer accordingly
+        int size = 16 + applicationType.length() + 16 + applicationType.length() + collectionName.length() +
+            fieldType.length() + fieldName.length()+fieldValueString.length();
+
+
+        // we always need to add length for the 2 byte short and 1 byte equality
+        size += keys.size()*3;
+
+        ByteBuffer stuff = ByteBuffer.allocate(size);
+
+        for (Object key : keys) {
+
+            ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+            if (kb == null) {
+                kb = ByteBuffer.allocate(0);
+            }
+
+            stuff.putShort((short) kb.remaining());
+            stuff.put(kb.slice());
+            stuff.put((byte) 0);
+
+
+        }
+        stuff.flip();
+        return stuff.duplicate();
+
+    }
+
+    private ByteBuffer serializeLogKey(UUID appUUID, String applicationType, UUID entityId, String entityType){
+
+
+       final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( entityType );
+//
+//
+//        final CollectionPrefixedKey<Id> collectionPrefixedEntityKey =
+//            new CollectionPrefixedKey<>( collectionName, applicationId, uniqueValueId );
+
+        List<Object> keys = new ArrayList<>(4);
+        keys.add(appUUID);
+        keys.add(applicationType);
+        keys.add(appUUID);
+        keys.add(applicationType);
+        keys.add(collectionName);
+        keys.add(entityId);
+        keys.add(entityType);
+
+        int size = 16+applicationType.length()+16+applicationType.length()+collectionName.length()+16+entityType.length();
+
+        // we always need to add length for the 2 byte short and 1 byte equality
+        size += keys.size()*3;
+
+        ByteBuffer stuff = ByteBuffer.allocate(size);
+
+        for (Object key : keys) {
+
+            ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+            if (kb == null) {
+                kb = ByteBuffer.allocate(0);
+            }
+
+            stuff.putShort((short) kb.remaining());
+            stuff.put(kb.slice());
+            stuff.put((byte) 0);
+
+
+        }
+        stuff.flip();
+        return stuff.duplicate();
+
+    }
+
+
 }

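Note on the V1 serializers: the hand-rolled ByteBuffer code above reproduces the thrift-era composite layout so CQL writes stay byte-compatible with rows written through Astyanax. Every component is framed as a 2-byte big-endian length, the component bytes, and one trailing equality byte (always 0 on writes); dynamic composites additionally prefix each component with a comparator header, either a 2-byte length plus an inline UTF8 comparator name, or 0x8000 ORed with a one-byte alias (85, 'U', is Astyanax's alias for UUIDType, per the comment in the diff). The shared framing, reduced to an illustrative helper (compose is not part of the commit):

    // Illustrative only: frames pre-serialized components the way
    // serializeKey/serializeLogKey do for static composites.
    private static ByteBuffer compose(List<ByteBuffer> components) {
        int size = 0;
        for (ByteBuffer c : components) {
            size += 2 + c.remaining() + 1;       // length short + bytes + equality byte
        }
        ByteBuffer out = ByteBuffer.allocate(size);
        for (ByteBuffer c : components) {
            out.putShort((short) c.remaining()); // 2-byte component length
            out.put(c.duplicate());              // component bytes
            out.put((byte) 0);                   // equality byte, always 0 on writes
        }
        out.flip();
        return out;
    }
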
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
index 0f233cf..4177c37 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
@@ -20,13 +20,16 @@
 package org.apache.usergrid.persistence.collection.serialization.impl;
 
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
+import java.nio.ByteBuffer;
+import java.util.*;
 
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.Session;
 import org.apache.cassandra.db.marshal.BytesType;
 
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
 import org.apache.usergrid.persistence.core.CassandraFig;
 import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
 import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
@@ -34,6 +37,7 @@ import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily;
 import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
+import org.apache.usergrid.persistence.core.datastax.CQLUtils;
 import org.apache.usergrid.persistence.core.datastax.TableDefinition;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.field.Field;
@@ -49,6 +53,37 @@ import com.netflix.astyanax.Keyspace;
 @Singleton
 public class UniqueValueSerializationStrategyV2Impl  extends UniqueValueSerializationStrategyImpl<TypeField, Id> {
 
+    private static final String UNIQUE_VALUES_TABLE = CQLUtils.quote("Unique_Values_V2");
+    private static final Collection<String> UNIQUE_VALUES_PARTITION_KEYS = Collections.singletonList("key");
+    private static final Collection<String> UNIQUE_VALUES_COLUMN_KEYS = Collections.singletonList("column1");
+    private static final Map<String, DataType.Name> UNIQUE_VALUES_COLUMNS =
+        new HashMap<String, DataType.Name>() {{
+            put( "key", DataType.Name.BLOB );
+            put( "column1", DataType.Name.BLOB );
+            put( "value", DataType.Name.BLOB ); }};
+    private static final Map<String, String> UNIQUE_VALUES_CLUSTERING_ORDER =
+        new HashMap<String, String>(){{ put( "column1", "ASC" ); }};
+
+
+    private static final String UNIQUE_VALUES_LOG_TABLE = CQLUtils.quote("Entity_Unique_Values_V2");
+    private static final Collection<String> UNIQUE_VALUES_LOG_PARTITION_KEYS = Collections.singletonList("key");
+    private static final Collection<String> UNIQUE_VALUES_LOG_COLUMN_KEYS = Collections.singletonList("column1");
+    private static final Map<String, DataType.Name> UNIQUE_VALUES_LOG_COLUMNS =
+        new HashMap<String, DataType.Name>() {{
+            put( "key", DataType.Name.BLOB );
+            put( "column1", DataType.Name.BLOB );
+            put( "value", DataType.Name.BLOB ); }};
+    private static final Map<String, String> UNIQUE_VALUES_LOG_CLUSTERING_ORDER =
+        new HashMap<String, String>(){{ put( "column1", "ASC" ); }};
+
+    private final static TableDefinition uniqueValues =
+        new TableDefinition( UNIQUE_VALUES_TABLE, UNIQUE_VALUES_PARTITION_KEYS, UNIQUE_VALUES_COLUMN_KEYS,
+            UNIQUE_VALUES_COLUMNS, TableDefinition.CacheOption.KEYS, UNIQUE_VALUES_CLUSTERING_ORDER);
+
+    private final static TableDefinition uniqueValuesLog =
+        new TableDefinition( UNIQUE_VALUES_LOG_TABLE, UNIQUE_VALUES_LOG_PARTITION_KEYS, UNIQUE_VALUES_LOG_COLUMN_KEYS,
+            UNIQUE_VALUES_LOG_COLUMNS, TableDefinition.CacheOption.KEYS, UNIQUE_VALUES_LOG_CLUSTERING_ORDER);
+
 
     private static final ScopedRowKeySerializer<TypeField>  ROW_KEY_SER = new ScopedRowKeySerializer<>( UniqueTypeFieldRowKeySerializer.get() );
 
@@ -80,8 +115,10 @@ public class UniqueValueSerializationStrategyV2Impl  extends UniqueValueSerializ
      */
     @Inject
     public UniqueValueSerializationStrategyV2Impl( final Keyspace keyspace, final CassandraFig cassandraFig,
-                                                   final SerializationFig serializationFig ) {
-        super( keyspace, cassandraFig, serializationFig );
+                                                   final SerializationFig serializationFig,
+                                                   final Session session,
+                                                   final CassandraConfig cassandraConfig) {
+        super( keyspace, cassandraFig, serializationFig, session, cassandraConfig );
     }
 
 
@@ -104,7 +141,9 @@ public class UniqueValueSerializationStrategyV2Impl  extends UniqueValueSerializ
     @Override
     public Collection<TableDefinition> getTables() {
 
-        return Collections.emptyList();
+        return Arrays.asList( uniqueValues, uniqueValuesLog );
     }
 
     @Override
@@ -114,6 +153,12 @@ public class UniqueValueSerializationStrategyV2Impl  extends UniqueValueSerializ
 
 
     @Override
+    protected TableDefinition getUniqueValuesTable(){
+        return uniqueValues;
+    }
+
+
+    @Override
     protected MultiTenantColumnFamily<ScopedRowKey<Id>, UniqueFieldEntry>
     getEntityUniqueLogCF() {
         return CF_ENTITY_UNIQUE_VALUE_LOG;
@@ -121,6 +166,13 @@ public class UniqueValueSerializationStrategyV2Impl  extends UniqueValueSerializ
 
 
     @Override
+    protected TableDefinition getEntityUniqueLogTable(){
+        return uniqueValuesLog;
+    }
+
+
+    @Override
     protected TypeField createUniqueValueKey( final Id applicationId,  final String type, final Field field) {
         return new TypeField(type,field);
     }
@@ -131,6 +183,238 @@ public class UniqueValueSerializationStrategyV2Impl  extends UniqueValueSerializ
         return rowKey.getKey().getField();
     }
 
+    @Override
+    protected List<Object> deserializePartitionKey(ByteBuffer bb){
+
+        /*
+         * Partition key layout (matches serializeKey below):
+         *   0: appUUID          (UUID)
+         *   1: applicationType  (String)
+         *   2: entityType       (String)
+         *   3: fieldType        (String)
+         *   4: fieldName        (String)
+         *   5: fieldValueString (String)
+         */
+
+        List<Object> stuff = new ArrayList<>();
+        while(bb.hasRemaining()){
+            ByteBuffer data = CQLUtils.getWithShortLength(bb);
+            if(stuff.size() == 0){
+                stuff.add(DataType.uuid().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+            }else{
+                stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+            }
+            byte equality = bb.get(); // not used, but the equality byte must be consumed from the buffer
+
+        }
+
+        return stuff;
+
+    }
+
+    @Override
+    protected Object serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry){
+
+        /*
+         * Legacy Astyanax serialization being emulated here:
+         *
+         *   final UUID version = value.getVersion();
+         *   final Field<?> field = value.getField();
+         *
+         *   final FieldTypeName fieldType = field.getTypeName();
+         *   final String fieldValue = field.getValue().toString().toLowerCase();
+         *
+         *   DynamicComposite composite = new DynamicComposite();
+         *
+         *   // we want to sort ascending to descending by version
+         *   composite.addComponent( version, UUID_SERIALIZER, ColumnTypes.UUID_TYPE_REVERSED );
+         *   composite.addComponent( field.getName(), STRING_SERIALIZER );
+         *   composite.addComponent( fieldValue, STRING_SERIALIZER );
+         *   composite.addComponent( fieldType.name(), STRING_SERIALIZER );
+         */
+
+        // values are serialized as strings, matching the legacy schema, and are always lower-cased
+        String fieldValueString = fieldEntry.getField().getValue().toString().toLowerCase();
+
+
+        List<Object> keys = new ArrayList<>(4);
+        keys.add(fieldEntry.getVersion());
+        keys.add(fieldEntry.getField().getName());
+        keys.add(fieldValueString);
+        keys.add(fieldEntry.getField().getTypeName().name());
+
+        String comparator = UUID_TYPE_REVERSED;
+
+        int size = 16+fieldEntry.getField().getName().length()+fieldEntry.getField().getValue().toString().length()+
+            fieldEntry.getField().getTypeName().name().length();
+
+        // each component needs a 2-byte comparator short, a 2-byte length short and a 1-byte equality
+        // flag (5 bytes); 65 bytes per component generously over-allocates to avoid overflow
+        size += keys.size()*65;
+
+        // the uuid comparator name is the longest; allocate for the max size to avoid overflow
+        size += keys.size()*comparator.length();
+
+        ByteBuffer stuff = ByteBuffer.allocate(size);
+
+
+        for (Object key : keys) {
+
+            if(key.equals(fieldEntry.getVersion())) {
+                int p = comparator.indexOf("(reversed=true)");
+                boolean desc = false;
+                if (p >= 0) {
+                    comparator = comparator.substring(0, p);
+                    desc = true;
+                }
+
+                byte a = (byte) 'u'; // 'u' is the legacy Astyanax/thrift alias byte for UUIDType
+                if (desc) {
+                    // reversed comparators use the upper-case alias, 'U'
+                    a = (byte) Character.toUpperCase((char) a);
+                }
+
+                stuff.putShort((short) ('\u8000' | a));
+            }else{
+                comparator = "UTF8Type"; // only strings are being serialized other than UUIDs here
+                stuff.putShort((short)comparator.length());
+                stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED));
+            }
+
+            ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+            if (kb == null) {
+                kb = ByteBuffer.allocate(0);
+            }
+
+            // put a short that indicates how big the buffer is for this item
+            stuff.putShort((short) kb.remaining());
+
+            // put the actual item
+            stuff.put(kb.slice());
+
+            // put an equality byte (unused, but part of the legacy thrift/Astyanax schema)
+            stuff.put((byte) 0);
+
+
+        }
+
+        stuff.flip();
+        return stuff.duplicate();
+
+    }
+
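The method above hand-writes the wire format of a legacy Astyanax DynamicComposite: each component carries a comparator (either the aliased short 0x8000 | alias, as used for the reversed UUID, or a 2-byte-length-prefixed comparator name), then a 2-byte-length-prefixed value, then a single equality byte. A self-contained sketch of one named-comparator component, with illustrative values:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class DynamicCompositeComponentSketch {
        public static void main( String[] args ) {
            byte[] comparator = "UTF8Type".getBytes( StandardCharsets.UTF_8 );
            byte[] value = "email".getBytes( StandardCharsets.UTF_8 );

            ByteBuffer bb = ByteBuffer.allocate( 2 + comparator.length + 2 + value.length + 1 );
            bb.putShort( (short) comparator.length ); // 2-byte comparator name length
            bb.put( comparator );                     // comparator name
            bb.putShort( (short) value.length );      // 2-byte value length
            bb.put( value );                          // value bytes
            bb.put( (byte) 0 );                       // equality byte
            bb.flip();

            // reading mirrors deserializeUniqueValueColumn: length-prefixed slices
            short compLen = bb.getShort();
            bb.position( bb.position() + compLen );   // skip the comparator name
            byte[] out = new byte[ bb.getShort() ];
            bb.get( out );
            bb.get();                                 // consume the equality byte
            System.out.println( new String( out, StandardCharsets.UTF_8 ) ); // prints "email"
        }
    }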
+    @Override
+    protected ByteBuffer getPartitionKey(Id applicationId, String entityType, String fieldType, String fieldName, Object fieldValue ){
+
+        return serializeKey(applicationId.getUuid(), applicationId.getType(),
+            entityType, fieldType, fieldName, fieldValue);
+
+    }
+
+    @Override
+    protected ByteBuffer getLogPartitionKey(final Id applicationId, final Id uniqueValueId){
+
+        return serializeLogKey(applicationId.getUuid(), applicationId.getType(),
+            uniqueValueId.getUuid(), uniqueValueId.getType());
+
+    }
+
+    @Override
+    protected ByteBuffer serializeUniqueValueColumn(EntityVersion entityVersion){
+
+        /*
+         * Legacy Astyanax serialization being emulated here:
+         *
+         *   final Id entityId = ev.getEntityId();
+         *   final UUID entityUuid = entityId.getUuid();
+         *   final String entityType = entityId.getType();
+         *
+         *   CompositeBuilder builder = Composites.newDynamicCompositeBuilder();
+         *
+         *   builder.addUUID( entityVersion );
+         *   builder.addUUID( entityUuid );
+         *   builder.addString( entityType );
+         */
+
+        String comparator = "UTF8Type";
+
+        List<Object> keys = new ArrayList<>(3);
+        keys.add(entityVersion.getEntityVersion());
+        keys.add(entityVersion.getEntityId().getUuid());
+        keys.add(entityVersion.getEntityId().getType());
+
+        // UUIDs are 16 bytes
+        int size = 16+16+entityVersion.getEntityId().getType().length();
+
+        // each component needs a 2-byte comparator-length short, a 2-byte value-length short and a 1-byte equality flag
+        size += keys.size()*5;
+
+        // we always add comparator to the buffer as well
+        size += keys.size()*comparator.length();
+
+        ByteBuffer stuff = ByteBuffer.allocate(size);
+
+        for (Object key : keys) {
+
+            if(key instanceof UUID){
+                comparator = "UUIDType";
+            }else{
+                comparator = "UTF8Type"; // if it's not a UUID, the only other thing we're serializing is text
+            }
+
+            stuff.putShort((short)comparator.length());
+            stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED));
+
+            ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+            if (kb == null) {
+                kb = ByteBuffer.allocate(0);
+            }
+
+            // put a short that indicates how big the buffer is for this item
+            stuff.putShort((short) kb.remaining());
+
+            // put the actual item
+            stuff.put(kb.slice());
+
+            // put an equality byte (unused, but part of the legacy thrift/Astyanax schema)
+            stuff.put((byte) 0);
+
+
+        }
+
+        stuff.flip();
+        return stuff.duplicate();
+
+    }
+
+    @Override
+    protected List<Object> deserializeUniqueValueColumn(ByteBuffer bb){
+
+        List<Object> stuff = new ArrayList<>();
+        int count = 0;
+        while(bb.hasRemaining()){
+
+            // each component begins with a 2-byte-length-prefixed comparator name, which we don't need here
+            ByteBuffer comparator = CQLUtils.getWithShortLength(bb);
+
+            ByteBuffer data = CQLUtils.getWithShortLength(bb);
+
+
+            // the first two components are UUIDs (entity version, then entity id); the rest are strings
+            if(count < 2) {
+                stuff.add(new UUID(data.getLong(), data.getLong()));
+            }else{
+                stuff.add(DataType.text().deserialize(data.duplicate(), ProtocolVersion.NEWEST_SUPPORTED));
+            }
+
+            byte equality = bb.get(); // not used, but the equality byte must be consumed from the buffer
+
+            count++;
+        }
+
+        return stuff;
+
+    }
+
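A note on the new UUID(data.getLong(), data.getLong()) idiom above: it reads the most-significant and then least-significant halves straight off the 16-byte slice, which is exactly how the UUID was laid out on write. A tiny round-trip sketch:

    import java.nio.ByteBuffer;
    import java.util.UUID;

    public class UuidBytesSketch {
        public static void main( String[] args ) {
            UUID original = UUID.randomUUID();

            ByteBuffer bb = ByteBuffer.allocate( 16 );
            bb.putLong( original.getMostSignificantBits() );
            bb.putLong( original.getLeastSignificantBits() );
            bb.flip();

            // same idiom as deserializeUniqueValueColumn above
            UUID decoded = new UUID( bb.getLong(), bb.getLong() );
            System.out.println( original.equals( decoded ) ); // true
        }
    }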
 
     @Override
     protected Id createEntityUniqueLogKey( final Id applicationId, final Id uniqueValueId ) {
@@ -142,4 +426,87 @@ public class UniqueValueSerializationStrategyV2Impl  extends UniqueValueSerializ
     public int getImplementationVersion() {
         return CollectionDataVersions.LOG_REMOVAL.getVersion();
     }
+
+
+
+    // row key = app UUID + app type + entity type + field type + field name + field value
+    private ByteBuffer serializeKey(UUID appUUID,
+                                    String applicationType,
+                                    String entityType,
+                                    String fieldType,
+                                    String fieldName,
+                                    Object fieldValue  ){
+
+        // values are serialized as strings, matching the legacy schema, and are always lower-cased
+        String fieldValueString = fieldValue.toString().toLowerCase();
+
+        List<Object> keys = new ArrayList<>(6);
+        keys.add(0, appUUID);
+        keys.add(1, applicationType);
+        keys.add(2, entityType);
+        keys.add(3, fieldType);
+        keys.add(4, fieldName);
+        keys.add(5, fieldValueString);
+
+
+        // UUIDs are 16 bytes, allocate the buffer accordingly
+        int size = 16 + applicationType.length() + entityType.length() + fieldType.length() + fieldName.length()+fieldValueString.length();
+
+
+        // each component adds a 2-byte length short and a 1-byte equality flag
+        size += keys.size()*3;
+
+        ByteBuffer stuff = ByteBuffer.allocate(size);
+
+        for (Object key : keys) {
+
+            ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+            if (kb == null) {
+                kb = ByteBuffer.allocate(0);
+            }
+
+            stuff.putShort((short) kb.remaining());
+            stuff.put(kb.slice());
+            stuff.put((byte) 0);
+
+
+        }
+        stuff.flip();
+        return stuff.duplicate();
+
+    }
+
+    private ByteBuffer serializeLogKey(UUID appUUID, String applicationType, UUID entityId, String entityType){
+
+        List<Object> keys = new ArrayList<>(4);
+        keys.add(appUUID);
+        keys.add(applicationType);
+        keys.add(entityId);
+        keys.add(entityType);
+
+        int size = 16+applicationType.length()+16+entityType.length();
+
+        // each component adds a 2-byte length short and a 1-byte equality flag
+        size += keys.size()*3;
+
+        ByteBuffer stuff = ByteBuffer.allocate(size);
+
+        for (Object key : keys) {
+
+            ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+            if (kb == null) {
+                kb = ByteBuffer.allocate(0);
+            }
+
+            stuff.putShort((short) kb.remaining());
+            stuff.put(kb.slice());
+            stuff.put((byte) 0);
+
+
+        }
+        stuff.flip();
+        return stuff.duplicate();
+
+    }
+
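serializeKey and serializeLogKey above emit the plain (static) composite layout: per component, a 2-byte length, the raw bytes, and a zero equality byte, with no comparator prefix. A self-contained encode/decode sketch using illustrative string components:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class StaticCompositeKeySketch {
        public static void main( String[] args ) {
            // illustrative components; the real key also carries the app UUID
            List<byte[]> parts = Arrays.asList(
                "user".getBytes( StandardCharsets.UTF_8 ),    // entityType
                "string".getBytes( StandardCharsets.UTF_8 ),  // fieldType
                "email".getBytes( StandardCharsets.UTF_8 ) ); // fieldName

            int size = 0;
            for ( byte[] p : parts ) {
                size += 2 + p.length + 1; // 2-byte length + bytes + equality byte
            }

            ByteBuffer bb = ByteBuffer.allocate( size );
            for ( byte[] p : parts ) {
                bb.putShort( (short) p.length );
                bb.put( p );
                bb.put( (byte) 0 );
            }
            bb.flip();

            // decoding mirrors deserializePartitionKey: length-prefixed slices
            List<String> decoded = new ArrayList<>();
            while ( bb.hasRemaining() ) {
                byte[] out = new byte[ bb.getShort() ];
                bb.get( out );
                bb.get(); // consume the equality byte
                decoded.add( new String( out, StandardCharsets.UTF_8 ) );
            }
            System.out.println( decoded ); // [user, string, email]
        }
    }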
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
index a110ed7..8d52d8b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
@@ -26,6 +26,8 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.BatchStatement;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,6 +73,7 @@ public class MvccEntityDataMigrationImpl implements DataMigration{
     private static final Logger logger = LoggerFactory.getLogger( MvccEntityDataMigrationImpl.class );
 
     private final Keyspace keyspace;
+    private final Session session;
     private final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions;
     private final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3;
     private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
@@ -80,12 +83,14 @@ public class MvccEntityDataMigrationImpl implements DataMigration{
 
     @Inject
     public MvccEntityDataMigrationImpl( final Keyspace keyspace,
+                                        final Session session,
                                         final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions,
                                         final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3,
                                         final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
                                         final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
                                         final MigrationDataProvider<EntityIdScope> migrationDataProvider ) {
         this.keyspace = keyspace;
+        this.session = session;
         this.allVersions = allVersions;
         this.mvccEntitySerializationStrategyV3 = mvccEntitySerializationStrategyV3;
         this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
@@ -163,8 +168,9 @@ public class MvccEntityDataMigrationImpl implements DataMigration{
 
                         final List<Id> toSaveIds = new ArrayList<>( entities.size() );
 
+                        final BatchStatement uniqueBatch = new BatchStatement();
 
                         for ( EntityToSaveMessage message : entities ) {
                             try {
                                 final MutationBatch entityRewrite = migration.to.write(message.scope, message.entity);
 
@@ -197,17 +203,14 @@ public class MvccEntityDataMigrationImpl implements DataMigration{
                                 // time with
                                 // no TTL so that cleanup can clean up
                                 // older values
+
+
                                 for (final Field field : EntityUtils.getUniqueFields(message.entity.getEntity().get())) {
 
                                     final UniqueValue written = new UniqueValueImpl(field, entityId, version);
 
-                                    final MutationBatch mb = uniqueValueSerializationStrategy.write(message.scope, written);
-
+                                    uniqueBatch.add(uniqueValueSerializationStrategy.writeCQL(message.scope, written, -1));
 
-                                    // merge into our
-                                    // existing mutation
-                                    // batch
-                                    totalBatch.mergeShallow(mb);
                                 }
 
 
@@ -232,7 +235,7 @@ public class MvccEntityDataMigrationImpl implements DataMigration{
 
                         }
 
-                        executeBatch( migration.to.getImplementationVersion(), totalBatch, observer, atomicLong );
+                        executeBatch( migration.to.getImplementationVersion(), totalBatch, observer, atomicLong, uniqueBatch );
 
                         //now run our cleanup task
 
@@ -252,10 +255,13 @@ public class MvccEntityDataMigrationImpl implements DataMigration{
     }
 
 
-    protected void executeBatch( final int targetVersion, final MutationBatch batch, final ProgressObserver po,
-                                 final AtomicLong count ) {
+    protected void executeBatch( final int targetVersion, final MutationBatch batch, final ProgressObserver po,
+                                 final AtomicLong count, final BatchStatement uniqueBatch ) {
         try {
             batch.execute();
+            session.execute(uniqueBatch);
 
             po.update( targetVersion, "Finished copying " + count + " entities to the new format" );
         }
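With this change the Astyanax MutationBatch and the CQL BatchStatement run side by side: entity rewrites still go through Astyanax, while the unique values accumulated per chunk are flushed in one driver batch. A minimal sketch of the driver-side half; the contact point, keyspace and statement are illustrative, and the real blobs come from the composite serializers above:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    import com.datastax.driver.core.BatchStatement;
    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.Session;
    import com.datastax.driver.core.SimpleStatement;

    public class UniqueBatchSketch {
        public static void main( String[] args ) {
            try ( Cluster cluster = Cluster.builder().addContactPoint( "127.0.0.1" ).build();
                  Session session = cluster.connect( "usergrid_application" ) ) {

                // placeholder blobs; writeCQL( scope, written, -1 ) produces the real statements
                ByteBuffer key = ByteBuffer.wrap( "rowkey".getBytes( StandardCharsets.UTF_8 ) );
                ByteBuffer column1 = ByteBuffer.wrap( "col".getBytes( StandardCharsets.UTF_8 ) );
                ByteBuffer value = ByteBuffer.allocate( 0 );

                BatchStatement uniqueBatch = new BatchStatement();
                uniqueBatch.add( new SimpleStatement(
                    "INSERT INTO \"Unique_Values_V2\" ( \"key\", \"column1\", \"value\" ) VALUES ( ?, ?, ? )",
                    key, column1, value ) );

                session.execute( uniqueBatch );
            }
        }
    }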

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
index ad6eac6..b18b095 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
@@ -1,7 +1,13 @@
 package org.apache.usergrid.persistence.collection.mvcc.stage.delete;
 
 
+import com.datastax.driver.core.Session;
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
+import org.apache.usergrid.persistence.core.test.ITRunner;
+import org.apache.usergrid.persistence.core.test.UseModules;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 
 import org.apache.usergrid.persistence.collection.MvccEntity;
@@ -32,6 +38,9 @@ import static org.mockito.Mockito.when;
 /** @author tnine */
 public class MarkCommitTest extends AbstractMvccEntityStageTest {
 
     /** Standard flow */
     @Test
     public void testStartStage() throws Exception {
@@ -39,6 +48,8 @@ public class MarkCommitTest extends AbstractMvccEntityStageTest {
 
         final ApplicationScope context = mock( ApplicationScope.class );
 
+        final Session session = mock(Session.class);
+
 
         //mock returning a mock mutation when we do a log entry write
         final MvccLogEntrySerializationStrategy logStrategy = mock( MvccLogEntrySerializationStrategy.class );
@@ -71,7 +82,7 @@ public class MarkCommitTest extends AbstractMvccEntityStageTest {
 
 
         //run the stage
-        WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy );
+        WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, session );
 
 
         //verify the observable is correct

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
index 58642d3..60281d4 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
@@ -18,7 +18,13 @@
 package org.apache.usergrid.persistence.collection.mvcc.stage.write;
 
 
+import com.datastax.driver.core.Session;
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
+import org.apache.usergrid.persistence.core.test.ITRunner;
+import org.apache.usergrid.persistence.core.test.UseModules;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 
 import org.apache.usergrid.persistence.collection.MvccEntity;
@@ -53,6 +59,8 @@ public class WriteCommitTest extends AbstractMvccEntityStageTest {
 
         final ApplicationScope context = mock( ApplicationScope.class );
 
+        final Session session = mock(Session.class);
+
 
         //mock returning a mock mutation when we do a log entry write
         final MvccLogEntrySerializationStrategy logStrategy = mock( MvccLogEntrySerializationStrategy.class );
@@ -84,7 +92,7 @@ public class WriteCommitTest extends AbstractMvccEntityStageTest {
 
 
         //run the stage
-        WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy );
+        WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, session );
 
 
         Entity result = newStage.call( new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get();
@@ -116,6 +124,9 @@ public class WriteCommitTest extends AbstractMvccEntityStageTest {
         /**
          * Write up mock mutations so we don't npe on the our operations, but rather on the input
          */
+
+        final Session session = mock(Session.class);
+
         final MvccLogEntrySerializationStrategy logStrategy = mock( MvccLogEntrySerializationStrategy.class );
         final MutationBatch logMutation = mock( MutationBatch.class );
 
@@ -131,7 +142,7 @@ public class WriteCommitTest extends AbstractMvccEntityStageTest {
         when( mvccEntityStrategy.write( any( ApplicationScope.class ), any( MvccEntity.class ) ) )
                 .thenReturn( entityMutation );
 
-        new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy ).call( event );
+        new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, session ).call( event );
     }
 }
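These tests stub the DataStax Session rather than standing up Cassandra; WriteCommit only needs execute() to return. A minimal sketch of the stubbing pattern (Mockito, as in the tests; the when() stub is optional, since an unstubbed mock already returns null):

    import com.datastax.driver.core.BatchStatement;
    import com.datastax.driver.core.ResultSet;
    import com.datastax.driver.core.Session;

    import static org.mockito.Matchers.any;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    public class SessionMockSketch {
        public static void main( String[] args ) {
            Session session = mock( Session.class );

            // make execute() hand back a mock ResultSet instead of null
            when( session.execute( any( BatchStatement.class ) ) ).thenReturn( mock( ResultSet.class ) );

            System.out.println( session.execute( new BatchStatement() ) != null ); // true
        }
    }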
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
index 09876fb..3ddc14d 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
@@ -18,6 +18,7 @@
 package org.apache.usergrid.persistence.collection.mvcc.stage.write;
 
 
+import com.datastax.driver.core.Session;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -59,7 +60,8 @@ public class WriteUniqueVerifyTest {
     @Rule
     public MigrationManagerRule migrationManagerRule;
 
-
+    @Inject
+    private Session session;
 
     @Inject
     private SerializationFig fig;
@@ -82,7 +84,7 @@ public class WriteUniqueVerifyTest {
         final MvccEntity mvccEntity = fromEntity( entity );
 
         // run the stage
-        WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace,cassandraConfig );
+        WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace, cassandraConfig, session );
 
        newStage.call(
             new CollectionIoEvent<>( collectionScope, mvccEntity ) ) ;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
index fcf22cf..3ffdb65 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
@@ -23,6 +23,8 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.UUID;
 
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -64,6 +66,8 @@ public abstract class UniqueValueSerializationStrategyImplTest {
     @Rule
     public MigrationManagerRule migrationManagerRule;
 
+    @Inject
+    private Session session;
 
     private UniqueValueSerializationStrategy strategy;
 
@@ -91,7 +95,9 @@ public abstract class UniqueValueSerializationStrategyImplTest {
         Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
         UUID version = UUIDGenerator.newTimeUUID();
         UniqueValue stored = new UniqueValueImpl( field, entityId, version );
-        strategy.write( scope, stored ).execute();
+        BatchStatement batch = strategy.writeCQL( scope, stored, -1 );
+        session.execute( batch );
 
         UniqueValueSet fields = strategy.load( scope, entityId.getType(), Collections.<Field>singleton( field ) );
 
@@ -127,7 +133,9 @@ public abstract class UniqueValueSerializationStrategyImplTest {
         Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
         UUID version = UUIDGenerator.newTimeUUID();
         UniqueValue stored = new UniqueValueImpl( field, entityId, version );
-        strategy.write( scope, stored, 5 ).execute();
+        BatchStatement batch = strategy.writeCQL( scope, stored, 5 );
+        session.execute( batch );
 
         Thread.sleep( 1000 );
 
@@ -179,7 +187,10 @@ public abstract class UniqueValueSerializationStrategyImplTest {
         Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
         UUID version = UUIDGenerator.newTimeUUID();
         UniqueValue stored = new UniqueValueImpl( field, entityId, version );
-        strategy.write( scope, stored ).execute();
+
+        BatchStatement batch = strategy.writeCQL( scope, stored, -1 );
+        session.execute( batch );
 
         strategy.delete( scope, stored ).execute();
 
@@ -207,8 +218,9 @@ public abstract class UniqueValueSerializationStrategyImplTest {
         Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
         UUID version = UUIDGenerator.newTimeUUID();
         UniqueValue stored = new UniqueValueImpl( field, entityId, version );
-        strategy.write( scope, stored ).execute();
-
+        BatchStatement batch = strategy.writeCQL( scope, stored, -1 );
+        session.execute( batch );
 
         UniqueValueSet fields = strategy.load( scope, entityId.getType(), Collections.<Field>singleton( field ) );
 
@@ -278,9 +290,13 @@ public abstract class UniqueValueSerializationStrategyImplTest {
         UniqueValue version1Field1Value = new UniqueValueImpl( version1Field1, entityId, version1 );
         UniqueValue version1Field2Value = new UniqueValueImpl( version1Field2, entityId, version1 );
 
-        final MutationBatch batch = strategy.write( scope, version1Field1Value );
-        batch.mergeShallow( strategy.write( scope, version1Field2Value ) );
 
+        final BatchStatement batch = new BatchStatement();
+
+        batch.add( strategy.writeCQL( scope, version1Field1Value, -1 ) );
+        batch.add( strategy.writeCQL( scope, version1Field2Value, -1 ) );
 
         //write V2 of everything
         final UUID version2 = UUIDGenerator.newTimeUUID();
@@ -292,10 +308,15 @@ public abstract class UniqueValueSerializationStrategyImplTest {
         UniqueValue version2Field1Value = new UniqueValueImpl( version2Field1, entityId, version2 );
         UniqueValue version2Field2Value = new UniqueValueImpl( version2Field2, entityId, version2 );
 
-        batch.mergeShallow( strategy.write( scope, version2Field1Value ) );
-        batch.mergeShallow( strategy.write( scope, version2Field2Value ) );
+        batch.add( strategy.writeCQL( scope, version2Field1Value, -1 ) );
+        batch.add( strategy.writeCQL( scope, version2Field2Value, -1 ) );
+
+        session.execute( batch );
 
-        batch.execute();
 
 
         UniqueValueSet fields = strategy.load( scope, entityId.getType(), Arrays.<Field>asList( version1Field1, version1Field2 ) );