You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/10/07 21:34:40 UTC

[3/4] git commit: First pass at updating interfaces

First pass at updating interfaces


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/fd841758
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/fd841758
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/fd841758

Branch: refs/heads/two-dot-o-events
Commit: fd841758a0649474337f9522d5f21a51fb439933
Parents: 17cc01d
Author: Todd Nine <to...@apache.org>
Authored: Tue Oct 7 13:33:08 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Tue Oct 7 13:33:08 2014 -0600

----------------------------------------------------------------------
 .../collection/guice/CollectionModule.java      |   5 +-
 .../impl/EntityCollectionManagerImpl.java       |  37 ++--
 .../mvcc/event/PostProcessObserver.java         |  39 ----
 .../mvcc/stage/delete/MarkCommit.java           | 153 +++++++------
 .../mvcc/stage/write/EntityVersion.java         |  64 ------
 .../stage/write/EntityVersionSerializer.java    |  71 ------
 .../mvcc/stage/write/FieldSerializer.java       | 107 ---------
 .../mvcc/stage/write/RollbackAction.java        |   3 +
 .../mvcc/stage/write/UniqueValue.java           |  39 ----
 .../mvcc/stage/write/UniqueValueImpl.java       | 124 -----------
 .../write/UniqueValueSerializationStrategy.java |  66 ------
 .../UniqueValueSerializationStrategyImpl.java   | 194 ----------------
 .../mvcc/stage/write/WriteCommit.java           |   5 +-
 .../mvcc/stage/write/WriteOptimisticVerify.java |  10 +-
 .../mvcc/stage/write/WriteUniqueVerify.java     | 206 ++++++++---------
 .../collection/serialization/UniqueValue.java   |  55 +++++
 .../UniqueValueSerializationStrategy.java       |  68 ++++++
 .../serialization/UniqueValueSet.java           |  32 +++
 .../serialization/impl/EntityVersion.java       |  64 ++++++
 .../impl/EntityVersionSerializer.java           |  71 ++++++
 .../serialization/impl/FieldSerializer.java     | 107 +++++++++
 .../serialization/impl/SerializationModule.java |   3 +-
 .../serialization/impl/UniqueValueImpl.java     | 125 +++++++++++
 .../UniqueValueSerializationStrategyImpl.java   | 219 +++++++++++++++++++
 .../serialization/impl/UniqueValueSetImpl.java  |  85 +++++++
 .../mvcc/stage/delete/MarkCommitTest.java       |  10 +-
 .../write/EntityVersionSerializerTest.java      |   2 +
 .../mvcc/stage/write/FieldSerializerTest.java   |   1 +
 ...niqueValueSerializationStrategyImplTest.java |  50 +++--
 .../mvcc/stage/write/WriteCommitTest.java       |   1 +
 .../stage/write/WriteOptimisticVerifyTest.java  |  17 +-
 .../stage/write/WriteUniqueVerifyStageTest.java |  48 ----
 .../mvcc/stage/write/WriteUniqueVerifyTest.java |  48 +---
 33 files changed, 1101 insertions(+), 1028 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 9d949e8..3336166 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -29,12 +29,11 @@ import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerialization
 import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGenerator;
 import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGeneratorImpl;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategy;
-import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategyImpl;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueSerializationStrategyImpl;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;
-import org.apache.usergrid.persistence.collection.service.UUIDService;
 import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
 import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
 import org.apache.usergrid.persistence.core.task.TaskExecutor;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 094baa6..6299acb 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -138,7 +138,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
 
         // create our observable and start the write
-        CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( collectionScope, entity );
+        final CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( collectionScope, entity );
 
         Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData,writeStart );
 
@@ -147,15 +147,10 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         // observable = Concurrent.concurrent( observable, Schedulers.io(), new WaitZip(), 
         //                  writeVerifyUnique, writeOptimisticVerify );
 
-        observable.doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() {
-            @Override
-            public void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
-                //Queue future write here (verify)
-            }
-        } ).map( writeCommit ).doOnNext( new Action1<Entity>() {
+        observable.map( writeCommit ).doOnNext( new Action1<Entity>() {
             @Override
             public void call( final Entity entity ) {
-                //fork background processing here (start)
+                //TODO fire a task here
 
                 //post-processing to come later. leave it empty for now.
             }
@@ -175,7 +170,13 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" );
 
         return Observable.from( new CollectionIoEvent<Id>( collectionScope, entityId ) )
-                         .map( markStart ).map( markCommit );
+                         .map( markStart ).doOnNext( markCommit ).map( new Func1<CollectionIoEvent<MvccEntity>,
+                        Void>() {
+                    @Override
+                    public Void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
+                        return null;
+                    }
+                } );
     }
 
 
@@ -217,7 +218,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
                //we an update, signal the fix
 
-                //TODO T.N Change this to use request collapsing
+                //TODO T.N Change this to fire a task
                 Observable.from( new CollectionIoEvent<Id>(collectionScope, entityId ) ).map( load ).subscribeOn( Schedulers.io() ).subscribe();
 
 
@@ -226,27 +227,29 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     }
 
     // fire the stages
-    public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,WriteStart writeState ) {
+    public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData, WriteStart writeState ) {
 
-        return Observable.from( writeData ).map( writeState ).flatMap(
-                new Func1<CollectionIoEvent<MvccEntity>, Observable<CollectionIoEvent<MvccEntity>>>() {
+        return Observable.from( writeData ).map( writeState ).doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() {
 
                     @Override
-                    public Observable<CollectionIoEvent<MvccEntity>> call(
+                    public void call(
                             final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
 
                         Observable<CollectionIoEvent<MvccEntity>> unique =
                                 Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
-                                          .flatMap( writeVerifyUnique );
+                                          .doOnNext( writeVerifyUnique );
 
 
                         // optimistic verification
                         Observable<CollectionIoEvent<MvccEntity>> optimistic =
                                 Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
-                                          .map( writeOptimisticVerify );
+                                          .doOnNext( writeOptimisticVerify );
+
+
+                        //wait for both to finish
+                        Observable.merge( unique, optimistic ).toBlocking().last();
 
 
-                        return Observable.merge( unique, optimistic).last();
                     }
                 } );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessObserver.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessObserver.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessObserver.java
deleted file mode 100644
index b06957d..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessObserver.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.collection.mvcc.event;
-
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-
-
-/**
- * @author: tnine
- */
-public interface PostProcessObserver {
-
-
-    /**
-     * The entity was comitted by the MVCC system.  Post processing needs to occur
-     *
-     * @param scope The scope used in the write pipeline
-     * @param entity The entity used in the write pipeline
-     *
-     */
-    public void postCommit(CollectionScope scope,  MvccEntity entity );
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
index 4e8d6d5..61a2a36 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
@@ -19,7 +19,6 @@
 package org.apache.usergrid.persistence.collection.mvcc.stage.delete;
 
 
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
@@ -28,7 +27,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
 import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
@@ -37,31 +35,31 @@ import org.apache.usergrid.persistence.collection.mvcc.entity.MvccValidationUtil
 import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
-import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValue;
-import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
 import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.field.Field;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
-import rx.functions.Func1;
+import rx.functions.Action1;
 
 
 /**
- * This phase should invoke any finalization, and mark the entity 
- * as committed in the data store before returning
+ * This phase should invoke any finalization, and mark the entity as committed in the data store before returning
  */
 @Singleton
-public class MarkCommit implements Func1<CollectionIoEvent<MvccEntity>, Void> {
+public class MarkCommit implements Action1<CollectionIoEvent<MvccEntity>> {
 
     private static final Logger LOG = LoggerFactory.getLogger( MarkCommit.class );
 
@@ -69,28 +67,29 @@ public class MarkCommit implements Func1<CollectionIoEvent<MvccEntity>, Void> {
     private final MvccEntitySerializationStrategy entityStrat;
     private final SerializationFig serializationFig;
     private final UniqueValueSerializationStrategy uniqueValueStrat;
+    private final Keyspace keyspace;
+
 
     @Inject
     public MarkCommit( final MvccLogEntrySerializationStrategy logStrat,
                        final MvccEntitySerializationStrategy entityStrat,
-                       final UniqueValueSerializationStrategy uniqueValueStrat,
-                       final SerializationFig serializationFig) {
+                       final UniqueValueSerializationStrategy uniqueValueStrat, final SerializationFig serializationFig,
+                       final Keyspace keyspace ) {
+
 
-        Preconditions.checkNotNull( 
-                logStrat, "logEntrySerializationStrategy is required" );
-        Preconditions.checkNotNull( 
-                entityStrat, "entitySerializationStrategy is required" );
+        Preconditions.checkNotNull( logStrat, "logEntrySerializationStrategy is required" );
+        Preconditions.checkNotNull( entityStrat, "entitySerializationStrategy is required" );
 
         this.logStrat = logStrat;
         this.entityStrat = entityStrat;
         this.serializationFig = serializationFig;
         this.uniqueValueStrat = uniqueValueStrat;
+        this.keyspace = keyspace;
     }
 
 
-
     @Override
-    public Void call( final CollectionIoEvent<MvccEntity> idIoEvent ) {
+    public void call( final CollectionIoEvent<MvccEntity> idIoEvent ) {
 
         final MvccEntity entity = idIoEvent.getEvent();
 
@@ -103,64 +102,86 @@ public class MarkCommit implements Func1<CollectionIoEvent<MvccEntity>, Void> {
         final CollectionScope collectionScope = idIoEvent.getEntityCollection();
 
 
-        final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version,
-                Stage.COMMITTED, MvccLogEntry.State.DELETED );
+        LOG.debug("Inserting tombstone for entity {} at version {}", entityId, version );
 
-        final MutationBatch logMutation = logStrat.write( collectionScope, startEntry );
+        final MvccLogEntry startEntry =
+                new MvccLogEntryImpl( entityId, version, Stage.COMMITTED, MvccLogEntry.State.DELETED );
+
+        final MutationBatch entityStateBatch = logStrat.write( collectionScope, startEntry );
 
         //insert a "cleared" value into the versions.  Post processing should actually delete
-        MutationBatch entityMutation = entityStrat.mark( collectionScope, entityId, version );
-
-        //merge the 2 into 1 mutation
-        logMutation.mergeShallow( entityMutation );
-
-        //set up the post processing queue
-        //delete unique fields
-        Observable<List<Field>> deleteFieldsObservable = Observable.create(new ObservableIterator<Field>("deleteColumns") {
-            @Override
-            protected Iterator<Field> getIterator() {
-                Iterator<MvccEntity> entities = entityStrat.load(collectionScope, entityId, entity.getVersion(), 1);
-                Iterator<Field> fieldIterator = Collections.emptyIterator();
-                if (entities.hasNext()) {
-                    Optional<Entity> oe = entities.next().getEntity();
-                    if (oe.isPresent()) {
-                        fieldIterator = oe.get().getFields().iterator();
-                    }
-                }
-                return fieldIterator;
-            }
-        }).buffer(serializationFig.getBufferSize())
-                .map(new Func1<List<Field>, List<Field>>() {
-                    @Override
-                    public List<Field> call(List<Field> fields) {
-                        for (Field field : fields) {
-                            try {
-                                UniqueValue value = uniqueValueStrat.load(collectionScope, field);
-                                if (value != null) {
-                                    logMutation.mergeShallow(uniqueValueStrat.delete(value));
-                                }
-                            } catch (ConnectionException ce) {
-                                LOG.error("Failed to delete Unique Value", ce);
-                            }
-                        }
-                        return fields;
-                    }
-                });
-        deleteFieldsObservable.toBlocking().firstOrDefault(null);
 
         try {
-            logMutation.execute();
+            final MutationBatch entityBatch = entityStrat.mark( collectionScope, entityId, version );
+            entityStateBatch.mergeShallow( entityBatch );
+            entityStateBatch.execute();
         }
         catch ( ConnectionException e ) {
-            LOG.error( "Failed to execute write asynchronously ", e );
-            throw new CollectionRuntimeException( entity, collectionScope,
-                    "Failed to execute write asynchronously ", e );
+            throw new RuntimeException( "Unable to mark entry as deleted" );
         }
 
-        /**
-         * We're done executing.
-         */
 
-        return null;
+        //TODO Refactor this logic into a a class that can be invoked from anywhere
+        //load every entity we have history of
+        Observable<List<MvccEntity>> deleteFieldsObservable =
+                Observable.create( new ObservableIterator<MvccEntity>( "deleteColumns" ) {
+                    @Override
+                    protected Iterator<MvccEntity> getIterator() {
+                        Iterator<MvccEntity> entities =
+                                entityStrat.load( collectionScope, entityId, entity.getVersion(), 100 );
+
+                        return entities;
+                    }
+                } )       //buffer them for efficiency
+                          .buffer( serializationFig.getBufferSize() ).doOnNext(
+
+                        new Action1<List<MvccEntity>>() {
+                            @Override
+                            public void call( final List<MvccEntity> mvccEntities ) {
+
+
+                                final MutationBatch batch = keyspace.prepareMutationBatch();
+
+                                for ( MvccEntity mvccEntity : mvccEntities ) {
+                                    if ( !mvccEntity.getEntity().isPresent() ) {
+                                        continue;
+                                    }
+
+                                    final UUID entityVersion = mvccEntity.getVersion();
+
+                                    final Entity entity = mvccEntity.getEntity().get();
+
+                                    //remove all unique fields from the index
+                                    for ( final Field field : entity.getFields() ) {
+
+                                        if(!field.isUnique()){
+                                            continue;
+                                        }
+
+                                        final UniqueValue unique = new UniqueValueImpl( collectionScope, field, entityId, entityVersion );
+
+                                        final MutationBatch deleteMutation = uniqueValueStrat.delete( unique );
+
+                                        batch.mergeShallow( deleteMutation );
+                                    }
+                                }
+
+                                try {
+                                    batch.execute();
+                                }
+                                catch ( ConnectionException e1 ) {
+                                    throw new RuntimeException( "Unable to execute " +
+                                            "unique value " +
+                                            "delete", e1 );
+                                }
+                            }
+                        }
+
+
+                                                                       );
+
+        final int removedCount = deleteFieldsObservable.count().toBlocking().last();
+
+        LOG.debug("Removed unique values for {} entities of entity {}", removedCount, entityId );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersion.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersion.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersion.java
deleted file mode 100644
index d62ae87..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersion.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.mvcc.stage.write;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.model.entity.Id;
-
-/**
- * Combine entity ID and entity version for use as column name for UniqueValues Column Family.
- */
-class EntityVersion {
-    private final Id entityId;
-    private final UUID entityVersion;
-
-    public EntityVersion(Id id, UUID uuid) {
-        this.entityId = id;
-        this.entityVersion = uuid;
-    }
-
-    public Id getEntityId() {
-        return entityId;
-    }
-
-    public UUID getEntityVersion() {
-        return entityVersion;
-    }
-
-    public boolean equals( Object o ) {
-
-        if ( o == null || !(o instanceof EntityVersion) ) {
-            return false;
-        }
-
-        EntityVersion other = (EntityVersion)o;
-
-        if ( !other.getEntityId().equals( getEntityId() )) {
-            return false;
-        }
-
-        if ( !other.getEntityVersion().equals( getEntityVersion() )) {
-            return false;
-        }
-
-        return true;
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersionSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersionSerializer.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersionSerializer.java
deleted file mode 100644
index 86b5aff..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersionSerializer.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.mvcc.stage.write;
-
-
-import java.nio.ByteBuffer;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.model.entity.SimpleId;
-
-import com.google.common.base.Preconditions;
-import com.netflix.astyanax.model.CompositeBuilder;
-import com.netflix.astyanax.model.Composites;
-import com.netflix.astyanax.model.DynamicComposite;
-import com.netflix.astyanax.serializers.AbstractSerializer;
-import com.netflix.astyanax.serializers.StringSerializer;
-import com.netflix.astyanax.serializers.UUIDSerializer;
-
-/**
- * Serialize EntityVersion, entity ID and version, for use a column name in Unique Values Column Family. 
- */
-public class EntityVersionSerializer extends AbstractSerializer<EntityVersion> {
-
-    private static final Logger LOG = LoggerFactory.getLogger( EntityVersionSerializer.class );
-
-    @Override
-    public ByteBuffer toByteBuffer(final EntityVersion ev) {
-
-        CompositeBuilder builder = Composites.newDynamicCompositeBuilder();
-
-        builder.addTimeUUID( ev.getEntityVersion() );
-        builder.addTimeUUID( ev.getEntityId().getUuid() );
-        builder.addString( ev.getEntityId().getType() );
-
-        return builder.build();
-    }
-
-    @Override
-    public EntityVersion fromByteBuffer(final ByteBuffer byteBuffer) {
-
-        // would use Composites.newDynamicCompositeParser(byteBuffer) but it is not implemented
-
-        DynamicComposite composite = DynamicComposite.fromByteBuffer(byteBuffer);
-        Preconditions.checkArgument(composite.size() == 3, "Composite should have 3 elements");
-
-        final UUID version      = composite.get( 0, UUIDSerializer.get() );
-        final UUID entityId     = composite.get( 1, UUIDSerializer.get() );
-        final String entityType = composite.get( 2, StringSerializer.get() );
-        
-        return new EntityVersion( new SimpleId( entityId, entityType ), version);
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/FieldSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/FieldSerializer.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/FieldSerializer.java
deleted file mode 100644
index 3719e3e..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/FieldSerializer.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.mvcc.stage.write;
-
-
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.lang3.StringUtils;
-
-import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
-import org.apache.usergrid.persistence.model.field.DoubleField;
-import org.apache.usergrid.persistence.model.field.Field;
-import org.apache.usergrid.persistence.model.field.IntegerField;
-import org.apache.usergrid.persistence.model.field.LongField;
-import org.apache.usergrid.persistence.model.field.StringField;
-import org.apache.usergrid.persistence.model.field.UUIDField;
-
-import com.netflix.astyanax.model.CompositeBuilder;
-import com.netflix.astyanax.model.CompositeParser;
-
-// TODO: replace with "real" serializer
-
-/**
- * Serialize Field for use as part of row-key in Unique Values Column Family.
- */
-public class FieldSerializer implements CompositeFieldSerializer<Field> {
-
-    private static final Logger LOG = LoggerFactory.getLogger( FieldSerializer.class );
-
-    public enum FieldType {
-        BOOLEAN_FIELD,
-        DOUBLE_FIELD,
-        INTEGER_FIELD,
-        LONG_FIELD,
-        STRING_FIELD,
-        UUID_FIELD
-    };
-
-    private static final FieldSerializer INSTANCE = new FieldSerializer();
-
-    @Override
-    public void toComposite( final CompositeBuilder builder, final Field field ) {
-
-        builder.addString( field.getName() );
-
-        // TODO: use the real field value serializer(s) here? Store hash instead?
-        builder.addString( field.getValue().toString() );
-         
-        String simpleName = field.getClass().getSimpleName();
-        int nameIndex = simpleName.lastIndexOf(".");
-        String fieldType = simpleName.substring(nameIndex + 1, simpleName.length() - "Field".length());
-        fieldType = StringUtils.upperCase( fieldType + "_FIELD" );
-
-        builder.addString( fieldType );
-    }
-
-    @Override
-    public Field fromComposite( final CompositeParser composite ) {
-
-        final String name = composite.readString();
-        final String value = composite.readString();
-        final String typeString = composite.readString();
-
-        final FieldType fieldType = FieldType.valueOf( typeString );
-
-        switch (fieldType) {
-            case DOUBLE_FIELD: 
-                return new DoubleField(name, Double.parseDouble(value));
-            case INTEGER_FIELD: 
-                return new IntegerField(name, Integer.parseInt(value));
-            case LONG_FIELD: 
-                return new LongField(name, Long.parseLong(value));
-            case STRING_FIELD: 
-                return new StringField(name, value);
-            case UUID_FIELD: 
-                return new UUIDField(name, UUID.fromString(value));
-            default:
-                throw new RuntimeException("Unknown unique field type");
-        }
-    }
-
-
-    /**
-     * Get the singleton serializer
-     */
-    public static FieldSerializer get() {
-        return INSTANCE;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
index 7448816..dfccb34 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
@@ -25,6 +25,9 @@ import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.field.Field;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValue.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValue.java
deleted file mode 100644
index de1594a..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValue.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.mvcc.stage.write;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.field.Field;
-
-/**
- * Represents a Unique Value of a field within a collection.
- */
-public interface UniqueValue {
-
-    public CollectionScope getCollectionScope();
-
-    public Id getEntityId();
-
-    public Field getField();
-
-    public UUID getEntityVersion();
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueImpl.java
deleted file mode 100644
index 7c86491..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueImpl.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.mvcc.stage.write;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.field.Field;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Represents a Unique Value of a field within a collection.
- */
-public class UniqueValueImpl implements UniqueValue {
-    private final CollectionScope collectionScope;
-    private final Field field;
-    private final Id entityId;
-    private final UUID entityVersion;
-
-    public UniqueValueImpl(
-            final CollectionScope scope, final Field field, Id entityId, final UUID version ) {
-
-        Preconditions.checkNotNull( scope, "scope is required" );
-        Preconditions.checkNotNull( field, "field is required" );
-//        Preconditions.checkNotNull( version, "version is required" );
-        Preconditions.checkNotNull( entityId, "entityId is required" );
-
-        this.collectionScope = scope;
-        this.field = field;
-        this.entityVersion = version;
-        this.entityId = entityId;
-    }
-
-    @Override
-    public CollectionScope getCollectionScope() {
-        return collectionScope;
-    }
-
-    @Override
-    public Field getField() {
-        return field;
-    }
-
-    @Override
-    public UUID getEntityVersion() {
-        return entityVersion;
-    }
-
-    @Override
-    public Id getEntityId() {
-        return entityId;
-    }
-
-    
-    @Override
-    public boolean equals( final Object o ) {
-        if ( this == o ) {
-            return true;
-        }
-        if ( o == null || getClass() != o.getClass() ) {
-            return false;
-        }
-
-        final UniqueValueImpl that = ( UniqueValueImpl ) o;
-
-        if ( !getCollectionScope().equals( that.getCollectionScope() ) ) {
-            return false;
-        }
-
-        if ( !getField().equals( that.getField()) ) {
-            return false;
-        }
-
-        if ( !getEntityVersion().equals( that.getEntityVersion() ) ) {
-            return false;
-        }
-
-        if ( !getEntityId().equals( that.getEntityId() ) ) {
-            return false;
-        }
-
-        return true;
-    }
-
-
-    @Override
-    public int hashCode() {
-        int result = 31 * getCollectionScope().hashCode();
-        result = 31 * result + getField().hashCode();
-        result = 31 * result + getEntityVersion().hashCode();
-        result = 31 * result + getEntityId().hashCode();
-        return result;
-    }
-
-
-    @Override
-    public String toString() {
-        return "UniqueValueImpl{" +
-                ", collectionScope =" + collectionScope.getName() +
-                ", field =" + field +
-                ", entityVersion=" + entityVersion +
-                ", entityId =" + entityId +
-                '}';
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategy.java
deleted file mode 100644
index 9773644..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategy.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.mvcc.stage.write;
-
-
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.model.field.Field;
-
-
-/**
- * Reads and writes to UniqueValues column family.
- */
-public interface UniqueValueSerializationStrategy {
-
-    /**
-     * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds.
-     * 
-     * @param uniqueValue Object to be written
-     * @return MutatationBatch that encapsulates operation, caller may or may not execute.
-     */
-    public MutationBatch write( UniqueValue uniqueValue );
-
-    /**
-     * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds.
-     * 
-     * @param uniqueValue Object to be written
-     * @param timeToLive How long object should live in seconds 
-     * @return MutatationBatch that encapsulates operation, caller may or may not execute.
-     */
-    public MutationBatch write( UniqueValue uniqueValue, Integer timeToLive );
-
-    /**
-     * Load UniqueValue that matches field from collection or null if that value does not exist.
-     * 
-     * @param colScope Collection scope in which to look for field name/value
-     * @param field Field name/value to search for
-     * @return UniqueValue or null if not found
-     * @throws ConnectionException on error connecting to Cassandra
-     */
-    public UniqueValue load( CollectionScope colScope, Field field ) throws ConnectionException;
-
-    /**
-     * Delete the specified Unique Value from Cassandra.
-     * 
-     * @param uniqueValue Object to be deleted.
-     * @return MutatationBatch that encapsulates operation, caller may or may not execute.
-     */
-    public MutationBatch delete( UniqueValue uniqueValue );
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategyImpl.java
deleted file mode 100644
index b7e113e..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategyImpl.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.mvcc.stage.write;
-
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.netflix.astyanax.ColumnListMutation;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
-import com.netflix.astyanax.model.ColumnList;
-import com.netflix.astyanax.util.RangeBuilder;
-import java.util.Collections;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.serialization.impl.CollectionScopedRowKeySerializer;
-import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-import org.apache.usergrid.persistence.core.migration.Migration;
-import org.apache.usergrid.persistence.model.field.Field;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Reads and writes to UniqueValues column family.
- */
-public class UniqueValueSerializationStrategyImpl 
-    implements UniqueValueSerializationStrategy, Migration {
-
-    private static final Logger log = 
-            LoggerFactory.getLogger( UniqueValueSerializationStrategyImpl.class );
-
-    // TODO: use "real" field serializer here instead once it is ready
-    private static final CollectionScopedRowKeySerializer<Field> ROW_KEY_SER =
-            new CollectionScopedRowKeySerializer<Field>( FieldSerializer.get() );
-
-    private static final EntityVersionSerializer ENTITY_VERSION_SER = new EntityVersionSerializer();
-
-    private static final MultiTennantColumnFamily<CollectionScope, Field, EntityVersion>
-        CF_UNIQUE_VALUES =
-            new MultiTennantColumnFamily<CollectionScope, Field, EntityVersion>( "Unique_Values",
-                ROW_KEY_SER,
-                ENTITY_VERSION_SER );
-
-    protected final Keyspace keyspace;
-
-
-    /**
-     * Construct serialization strategy for keyspace.
-     * @param keyspace Keyspace in which to store Unique Values.
-     */
-    @Inject
-    public UniqueValueSerializationStrategyImpl( final Keyspace keyspace ) {
-        this.keyspace = keyspace;
-    }
-
-
-    @Override
-    public java.util.Collection getColumnFamilies() {
-
-        MultiTennantColumnFamilyDefinition cf = new MultiTennantColumnFamilyDefinition(
-                CF_UNIQUE_VALUES,
-                BytesType.class.getSimpleName(),
-                ColumnTypes.DYNAMIC_COMPOSITE_TYPE,
-                BytesType.class.getSimpleName(), MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
-
-        return Collections.singleton( cf );
-    }
-
-
-    public MutationBatch write( UniqueValue uniqueValue ) {
-        return write( uniqueValue, Integer.MAX_VALUE );
-    }
-
-
-    @Override
-    public MutationBatch write( UniqueValue value, Integer timeToLive ) {
-
-        Preconditions.checkNotNull( value, "value is required" );
-        Preconditions.checkNotNull( timeToLive, "timeToLive is required" );
-
-        log.debug("Writing unique value scope={} id={} version={} name={} value={} ttl={} ",
-            new Object[] { 
-                value.getCollectionScope().getName(),
-                value.getEntityId(),
-                value.getEntityVersion(),
-                value.getField().getName(),
-                value.getField().getValue(),
-                timeToLive
-         });
-
-        final EntityVersion ev = new EntityVersion( value.getEntityId(), value.getEntityVersion() );
-
-        final Integer ttl;
-        if ( timeToLive.equals( Integer.MAX_VALUE )) {
-            ttl = null;
-        } else {
-            ttl = timeToLive;
-        }
-
-        return doWrite( value.getCollectionScope(), value.getField(),
-            new UniqueValueSerializationStrategyImpl.RowOp() {
-
-            @Override
-            public void doOp( final ColumnListMutation<EntityVersion> colMutation ) {
-                colMutation.putColumn( ev, 0x0, ttl );
-            }
-        } );
-    }
-
-
-    @Override
-    public MutationBatch delete(UniqueValue value) {
-
-        Preconditions.checkNotNull( value, "value is required" );
-
-        final EntityVersion ev = new EntityVersion( value.getEntityId(), value.getEntityVersion() );
-
-        return doWrite( value.getCollectionScope(), value.getField(),
-            new UniqueValueSerializationStrategyImpl.RowOp() {
-
-            @Override
-            public void doOp( final ColumnListMutation<EntityVersion> colMutation ) {
-                colMutation.deleteColumn(ev);
-            }
-        } );
-    }
-
-
-    /**
-     * Do the column update or delete for the given column and row key
-     * @param context We need to use this when getting the keyspace
-     */
-    private MutationBatch doWrite( CollectionScope context, Field field, RowOp op ) {
-        final MutationBatch batch = keyspace.prepareMutationBatch();
-        op.doOp( batch.withRow( CF_UNIQUE_VALUES, ScopedRowKey.fromKey( context, field ) ) );
-        return batch;
-    }
-
-
-    @Override
-    public UniqueValue load( CollectionScope colScope, Field field ) throws ConnectionException {
-
-        Preconditions.checkNotNull( field, "field is required" );
-
-        ColumnList<EntityVersion> result;
-        try {
-            result = keyspace.prepareQuery( CF_UNIQUE_VALUES )
-                .getKey( ScopedRowKey.fromKey( colScope, field ) )
-                .withColumnRange(new RangeBuilder().setLimit(1).build())
-                .execute()
-                .getResult();
-        }
-        catch ( NotFoundException nfe ) {
-            return null;
-        }
-
-        if ( result.isEmpty() ) {
-            return null;
-        }
-
-        EntityVersion ev = result.getColumnByIndex(0).getName();
-
-        return new UniqueValueImpl( colScope, field, ev.getEntityId(), ev.getEntityVersion() );
-    }
-
-
-    /**
-     * Simple callback to perform puts and deletes with a common row setup code
-     */
-    private static interface RowOp {
-        void doOp( ColumnListMutation<EntityVersion> colMutation );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index 0004ddb..49e967f 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -33,6 +33,9 @@ import org.apache.usergrid.persistence.collection.mvcc.entity.MvccValidationUtil
 import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.util.EntityUtils;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
@@ -110,7 +113,7 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Entity>
 
             if ( field.isUnique() ) {
 
-                UniqueValue written  = new UniqueValueImpl( ioEvent.getEntityCollection(), field, 
+                UniqueValue written  = new UniqueValueImpl( ioEvent.getEntityCollection(), field,
                     mvccEntity.getEntity().get().getId(), mvccEntity.getEntity().get().getVersion());
                 MutationBatch mb = uniqueValueStrat.write( written );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
index b5e5873..dcdb408 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
@@ -38,6 +38,7 @@ import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
+import rx.functions.Action1;
 import rx.functions.Func1;
 
 
@@ -46,7 +47,7 @@ import rx.functions.Func1;
  */
 @Singleton
 public class WriteOptimisticVerify 
-    implements Func1<CollectionIoEvent<MvccEntity>, CollectionIoEvent<MvccEntity>> {
+    implements Action1<CollectionIoEvent<MvccEntity>> {
 
     private static final Logger log = LoggerFactory.getLogger( WriteOptimisticVerify.class );
 
@@ -59,7 +60,7 @@ public class WriteOptimisticVerify
 
 
     @Override
-    public CollectionIoEvent<MvccEntity> call( final CollectionIoEvent<MvccEntity> ioevent ) {
+    public void call( final CollectionIoEvent<MvccEntity> ioevent ) {
         MvccValidationUtils.verifyMvccEntityWithEntity( ioevent.getEvent() );
 
         // If the version was included on the entity write operation (delete or write) we need
@@ -74,7 +75,7 @@ public class WriteOptimisticVerify
         CollectionScope collectionScope = ioevent.getEntityCollection();
 
         if(entity.getVersion() == null){
-            return ioevent;
+            return;
         }
 
         try {
@@ -98,8 +99,7 @@ public class WriteOptimisticVerify
                 "Error reading entity log", e );
         }
 
-        // No op, just emit the value
-        return ioevent;
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index 713703f..e6d37fc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -18,36 +18,46 @@
 package org.apache.usergrid.persistence.collection.mvcc.stage.write;
 
 
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccValidationUtils;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
 import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.field.Field;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
 import rx.Observable;
-import rx.functions.Func1;
-import rx.functions.FuncN;
-import rx.schedulers.Schedulers;
+import rx.functions.Action1;
 
 
 /**
  * This phase execute all unique value verification on the MvccEntity.
  */
 @Singleton
-public class WriteUniqueVerify implements 
-        Func1<CollectionIoEvent<MvccEntity>, Observable<? extends CollectionIoEvent<MvccEntity>>> {
+public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>> {
 
     private static final Logger LOG = LoggerFactory.getLogger( WriteUniqueVerify.class );
 
@@ -55,16 +65,16 @@ public class WriteUniqueVerify implements
 
     protected final SerializationFig serializationFig;
 
+    protected final Keyspace keyspace;
+
 
     @Inject
-    public WriteUniqueVerify( 
-            final UniqueValueSerializationStrategy uniqueValueSerializiationStrategy, 
-            final SerializationFig serializationFig ) {
+    public WriteUniqueVerify( final UniqueValueSerializationStrategy uniqueValueSerializiationStrategy,
+                              final SerializationFig serializationFig, final Keyspace keyspace ) {
+        this.keyspace = keyspace;
 
-        Preconditions.checkNotNull( uniqueValueSerializiationStrategy, 
-                "uniqueValueSerializationStrategy is required" );
-        Preconditions.checkNotNull( serializationFig, 
-                "serializationFig is required" );
+        Preconditions.checkNotNull( uniqueValueSerializiationStrategy, "uniqueValueSerializationStrategy is required" );
+        Preconditions.checkNotNull( serializationFig, "serializationFig is required" );
 
         this.uniqueValueStrat = uniqueValueSerializiationStrategy;
         this.serializationFig = serializationFig;
@@ -72,8 +82,7 @@ public class WriteUniqueVerify implements
 
 
     @Override
-    public Observable<? extends CollectionIoEvent<MvccEntity>> 
-        call(final CollectionIoEvent<MvccEntity> ioevent ) {
+    public void call( final CollectionIoEvent<MvccEntity> ioevent ) {
 
         MvccValidationUtils.verifyMvccEntityWithEntity( ioevent.getEvent() );
 
@@ -81,131 +90,100 @@ public class WriteUniqueVerify implements
 
         final Entity entity = mvccEntity.getEntity().get();
 
-        // use simple thread pool to verify fields in parallel
+        final Id entityId = entity.getId();
+
+        final UUID entityVersion = entity.getVersion();
 
-        // We want to use concurrent to fork all validations this way they're wrapped by timeouts 
-        // and Hystrix thread pools for JMX operations.  See the WriteCommand in the 
-        // EntityCollectionManagerImpl. 
+        final CollectionScope scope = ioevent.getEntityCollection();
+
+        // use simple thread pool to verify fields in parallel
 
-        // TODO: still needs to be added to the Concurrent utility class?
+        final Collection<Field> entityFields = entity.getFields();
 
-        final List<Observable<FieldUniquenessResult>> fields =
-                new ArrayList<Observable<FieldUniquenessResult>>();
+        //allocate our max size, worst case
+        final List<Field> uniqueFields = new ArrayList<>( entityFields.size() );
 
+        final MutationBatch batch = keyspace.prepareMutationBatch();
         //
         // Construct all the functions for verifying we're unique
         //
-        for ( final Field field : entity.getFields() ) {
+        for ( final Field field : entityFields ) {
 
             // if it's unique, create a function to validate it and add it to the list of 
             // concurrent validations
             if ( field.isUnique() ) {
 
-                Observable<FieldUniquenessResult> result =  Observable.from( field )
-                        .subscribeOn( Schedulers.io() )
-                        .map(new Func1<Field,  FieldUniquenessResult>() {
-
-                    @Override
-                    public FieldUniquenessResult call(Field field ) {
-
-                        // use write-first then read strategy
-                        UniqueValue written = new UniqueValueImpl( 
-                            ioevent.getEntityCollection(), field, entity.getId(), mvccEntity.getVersion() );
-
-                        // use TTL in case something goes wrong before entity is finally committed
-                        MutationBatch mb = uniqueValueStrat.write( written, serializationFig.getTimeout() );
-
-                        try {
-                            mb.execute();
-                        }
-                        catch ( ConnectionException ex ) {
-                            throw new RuntimeException("Unable to write to cassandra", ex );
-                        }
-
-                        // does the database value match what we wrote?
-                        UniqueValue loaded;
-                        try {
-                            loaded = uniqueValueStrat.load( ioevent.getEntityCollection(), field );
-                        }
-                        catch ( ConnectionException ex ) {
-                            throw new RuntimeException("Unable to read from cassandra", ex );
-                        }
-
-                        return new FieldUniquenessResult( 
-                            field, loaded.getEntityId().equals( written.getEntityId() ) );
-                    }
-                } );
-
-                fields.add(result);
+
+                // use write-first then read strategy
+                final UniqueValue written = new UniqueValueImpl( scope, field, entityId, entityVersion );
+
+                // use TTL in case something goes wrong before entity is finally committed
+                final MutationBatch mb = uniqueValueStrat.write( written, serializationFig.getTimeout() );
+
+                batch.mergeShallow( mb );
+
+
+                uniqueFields.add( field );
             }
         }
 
-        //short circuit.  If we zip up nothing, we block forever.
-        if(fields.size() == 0){
-            return Observable.from(ioevent );
+        //short circuit nothing to do
+        if ( uniqueFields.size() == 0 ) {
+            return;
         }
 
-        //
-        // Zip the results up
-        //
-        final FuncN<CollectionIoEvent<MvccEntity>> zipFunction = 
-                new FuncN<CollectionIoEvent<MvccEntity>>() {
-            
-            @Override
-            public CollectionIoEvent<MvccEntity> call( final Object... args ) {
-
-                Map<String, Field> uniquenessVioloations = new HashMap<String, Field>();
-
-                for ( Object resultObj : args ) {
-                    FieldUniquenessResult result = ( FieldUniquenessResult ) resultObj;
-                    if ( !result.isUnique() ) {
-                        Field field = result.getField();
-                        uniquenessVioloations.put( field.getName(), field );
-                    }
-                }
-
-                if ( !uniquenessVioloations.isEmpty() ) {
-                    throw new WriteUniqueVerifyException( 
-                            mvccEntity, ioevent.getEntityCollection(), uniquenessVioloations );
-                }
-                    
-                //return the original event
-                return ioevent;
-            }
-        };
 
-        return Observable.zip( fields, zipFunction );
-    }
+        //perform the write
+        try {
+            batch.execute();
+        }
+        catch ( ConnectionException ex ) {
+            throw new RuntimeException( "Unable to write to cassandra", ex );
+        }
 
 
-    static class FieldUniquenessResult {
-        private Field field;
-        private Boolean unique;
 
+        //now get the set of fields back
+        final UniqueValueSet uniqueValues;
 
-        public FieldUniquenessResult( Field f, Boolean u ) {
-            this.field = f;
-            this.unique = u;
+        try {
+            uniqueValues = uniqueValueStrat.load( scope, uniqueFields );
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to read from cassandra", e );
         }
 
 
-        public Boolean isUnique() {
-            return unique;
-        }
+        final Map<String, Field> uniquenessViolations = new HashMap<>( uniqueFields.size());
 
 
-        public void setUnique( Boolean isUnique ) {
-            this.unique = isUnique;
-        }
+        //loop through each field that was unique
+        for ( final Field field : uniqueFields ) {
+
+            final UniqueValue uniqueValue = uniqueValues.getValue( field.getName() );
+
+            if ( uniqueValue == null ) {
+                throw new RuntimeException(
+                        String.format( "Could not retrieve unique value for field %s, unable to verify",
+                                field.getName() ) );
+            }
+
 
+            final Id returnedEntityId = uniqueValue.getEntityId();
 
-        public Field getField() {
-            return field;
+
+            if ( !entityId.equals( returnedEntityId ) ) {
+                uniquenessViolations.put( field.getName(), field );
+            }
         }
 
 
-        public void setField( Field field ) {
-            this.field = field;
+        //We have violations, throw an exception
+        if ( !uniquenessViolations.isEmpty() ) {
+            throw new WriteUniqueVerifyException( mvccEntity, ioevent.getEntityCollection(), uniquenessViolations );
         }
+
+
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValue.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValue.java
new file mode 100644
index 0000000..9749101
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValue.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.collection.serialization;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.field.Field;
+
+/**
+ * Represents a Unique Value of a field within a collection.
+ */
+public interface UniqueValue {
+
+    /**
+     * The scope of this value
+     * @return
+     */
+    public CollectionScope getCollectionScope();
+
+    /**
+     * The entity Id that owns this value
+     * @return
+     */
+    public Id getEntityId();
+
+    /**
+     * The field value
+     * @return
+     */
+    public Field getField();
+
+    /**
+     * The version of the entity that owns this value
+     * @return
+     */
+    public UUID getEntityVersion();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
new file mode 100644
index 0000000..efcc60d
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.collection.serialization;
+
+
+import java.util.Collection;
+
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.field.Field;
+
+
+/**
+ * Reads and writes to UniqueValues column family.
+ */
+public interface UniqueValueSerializationStrategy {
+
+    /**
+     * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds.
+     * 
+     * @param uniqueValue Object to be written
+     * @return MutatationBatch that encapsulates operation, caller may or may not execute.
+     */
+    public MutationBatch write( UniqueValue uniqueValue );
+
+    /**
+     * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds.
+     * 
+     * @param uniqueValue Object to be written
+     * @param timeToLive How long object should live in seconds 
+     * @return MutatationBatch that encapsulates operation, caller may or may not execute.
+     */
+    public MutationBatch write( UniqueValue uniqueValue, Integer timeToLive );
+
+    /**
+     * Load UniqueValue that matches field from collection or null if that value does not exist.
+     * 
+     * @param colScope Collection scope in which to look for field name/value
+     * @param fields Field name/value to search for
+     * @return UniqueValueSet containing fields from the collection that exist in cassandra
+     * @throws ConnectionException on error connecting to Cassandra
+     */
+    public UniqueValueSet load( CollectionScope colScope, Collection<Field> fields ) throws ConnectionException;
+
+    /**
+     * Delete the specified Unique Value from Cassandra.
+     * 
+     * @param uniqueValue Object to be deleted.
+     * @return MutatationBatch that encapsulates operation, caller may or may not execute.
+     */
+    public MutationBatch delete( UniqueValue uniqueValue );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSet.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSet.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSet.java
new file mode 100644
index 0000000..5436a11
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSet.java
@@ -0,0 +1,32 @@
+package org.apache.usergrid.persistence.collection.serialization;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+/**
+ * A read only view of unique values
+ */
+public interface UniqueValueSet extends Iterable<UniqueValue> {
+
+    /**
+     * Get the unique value for the field
+     * @param fieldName
+     * @return
+     */
+    public UniqueValue getValue(final String fieldName);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersion.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersion.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersion.java
new file mode 100644
index 0000000..274cf5d
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersion.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.model.entity.Id;
+
+/**
+ * Combine entity ID and entity version for use as column name for UniqueValues Column Family.
+ */
+public class EntityVersion {
+    private final Id entityId;
+    private final UUID entityVersion;
+
+    public EntityVersion(Id id, UUID uuid) {
+        this.entityId = id;
+        this.entityVersion = uuid;
+    }
+
+    public Id getEntityId() {
+        return entityId;
+    }
+
+    public UUID getEntityVersion() {
+        return entityVersion;
+    }
+
+    public boolean equals( Object o ) {
+
+        if ( o == null || !(o instanceof EntityVersion) ) {
+            return false;
+        }
+
+        EntityVersion other = (EntityVersion)o;
+
+        if ( !other.getEntityId().equals( getEntityId() )) {
+            return false;
+        }
+
+        if ( !other.getEntityVersion().equals( getEntityVersion() )) {
+            return false;
+        }
+
+        return true;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersionSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersionSerializer.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersionSerializer.java
new file mode 100644
index 0000000..810a1fc
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersionSerializer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import com.google.common.base.Preconditions;
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.Composites;
+import com.netflix.astyanax.model.DynamicComposite;
+import com.netflix.astyanax.serializers.AbstractSerializer;
+import com.netflix.astyanax.serializers.StringSerializer;
+import com.netflix.astyanax.serializers.UUIDSerializer;
+
+/**
+ * Serialize EntityVersion, entity ID and version, for use a column name in Unique Values Column Family. 
+ */
+public class EntityVersionSerializer extends AbstractSerializer<EntityVersion> {
+
+    private static final Logger LOG = LoggerFactory.getLogger( EntityVersionSerializer.class );
+
+    @Override
+    public ByteBuffer toByteBuffer(final EntityVersion ev) {
+
+        CompositeBuilder builder = Composites.newDynamicCompositeBuilder();
+
+        builder.addTimeUUID( ev.getEntityVersion() );
+        builder.addTimeUUID( ev.getEntityId().getUuid() );
+        builder.addString( ev.getEntityId().getType() );
+
+        return builder.build();
+    }
+
+    @Override
+    public EntityVersion fromByteBuffer(final ByteBuffer byteBuffer) {
+
+        // would use Composites.newDynamicCompositeParser(byteBuffer) but it is not implemented
+
+        DynamicComposite composite = DynamicComposite.fromByteBuffer(byteBuffer);
+        Preconditions.checkArgument(composite.size() == 3, "Composite should have 3 elements");
+
+        final UUID version      = composite.get( 0, UUIDSerializer.get() );
+        final UUID entityId     = composite.get( 1, UUIDSerializer.get() );
+        final String entityType = composite.get( 2, StringSerializer.get() );
+        
+        return new EntityVersion( new SimpleId( entityId, entityType ), version);
+    }
+    
+}