You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/02/25 02:17:46 UTC

incubator-usergrid git commit: Adds test and verifies api works as expected

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-405 9f65a973b -> 49fa8123d


Adds test and verifies api works as expected


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/49fa8123
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/49fa8123
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/49fa8123

Branch: refs/heads/USERGRID-405
Commit: 49fa8123d09b8afe8ff3496a1ce7019d26e9eacf
Parents: 9f65a97
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Feb 24 18:17:43 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Feb 24 18:17:43 2015 -0700

----------------------------------------------------------------------
 .../UniqueValueSerializationStrategy.java       |  16 +-
 .../impl/UniqueFieldEntrySerializer.java        |  20 +-
 .../UniqueValueSerializationStrategyImpl.java   | 228 +++++++++++++++----
 ...niqueValueSerializationStrategyImplTest.java | 143 +++++++++++-
 4 files changed, 336 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/49fa8123/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
index a5bec0c..5b7898e 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
@@ -24,7 +24,6 @@ import java.util.Iterator;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.serialization.impl.UniqueFieldEntry;
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.field.Field;
@@ -35,45 +34,46 @@ import org.apache.usergrid.persistence.model.field.Field;
  */
 public interface UniqueValueSerializationStrategy extends Migration {
 
+
     /**
      * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds.
      *
      * @param uniqueValue Object to be written
      * @return MutatationBatch that encapsulates operation, caller may or may not execute.
      */
-    public MutationBatch write( CollectionScope scope,  UniqueValue uniqueValue );
+    public MutationBatch write( CollectionScope collectionScope,  UniqueValue uniqueValue );
 
     /**
      * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds.
      *
      * @param uniqueValue Object to be written
-     * @param timeToLive How long object should live in seconds
+     * @param timeToLive How long object should live in seconds; must be greater than 0 (to store forever, use the write without a timeToLive)
      * @return MutatationBatch that encapsulates operation, caller may or may not execute.
      */
-    public MutationBatch write( CollectionScope scope,  UniqueValue uniqueValue, Integer timeToLive );
+    public MutationBatch write( CollectionScope collectionScope,  UniqueValue uniqueValue, int timeToLive );
 
     /**
      * Load UniqueValue that matches field from collection or null if that value does not exist.
      *
-     * @param colScope Collection scope in which to look for field name/value
+     * @param collectionScope scope in which to look for field name/value
      * @param fields Field name/value to search for
      *
      * @return UniqueValueSet containing fields from the collection that exist in cassandra
      *
      * @throws ConnectionException on error connecting to Cassandra
      */
-    public UniqueValueSet load( CollectionScope colScope, Collection<Field> fields ) throws ConnectionException;
+    public UniqueValueSet load( CollectionScope collectionScope, Collection<Field> fields ) throws ConnectionException;
 
 
     /**
      * Loads the currently persisted history of every unique value the entity has held.  This will
      * start from the max version and return values in descending version order.  Note that for entities
      * with more than one unique field, sequential fields can be returned with the same version.
-     * @param scope The scope the entity is stored in
+     * @param collectionScope The scope the entity is stored in
      * @param entityId
      * @return
      */
-    public Iterator<UniqueFieldEntry> getAllUniqueFields(CollectionScope scope, Id entityId);
+    public Iterator<UniqueValue> getAllUniqueFields(CollectionScope collectionScope, Id entityId);
 
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/49fa8123/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueFieldEntrySerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueFieldEntrySerializer.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueFieldEntrySerializer.java
index 6ec323f..51bfd7b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueFieldEntrySerializer.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueFieldEntrySerializer.java
@@ -26,6 +26,7 @@ package org.apache.usergrid.persistence.collection.serialization.impl;
 import java.nio.ByteBuffer;
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
 import org.apache.usergrid.persistence.core.astyanax.DynamicCompositeParserImpl;
 import org.apache.usergrid.persistence.model.field.BooleanField;
 import org.apache.usergrid.persistence.model.field.DoubleField;
@@ -43,6 +44,8 @@ import com.netflix.astyanax.model.Composites;
 import com.netflix.astyanax.model.DynamicComposite;
 import com.netflix.astyanax.serializers.AbstractSerializer;
 import com.netflix.astyanax.serializers.DynamicCompositeSerializer;
+import com.netflix.astyanax.serializers.StringSerializer;
+import com.netflix.astyanax.serializers.UUIDSerializer;
 
 
 /**
@@ -51,6 +54,8 @@ import com.netflix.astyanax.serializers.DynamicCompositeSerializer;
 public class UniqueFieldEntrySerializer extends AbstractSerializer<UniqueFieldEntry> {
 
 
+    private static final UUIDSerializer UUID_SERIALIZER = UUIDSerializer.get();
+    private static final StringSerializer STRING_SERIALIZER = StringSerializer.get();
     private static final UniqueFieldEntrySerializer INSTANCE = new UniqueFieldEntrySerializer();
 
 
@@ -59,8 +64,6 @@ public class UniqueFieldEntrySerializer extends AbstractSerializer<UniqueFieldEn
     @Override
     public ByteBuffer toByteBuffer( final UniqueFieldEntry value ) {
 
-        CompositeBuilder builder = Composites.newDynamicCompositeBuilder();
-
 
         final UUID version = value.getVersion();
         final Field<?> field = value.getField();
@@ -69,12 +72,15 @@ public class UniqueFieldEntrySerializer extends AbstractSerializer<UniqueFieldEn
         final String fieldValue = field.getValue().toString().toLowerCase();
 
 
-        builder.addUUID( version );
-        builder.addString( field.getName() );
-        builder.addString( fieldValue );
-        builder.addString( fieldType.name() );
+        DynamicComposite composite = new DynamicComposite(  );
+
+        //use the reversed UUID comparator so columns sort by version in descending order (newest first)
+        composite.addComponent( version,  UUID_SERIALIZER, ColumnTypes.UUID_TYPE_REVERSED);
+        composite.addComponent( field.getName(), STRING_SERIALIZER );
+        composite.addComponent( fieldValue, STRING_SERIALIZER );
+        composite.addComponent( fieldType.name() , STRING_SERIALIZER);
 
-        return builder.build();
+        return composite.serialize();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/49fa8123/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
index 03624e9..24072d1 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
@@ -19,8 +19,8 @@ package org.apache.usergrid.persistence.collection.serialization.impl;
 
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
@@ -31,19 +31,26 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.marshal.BytesType;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.exception.DataCorruptionException;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+import org.apache.usergrid.persistence.collection.util.EntityUtils;
+import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
+import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
 import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
-import org.apache.usergrid.persistence.core.astyanax.FieldBufferSerializer;
 import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
+import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.field.Field;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.netflix.astyanax.ColumnListMutation;
@@ -52,8 +59,8 @@ import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.model.Column;
 import com.netflix.astyanax.model.Row;
-import com.netflix.astyanax.serializers.DynamicCompositeSerializer;
-import com.netflix.astyanax.serializers.UUIDSerializer;
+import com.netflix.astyanax.query.RowQuery;
+import com.netflix.astyanax.serializers.AbstractSerializer;
 import com.netflix.astyanax.util.RangeBuilder;
 
 
@@ -73,6 +80,7 @@ public class UniqueValueSerializationStrategyImpl implements UniqueValueSerializ
     private static final MultiTennantColumnFamily<ScopedRowKey<CollectionPrefixedKey<Field>>, EntityVersion>
         CF_UNIQUE_VALUES = new MultiTennantColumnFamily<>( "Unique_Values", ROW_KEY_SER, ENTITY_VERSION_SER );
 
+
     private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
 
 
@@ -80,12 +88,12 @@ public class UniqueValueSerializationStrategyImpl implements UniqueValueSerializ
         new CollectionScopedRowKeySerializer<>( ID_SER );
 
 
+
     private static final MultiTennantColumnFamily<ScopedRowKey<CollectionPrefixedKey<Id>>, UniqueFieldEntry>
-        CF_ENTITY_UNIQUE_VALUE =
+        CF_ENTITY_UNIQUE_VALUES =
         new MultiTennantColumnFamily<>( "Entity_Unique_Values", ENTITY_ROW_KEY_SER, UniqueFieldEntrySerializer.get() );
 
-    private static final FieldBufferSerializer FIELD_BUFFER_SERIALIZER = FieldBufferSerializer.get();
-
+    public static final int COL_VALUE = 0x0;
 
 
     protected final Keyspace keyspace;
@@ -102,73 +110,105 @@ public class UniqueValueSerializationStrategyImpl implements UniqueValueSerializ
     }
 
 
-    @Override
-    public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
+    public MutationBatch write( final CollectionScope collectionScope, UniqueValue value ) {
 
-        MultiTennantColumnFamilyDefinition cf =
-                new MultiTennantColumnFamilyDefinition( CF_UNIQUE_VALUES, BytesType.class.getSimpleName(),
-                        ColumnTypes.DYNAMIC_COMPOSITE_TYPE, BytesType.class.getSimpleName(),
-                        MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
 
-        return Collections.singleton( cf );
-    }
+        Preconditions.checkNotNull( value, "value is required" );
+
 
+        final Id entityId = value.getEntityId();
+        final UUID entityVersion = value.getEntityVersion();
+        final Field<?> field = value.getField();
+
+        ValidationUtils.verifyIdentity( entityId );
+        ValidationUtils.verifyVersion( entityVersion );
+
+        log.debug( "Writing unique value collectionScope={} id={} version={} name={} value={} ttl={} ", new Object[] {
+            collectionScope.getName(), entityId, entityVersion, value.getField().getName(), value.getField().getValue()
+        } );
+
+        final EntityVersion ev = new EntityVersion( entityId, entityVersion );
+        final UniqueFieldEntry uniqueFieldEntry = new UniqueFieldEntry( entityVersion, field );
+
+        return doWrite( collectionScope, value, new UniqueValueSerializationStrategyImpl.RowOp() {
+
+            @Override
+            public void doLookup( final ColumnListMutation<EntityVersion> colMutation ) {
+                colMutation.putColumn( ev, COL_VALUE );
+            }
 
-    public MutationBatch write(final CollectionScope scope,  UniqueValue uniqueValue ) {
-        return write( scope, uniqueValue, Integer.MAX_VALUE );
+
+            @Override
+            public void doLog( final ColumnListMutation<UniqueFieldEntry> colMutation ) {
+                colMutation.putColumn( uniqueFieldEntry, COL_VALUE );
+            }
+        } );
     }
 
 
     @Override
-    public MutationBatch write(final CollectionScope scope,  final UniqueValue value, final Integer timeToLive ) {
+    public MutationBatch write( final CollectionScope collectionScope, final UniqueValue value, final int timeToLive ) {
 
         Preconditions.checkNotNull( value, "value is required" );
-        Preconditions.checkNotNull( timeToLive, "timeToLive is required" );
+        Preconditions.checkArgument( timeToLive > 0, "timeToLive must be greater than 0 is required" );
 
         final Id entityId = value.getEntityId();
         final UUID entityVersion = value.getEntityVersion();
+        final Field<?> field = value.getField();
 
         ValidationUtils.verifyIdentity( entityId );
-              ValidationUtils.verifyVersion( entityVersion );
+        ValidationUtils.verifyVersion( entityVersion );
 
-        log.debug( "Writing unique value scope={} id={} version={} name={} value={} ttl={} ", new Object[] {
-               scope.getName(), entityId, entityVersion,
-                value.getField().getName(), value.getField().getValue(), timeToLive
-        } );
+        final EntityVersion ev = new EntityVersion( entityId, entityVersion );
+        final UniqueFieldEntry uniqueFieldEntry = new UniqueFieldEntry( entityVersion, field );
 
-        final EntityVersion ev = new EntityVersion( value.getEntityId(), value.getEntityVersion() );
+        return doWrite( collectionScope, value, new UniqueValueSerializationStrategyImpl.RowOp() {
 
-        final Integer ttl;
-        if ( timeToLive.equals( Integer.MAX_VALUE ) ) {
-            ttl = null;
-        }
-        else {
-            ttl = timeToLive;
-        }
+            @Override
+            public void doLookup( final ColumnListMutation<EntityVersion> colMutation ) {
+                colMutation.putColumn( ev, COL_VALUE, timeToLive );
+            }
 
-        return doWrite( scope, value.getField(), new UniqueValueSerializationStrategyImpl.RowOp() {
 
+            //we purposefully leave out TTL.  Worst case we issue deletes against tombstoned columns
+            //best case, we clean up an invalid secondary index entry when the log is used
             @Override
-            public void doOp( final ColumnListMutation<EntityVersion> colMutation ) {
-                colMutation.putColumn( ev, 0x0, ttl );
+            public void doLog( final ColumnListMutation<UniqueFieldEntry> colMutation ) {
+                colMutation.putColumn( uniqueFieldEntry, COL_VALUE );
             }
         } );
     }
 
 
     @Override
-    public MutationBatch delete(final CollectionScope scope,  UniqueValue value ) {
+    public MutationBatch delete( final CollectionScope scope, UniqueValue value ) {
 
         Preconditions.checkNotNull( value, "value is required" );
 
-        final EntityVersion ev = new EntityVersion( value.getEntityId(), value.getEntityVersion() );
 
-        return doWrite( scope, value.getField(), new UniqueValueSerializationStrategyImpl.RowOp() {
+        final Id entityId = value.getEntityId();
+        final UUID entityVersion = value.getEntityVersion();
+        final Field<?> field = value.getField();
+
+        ValidationUtils.verifyIdentity( entityId );
+        ValidationUtils.verifyVersion( entityVersion );
+
+
+        final EntityVersion ev = new EntityVersion( entityId, entityVersion );
+        final UniqueFieldEntry uniqueFieldEntry = new UniqueFieldEntry( entityVersion, field );
+
+        return doWrite( scope, value, new UniqueValueSerializationStrategyImpl.RowOp() {
 
             @Override
-            public void doOp( final ColumnListMutation<EntityVersion> colMutation ) {
+            public void doLookup( final ColumnListMutation<EntityVersion> colMutation ) {
                 colMutation.deleteColumn( ev );
             }
+
+
+            @Override
+            public void doLog( final ColumnListMutation<UniqueFieldEntry> colMutation ) {
+                colMutation.deleteColumn( uniqueFieldEntry );
+            }
         } );
     }
 
@@ -176,20 +216,35 @@ public class UniqueValueSerializationStrategyImpl implements UniqueValueSerializ
     /**
      * Do the column update or delete for the given column and row key
      *
-     * @param context We need to use this when getting the keyspace
+     * @param collectionScope Scope whose application, owner and name are used to build the lookup and log row keys
      */
-    private MutationBatch doWrite( CollectionScope context, Field field, RowOp op ) {
+    private MutationBatch doWrite( CollectionScope collectionScope, UniqueValue uniqueValue, RowOp op ) {
         final MutationBatch batch = keyspace.prepareMutationBatch();
-        final CollectionPrefixedKey<Field> collectionPrefixedKey = new CollectionPrefixedKey<>( context.getName(), context.getOwner(), field );
+        final CollectionPrefixedKey<Field> uniquePrefixedKey =
+            new CollectionPrefixedKey<>( collectionScope.getName(), collectionScope.getOwner(), uniqueValue.getField() );
+
+
+        op.doLookup( batch
+            .withRow( CF_UNIQUE_VALUES, ScopedRowKey.fromKey( collectionScope.getApplication(), uniquePrefixedKey ) ) );
+
+
+        final Id ownerId = collectionScope.getOwner();
+        final String collectionName = collectionScope.getName();
+
+        final CollectionPrefixedKey<Id> collectionPrefixedEntityKey =
+            new CollectionPrefixedKey<>( collectionName, ownerId, uniqueValue.getEntityId() );
+
+
+        op.doLog( batch.withRow( CF_ENTITY_UNIQUE_VALUES,
+            ScopedRowKey.fromKey( collectionScope.getApplication(), collectionPrefixedEntityKey ) ) );
 
 
-        op.doOp( batch.withRow( CF_UNIQUE_VALUES, ScopedRowKey.fromKey( context.getApplication(), collectionPrefixedKey ) ) );
         return batch;
     }
 
 
     @Override
-    public UniqueValueSet load(final CollectionScope colScope, final Collection<Field> fields )
+    public UniqueValueSet load(final CollectionScope collectionScope, final Collection<Field> fields )
             throws ConnectionException {
 
         Preconditions.checkNotNull( fields, "fields are required" );
@@ -198,9 +253,9 @@ public class UniqueValueSerializationStrategyImpl implements UniqueValueSerializ
 
         final List<ScopedRowKey<CollectionPrefixedKey<Field>>> keys = new ArrayList<>( fields.size() );
 
-        final Id applicationId = colScope.getApplication();
-        final Id ownerId = colScope.getOwner();
-        final String collectionName = colScope.getName();
+        final Id applicationId = collectionScope.getApplication();
+        final Id ownerId = collectionScope.getOwner();
+        final String collectionName = collectionScope.getName();
 
         for ( Field field : fields ) {
 
@@ -249,8 +304,31 @@ public class UniqueValueSerializationStrategyImpl implements UniqueValueSerializ
 
 
     @Override
-    public Iterator<UniqueFieldEntry> getAllUniqueFields( final CollectionScope scope, final Id entityId ) {
-        return null;
+    public Iterator<UniqueValue> getAllUniqueFields( final CollectionScope collectionScope, final Id entityId ) {
+
+
+        Preconditions.checkNotNull( collectionScope, "collectionScope is required" );
+        Preconditions.checkNotNull( entityId, "entity id is required" );
+
+
+        final Id applicationId = collectionScope.getApplication();
+        final Id ownerId = collectionScope.getOwner();
+        final String collectionName = collectionScope.getName();
+
+        final CollectionPrefixedKey<Id> collectionPrefixedKey =
+                new CollectionPrefixedKey<>( collectionName, ownerId, entityId );
+
+
+        final ScopedRowKey<CollectionPrefixedKey<Id>> rowKey =
+                ScopedRowKey.fromKey( applicationId, collectionPrefixedKey );
+
+
+        RowQuery<ScopedRowKey<CollectionPrefixedKey<Id>>, UniqueFieldEntry> query =
+                keyspace.prepareQuery( CF_ENTITY_UNIQUE_VALUES ).getKey( rowKey ).withColumnRange(
+                    ( UniqueFieldEntry ) null, null, false, 1000 );
+
+        return new ColumnNameIterator( query, new UniqueEntryParser( entityId ), false );
+
     }
 
 
@@ -258,6 +336,56 @@ public class UniqueValueSerializationStrategyImpl implements UniqueValueSerializ
      * Simple callback to perform puts and deletes with a common row setup code
      */
     private static interface RowOp {
-        void doOp( ColumnListMutation<EntityVersion> colMutation );
+
+        /**
+         * Execute the mutation into the lookup CF_UNIQUE_VALUES row
+         * @param colMutation
+         */
+        void doLookup( ColumnListMutation<EntityVersion> colMutation );
+
+        /**
+         * Execute the mutation into the log CF_ENTITY_UNIQUE_VALUES row
+         * @param colMutation
+         */
+        void doLog( ColumnListMutation<UniqueFieldEntry> colMutation);
+    }
+
+
+
+    /**
+     * Converts each raw log column (named by a UniqueFieldEntry) into a UniqueValue for the entity being read
+     */
+    private static final class UniqueEntryParser implements ColumnParser<UniqueFieldEntry, UniqueValue> {
+
+        private final Id entityId;
+
+
+        private UniqueEntryParser( final Id entityId ) {this.entityId = entityId;}
+
+
+        @Override
+        public UniqueValue parseColumn( final Column<UniqueFieldEntry> column ) {
+            final UniqueFieldEntry entry = column.getName();
+
+            return new UniqueValueImpl( entry.getField(), entityId, entry.getVersion() );
+        }
+    }
+
+
+
+    @Override
+    public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
+
+        final MultiTennantColumnFamilyDefinition uniqueLookupCF =
+                new MultiTennantColumnFamilyDefinition( CF_UNIQUE_VALUES, BytesType.class.getSimpleName(),
+                        ColumnTypes.DYNAMIC_COMPOSITE_TYPE, BytesType.class.getSimpleName(),
+                        MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
+
+        final MultiTennantColumnFamilyDefinition uniqueLogCF =
+                        new MultiTennantColumnFamilyDefinition( CF_ENTITY_UNIQUE_VALUES, BytesType.class.getSimpleName(),
+                                ColumnTypes.DYNAMIC_COMPOSITE_TYPE, BytesType.class.getSimpleName(),
+                                MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
+
+        return Arrays.asList( uniqueLookupCF, uniqueLogCF);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/49fa8123/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategyImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategyImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategyImplTest.java
index c9fd56a..b03be04 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategyImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategyImplTest.java
@@ -18,6 +18,7 @@
 package org.apache.usergrid.persistence.collection.mvcc.stage.write;
 
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.UUID;
@@ -48,16 +49,18 @@ import org.apache.usergrid.persistence.model.field.StringField;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.google.inject.Inject;
+import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 
 @RunWith(ITRunner.class)
 @UseModules(TestCollectionModule.class)
 public class UniqueValueSerializationStrategyImplTest {
-    private static final Logger LOG = LoggerFactory.getLogger( UniqueValueSerializationStrategyImplTest.class );
+
 
     @Inject
     @Rule
@@ -85,15 +88,18 @@ public class UniqueValueSerializationStrategyImplTest {
         Assert.assertNotNull( retrieved );
         assertEquals( stored, retrieved );
 
-        Iterator<UniqueFieldEntry> allFieldsWritten = strategy.getAllUniqueFields( scope, entityId );
+        Iterator<UniqueValue> allFieldsWritten = strategy.getAllUniqueFields( scope, entityId );
 
         assertTrue(allFieldsWritten.hasNext());
 
         //test this interface. In most cases, we won't know the field name, so we want them all
-        UniqueFieldEntry allFieldsValue = allFieldsWritten.next();
+        UniqueValue allFieldsValue = allFieldsWritten.next();
         Assert.assertNotNull( allFieldsValue );
-        assertEquals( stored, allFieldsValue.getField() );
-        assertEquals(version, allFieldsValue.getVersion());
+
+        assertEquals( field, allFieldsValue.getField() );
+        assertEquals(version, allFieldsValue.getEntityVersion());
+
+        assertFalse(allFieldsWritten.hasNext());
 
     }
 
@@ -128,6 +134,25 @@ public class UniqueValueSerializationStrategyImplTest {
 
         UniqueValue nullExpected = fields.getValue( field.getName() );
         Assert.assertNull( nullExpected );
+
+
+        //we still want to retain the log entry, even if we don't retain the unique value.  Deleting something
+        //that doesn't exist is a tombstone, but so is the timeout.
+        Iterator<UniqueValue> allFieldsWritten = strategy.getAllUniqueFields( scope, entityId );
+
+        assertTrue( allFieldsWritten.hasNext() );
+
+        //test this interface. In most cases, we won't know the field name, so we want them all
+        UniqueValue writtenFieldEntry = allFieldsWritten.next();
+        Assert.assertNotNull( writtenFieldEntry );
+
+        assertEquals( field, writtenFieldEntry.getField() );
+        assertEquals( version, writtenFieldEntry.getEntityVersion() );
+
+        assertFalse(allFieldsWritten.hasNext());
+
+
+
     }
 
 
@@ -151,11 +176,16 @@ public class UniqueValueSerializationStrategyImplTest {
 
 
         Assert.assertNull( nullExpected );
+
+
+        Iterator<UniqueValue> allFieldsWritten = strategy.getAllUniqueFields( scope, entityId );
+
+        assertFalse("No entries left",  allFieldsWritten.hasNext() );
     }
 
 
     @Test
-    public void testCaptializationFixes() throws ConnectionException {
+    public void testCapitalizationFixes() throws ConnectionException {
         CollectionScope scope =
                 new CollectionScopeImpl( new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
 
@@ -195,5 +225,106 @@ public class UniqueValueSerializationStrategyImplTest {
         assertEquals( field.getName(), value.getField().getName() );
 
         assertEquals( entityId, value.getEntityId() );
+
+
+        Iterator<UniqueValue> allFieldsWritten = strategy.getAllUniqueFields( scope, entityId );
+
+        assertTrue( allFieldsWritten.hasNext() );
+
+        //test this interface. In most cases, we won't know the field name, so we want them all
+        UniqueValue writtenFieldEntry = allFieldsWritten.next();
+        Assert.assertNotNull( writtenFieldEntry );
+
+        assertEquals( field.getName(), writtenFieldEntry.getField().getName() );
+        assertEquals( field.getValue().toLowerCase(), writtenFieldEntry.getField().getValue() );
+        assertEquals( version, writtenFieldEntry.getEntityVersion() );
+
+        assertFalse(allFieldsWritten.hasNext());
+    }
+
+
+
+    @Test
+    public void twoFieldsPerVersion() throws ConnectionException, InterruptedException {
+
+        CollectionScope scope =
+                new CollectionScopeImpl( new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
+
+
+
+        Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+        final UUID version1 = UUIDGenerator.newTimeUUID();
+
+
+        //write V1 of everything
+        IntegerField version1Field1 = new IntegerField( "count", 1 );
+        StringField version1Field2 = new StringField("field", "v1value");
+
+
+        UniqueValue version1Field1Value = new UniqueValueImpl( version1Field1, entityId, version1 );
+        UniqueValue version1Field2Value = new UniqueValueImpl( version1Field2, entityId, version1 );
+
+        final MutationBatch batch = strategy.write( scope, version1Field1Value );
+        batch.mergeShallow( strategy.write( scope, version1Field2Value ) );
+
+
+        //write V2 of everything
+        final UUID version2 = UUIDGenerator.newTimeUUID();
+
+        IntegerField version2Field1 = new IntegerField( "count", 2 );
+        StringField version2Field2 = new StringField( "field", "v2value" );
+
+
+        UniqueValue version2Field1Value = new UniqueValueImpl( version2Field1, entityId, version2 );
+        UniqueValue version2Field2Value = new UniqueValueImpl( version2Field2, entityId, version2 );
+
+        batch.mergeShallow( strategy.write( scope, version2Field1Value ) );
+        batch.mergeShallow( strategy.write( scope, version2Field2Value ) );
+
+        batch.execute();
+
+
+        UniqueValueSet fields = strategy.load( scope, Arrays.<Field>asList( version1Field1, version1Field2 ) );
+
+        UniqueValue retrieved = fields.getValue( version1Field1.getName() );
+
+        assertEquals( version1Field1Value, retrieved );
+
+
+        retrieved = fields.getValue( version1Field2.getName() );
+        assertEquals( version1Field2Value, retrieved );
+
+
+        Iterator<UniqueValue> allFieldsWritten = strategy.getAllUniqueFields( scope, entityId );
+
+        assertTrue(allFieldsWritten.hasNext());
+
+        //test this interface. In most cases, we won't know the field name, so we want them all
+        UniqueValue allFieldsValue = allFieldsWritten.next();
+
+        //version 2 fields should come first, ordered by field name
+        assertEquals( version2Field1, allFieldsValue.getField() );
+        assertEquals( version2, allFieldsValue.getEntityVersion() );
+
+        allFieldsValue = allFieldsWritten.next();
+
+        assertEquals( version2Field2, allFieldsValue.getField() );
+        assertEquals( version2, allFieldsValue.getEntityVersion() );
+
+
+        //version 1 should come next ordered by field name
+        allFieldsValue = allFieldsWritten.next();
+
+        assertEquals( version1Field1, allFieldsValue.getField() );
+        assertEquals( version1, allFieldsValue.getEntityVersion() );
+
+        allFieldsValue = allFieldsWritten.next();
+
+        assertEquals( version1Field2, allFieldsValue.getField() );
+        assertEquals( version1, allFieldsValue.getEntityVersion() );
+
+        assertFalse(allFieldsWritten.hasNext());
+
     }
+
 }