You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/10/07 21:34:39 UTC

[2/4] First pass at updating interfaces

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/FieldSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/FieldSerializer.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/FieldSerializer.java
new file mode 100644
index 0000000..292f550
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/FieldSerializer.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
+import org.apache.usergrid.persistence.model.field.DoubleField;
+import org.apache.usergrid.persistence.model.field.Field;
+import org.apache.usergrid.persistence.model.field.IntegerField;
+import org.apache.usergrid.persistence.model.field.LongField;
+import org.apache.usergrid.persistence.model.field.StringField;
+import org.apache.usergrid.persistence.model.field.UUIDField;
+
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+
+// TODO: replace with "real" serializer
+
+/**
+ * Serialize Field for use as part of row-key in Unique Values Column Family.
+ */
+public class FieldSerializer implements CompositeFieldSerializer<Field> {
+
+    private static final Logger LOG = LoggerFactory.getLogger( FieldSerializer.class );
+
+    /** Type tags written as the last component of the composite. */
+    public enum FieldType {
+        BOOLEAN_FIELD, DOUBLE_FIELD, INTEGER_FIELD, LONG_FIELD, STRING_FIELD, UUID_FIELD
+    }
+
+    private static final FieldSerializer INSTANCE = new FieldSerializer();
+
+    /**
+     * Appends the field name, the value's {@code toString()} form and a type tag, in that order.
+     * The tag is the class's simple name with "Field" replaced by "_FIELD", upper-cased.
+     *
+     * @throws IllegalArgumentException if the field's class name does not end in "Field"
+     */
+    @Override
+    public void toComposite( final CompositeBuilder builder, final Field field ) {
+        final String simpleName = field.getClass().getSimpleName();
+        if ( !simpleName.endsWith( "Field" ) ) {
+            throw new IllegalArgumentException( "Unsupported field class " + field.getClass().getName() );
+        }
+        final String baseName = simpleName.substring( 0, simpleName.length() - "Field".length() );
+        builder.addString( field.getName() );
+        // TODO: use the real field value serializer(s) here? Store hash instead?
+        builder.addString( field.getValue().toString() );
+        // Locale.ROOT: the tag must not vary with the JVM default locale
+        builder.addString( ( baseName + "_FIELD" ).toUpperCase( java.util.Locale.ROOT ) );
+    }
+
+    /**
+     * Rebuilds a field from the name, value and type tag read from the composite, in that order.
+     * @throws IllegalArgumentException if the stored tag is not a {@link FieldType} constant
+     */
+    @Override
+    public Field fromComposite( final CompositeParser composite ) {
+        final String name = composite.readString();
+        final String value = composite.readString();
+        final String typeString = composite.readString();
+        switch ( FieldType.valueOf( typeString ) ) {
+            case BOOLEAN_FIELD:
+                return new org.apache.usergrid.persistence.model.field.BooleanField( name,
+                        Boolean.parseBoolean( value ) );
+            case DOUBLE_FIELD:
+                return new DoubleField( name, Double.parseDouble( value ) );
+            case INTEGER_FIELD:
+                return new IntegerField( name, Integer.parseInt( value ) );
+            case LONG_FIELD:
+                return new LongField( name, Long.parseLong( value ) );
+            case STRING_FIELD:
+                return new StringField( name, value );
+            case UUID_FIELD:
+                return new UUIDField( name, UUID.fromString( value ) );
+            default:
+                throw new RuntimeException( "Unknown unique field type: " + typeString );
+        }
+    }
+
+    /** Get the singleton serializer */
+    public static FieldSerializer get() {
+        return INSTANCE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
index 4d930f2..89012aa 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
@@ -20,8 +20,7 @@ package org.apache.usergrid.persistence.collection.serialization.impl;
 
 import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategy;
-import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategyImpl;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.core.migration.Migration;
 
 import com.google.inject.AbstractModule;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueImpl.java
new file mode 100644
index 0000000..2fdae1a
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueImpl.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.field.Field;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Represents a Unique Value of a field within a collection.
+ */
+public class UniqueValueImpl implements UniqueValue {
+    private final CollectionScope collectionScope;
+    private final Field field;
+    private final Id entityId;
+    private final UUID entityVersion;
+
+    public UniqueValueImpl(
+            final CollectionScope scope, final Field field, Id entityId, final UUID version ) {
+
+        Preconditions.checkNotNull( scope, "scope is required" );
+        Preconditions.checkNotNull( field, "field is required" );
+//        Preconditions.checkNotNull( version, "version is required" );
+        Preconditions.checkNotNull( entityId, "entityId is required" );
+
+        this.collectionScope = scope;
+        this.field = field;
+        this.entityVersion = version;
+        this.entityId = entityId;
+    }
+
+    @Override
+    public CollectionScope getCollectionScope() {
+        return collectionScope;
+    }
+
+    @Override
+    public Field getField() {
+        return field;
+    }
+
+    @Override
+    public UUID getEntityVersion() {
+        return entityVersion;
+    }
+
+    @Override
+    public Id getEntityId() {
+        return entityId;
+    }
+
+    
+    @Override
+    public boolean equals( final Object o ) {
+        if ( this == o ) {
+            return true;
+        }
+        if ( o == null || getClass() != o.getClass() ) {
+            return false;
+        }
+
+        final UniqueValueImpl that = ( UniqueValueImpl ) o;
+
+        if ( !getCollectionScope().equals( that.getCollectionScope() ) ) {
+            return false;
+        }
+
+        if ( !getField().equals( that.getField()) ) {
+            return false;
+        }
+
+        if ( !getEntityVersion().equals( that.getEntityVersion() ) ) {
+            return false;
+        }
+
+        if ( !getEntityId().equals( that.getEntityId() ) ) {
+            return false;
+        }
+
+        return true;
+    }
+
+
+    @Override
+    public int hashCode() {
+        int result = 31 * getCollectionScope().hashCode();
+        result = 31 * result + getField().hashCode();
+        result = 31 * result + getEntityVersion().hashCode();
+        result = 31 * result + getEntityId().hashCode();
+        return result;
+    }
+
+
+    @Override
+    public String toString() {
+        return "UniqueValueImpl{" +
+                ", collectionScope =" + collectionScope.getName() +
+                ", field =" + field +
+                ", entityVersion=" + entityVersion +
+                ", entityId =" + entityId +
+                '}';
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
new file mode 100644
index 0000000..d480691
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.marshal.BytesType;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.migration.Migration;
+import org.apache.usergrid.persistence.model.field.Field;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.netflix.astyanax.ColumnListMutation;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import com.netflix.astyanax.model.ColumnList;
+import com.netflix.astyanax.model.Row;
+import com.netflix.astyanax.util.RangeBuilder;
+
+
+/**
+ * Reads and writes to UniqueValues column family.
+ */
+public class UniqueValueSerializationStrategyImpl implements UniqueValueSerializationStrategy, Migration {
+
+    private static final Logger log = LoggerFactory.getLogger( UniqueValueSerializationStrategyImpl.class );
+
+    // TODO: use "real" field serializer here instead once it is ready
+    private static final CollectionScopedRowKeySerializer<Field> ROW_KEY_SER =
+            new CollectionScopedRowKeySerializer<Field>( FieldSerializer.get() );
+
+    private static final EntityVersionSerializer ENTITY_VERSION_SER = new EntityVersionSerializer();
+
+    private static final MultiTennantColumnFamily<CollectionScope, Field, EntityVersion> CF_UNIQUE_VALUES =
+            new MultiTennantColumnFamily<CollectionScope, Field, EntityVersion>( "Unique_Values", ROW_KEY_SER,
+                    ENTITY_VERSION_SER );
+
+    protected final Keyspace keyspace;
+
+    /**
+     * Construct serialization strategy for keyspace.
+     *
+     * @param keyspace Keyspace in which to store Unique Values.
+     */
+    @Inject
+    public UniqueValueSerializationStrategyImpl( final Keyspace keyspace ) {
+        this.keyspace = keyspace;
+    }
+
+    /**
+     * Describes the "Unique_Values" column family used by this strategy.
+     *
+     * @return a single-element collection holding the column family definition
+     */
+    @Override
+    public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
+        final MultiTennantColumnFamilyDefinition cf =
+                new MultiTennantColumnFamilyDefinition( CF_UNIQUE_VALUES, BytesType.class.getSimpleName(),
+                        ColumnTypes.DYNAMIC_COMPOSITE_TYPE, BytesType.class.getSimpleName(),
+                        MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
+        return Collections.singleton( cf );
+    }
+
+    /**
+     * Prepares a write of the unique value without a time to live.
+     *
+     * @param uniqueValue value to write; must not be null
+     * @return the prepared mutation batch; the caller is responsible for executing it
+     */
+    public MutationBatch write( UniqueValue uniqueValue ) {
+        return write( uniqueValue, Integer.MAX_VALUE );
+    }
+
+    /**
+     * Prepares a write of the column identifying the value's entity id and version.
+     *
+     * @param value value to write; must not be null
+     * @param timeToLive TTL handed to the column; {@code Integer.MAX_VALUE} means no TTL; must not be null
+     * @return the prepared mutation batch; the caller is responsible for executing it
+     */
+    @Override
+    public MutationBatch write( UniqueValue value, Integer timeToLive ) {
+        Preconditions.checkNotNull( value, "value is required" );
+        Preconditions.checkNotNull( timeToLive, "timeToLive is required" );
+
+        log.debug( "Writing unique value scope={} id={} version={} name={} value={} ttl={} ", new Object[] {
+                value.getCollectionScope().getName(), value.getEntityId(), value.getEntityVersion(),
+                value.getField().getName(), value.getField().getValue(), timeToLive
+        } );
+
+        final EntityVersion ev = new EntityVersion( value.getEntityId(), value.getEntityVersion() );
+
+        // Integer.MAX_VALUE is the "no TTL" marker; the column is then written with a null TTL
+        final Integer ttl = timeToLive.equals( Integer.MAX_VALUE ) ? null : timeToLive;
+
+        return doWrite( value.getCollectionScope(), value.getField(), new RowOp() {
+            @Override
+            public void doOp( final ColumnListMutation<EntityVersion> colMutation ) {
+                colMutation.putColumn( ev, 0x0, ttl );
+            }
+        } );
+    }
+
+    /**
+     * Prepares a delete of the column identifying the value's entity id and version.
+     *
+     * @param value value to delete; must not be null
+     * @return the prepared mutation batch; the caller is responsible for executing it
+     */
+    @Override
+    public MutationBatch delete( UniqueValue value ) {
+        Preconditions.checkNotNull( value, "value is required" );
+        final EntityVersion ev = new EntityVersion( value.getEntityId(), value.getEntityVersion() );
+
+        return doWrite( value.getCollectionScope(), value.getField(), new RowOp() {
+            @Override
+            public void doOp( final ColumnListMutation<EntityVersion> colMutation ) {
+                colMutation.deleteColumn( ev );
+            }
+        } );
+    }
+
+    /**
+     * Do the column update or delete for the given column and row key
+     *
+     * @param context scope that forms the row key together with the field
+     * @param field field that forms the row key together with the scope
+     * @param op callback that applies the put or delete to the row's column mutation
+     */
+    private MutationBatch doWrite( CollectionScope context, Field field, RowOp op ) {
+        final MutationBatch batch = keyspace.prepareMutationBatch();
+        op.doOp( batch.withRow( CF_UNIQUE_VALUES, ScopedRowKey.fromKey( context, field ) ) );
+        return batch;
+    }
+
+    /**
+     * Loads the stored unique value for each of the given fields, reading at most one column per row.
+     *
+     * @param colScope scope the fields belong to
+     * @param fields fields to look up; must not be null or empty
+     * @return the values found; fields whose row has no column are left out
+     * @throws ConnectionException if the query cannot be executed
+     */
+    @Override
+    public UniqueValueSet load( final CollectionScope colScope, final Collection<Field> fields )
+            throws ConnectionException {
+        Preconditions.checkNotNull( fields, "fields are required" );
+        Preconditions.checkArgument( fields.size() > 0, "At least 1 field must be specified" );
+
+        final List<ScopedRowKey<CollectionScope, Field>> keys = new ArrayList<>( fields.size() );
+        for ( Field field : fields ) {
+            final ScopedRowKey<CollectionScope, Field> rowKey = ScopedRowKey.fromKey( colScope, field );
+            keys.add( rowKey );
+        }
+
+        final UniqueValueSetImpl uniqueValueSet = new UniqueValueSetImpl( fields.size() );
+
+        final Iterator<Row<ScopedRowKey<CollectionScope, Field>, EntityVersion>> results =
+                keyspace.prepareQuery( CF_UNIQUE_VALUES ).getKeySlice( keys )
+                        .withColumnRange( new RangeBuilder().setLimit( 1 ).build() ).execute().getResult().iterator();
+
+        while ( results.hasNext() ) {
+            final Row<ScopedRowKey<CollectionScope, Field>, EntityVersion> unique = results.next();
+            final Field field = unique.getKey().getKey();
+            final ColumnList<EntityVersion> columnList = unique.getColumns();
+
+            //sanity check, nothing to do, skip it
+            if ( columnList.size() < 1 ) {
+                continue;
+            }
+            final EntityVersion entityVersion = columnList.getColumnByIndex( 0 ).getName();
+
+            uniqueValueSet.addValue( new UniqueValueImpl( colScope, field, entityVersion.getEntityId(),
+                    entityVersion.getEntityVersion() ) );
+        }
+
+        return uniqueValueSet;
+    }
+
+    /**
+     * Simple callback to perform puts and deletes with a common row setup code
+     */
+    private static interface RowOp {
+        void doOp( ColumnListMutation<EntityVersion> colMutation );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSetImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSetImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSetImpl.java
new file mode 100644
index 0000000..8dd9528
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSetImpl.java
@@ -0,0 +1,85 @@
+package org.apache.usergrid.persistence.collection.serialization.impl;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+
+
+public class UniqueValueSetImpl implements UniqueValueSet {
+
+    /** Unique values keyed by the name of their field. */
+    private final Map<String, UniqueValue> values;
+
+    /** @param expectedMaxSize initial capacity handed to the backing map */
+    public UniqueValueSetImpl(final int expectedMaxSize) {
+        this.values = new HashMap<>(expectedMaxSize);
+    }
+
+    /**
+     * Stores the value under its field's name, replacing any value already stored for that name.
+     */
+    public void addValue(UniqueValue value){
+        final String fieldName = value.getField().getName();
+        values.put( fieldName, value );
+    }
+
+    /** Returns the value stored for the field name, or null when there is none. */
+    @Override
+    public UniqueValue getValue( final String fieldName ) {
+        return values.get( fieldName );
+    }
+
+    /** Returns an iterator over the stored values; it does not support removal. */
+    @Override
+    public Iterator<UniqueValue> iterator() {
+        final Set<Map.Entry<String, UniqueValue>> entries = values.entrySet();
+        return new UniqueValueIterator( entries.iterator() );
+    }
+
+    /** Walks the map entries and exposes only their values. */
+    private static final class UniqueValueIterator implements Iterator<UniqueValue> {
+
+        private final Iterator<Map.Entry<String, UniqueValue>> entryIterator;
+
+        private UniqueValueIterator( final Iterator<Map.Entry<String, UniqueValue>> entryIterator ) {
+            this.entryIterator = entryIterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return entryIterator.hasNext();
+        }
+
+        @Override
+        public UniqueValue next() {
+            return entryIterator.next().getValue();
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException( "Remove is unsupported" );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
index c90d079..c2876b0 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
@@ -1,6 +1,8 @@
 package org.apache.usergrid.persistence.collection.mvcc.stage.delete;
 
 
+import java.security.Key;
+
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
@@ -13,11 +15,12 @@ import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
 import org.apache.usergrid.persistence.collection.mvcc.stage.AbstractMvccEntityStageTest;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
 import org.apache.usergrid.persistence.collection.mvcc.stage.TestEntityGenerator;
-import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.model.entity.Entity;
 
+import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
 
 import static org.junit.Assert.assertEquals;
@@ -111,13 +114,16 @@ public class MarkCommitTest extends AbstractMvccEntityStageTest {
         final MutationBatch entityMutation = mock( MutationBatch.class );
         final SerializationFig serializationFig = mock(SerializationFig.class);
         final UniqueValueSerializationStrategy uniqueValueSerializationStrategy = mock(UniqueValueSerializationStrategy.class);
+        final Keyspace keyspace = mock( Keyspace.class );
+
+        when(keyspace.prepareMutationBatch()).thenReturn( entityMutation );
 
         when( logStrategy.write( any( CollectionScope.class ), any( MvccLogEntry.class ) ) ).thenReturn( logMutation );
         when( mvccEntityStrategy.write( any( CollectionScope.class ), any( MvccEntity.class ) ) )
                 .thenReturn( entityMutation );
 
 
-        new MarkCommit( logStrategy, mvccEntityStrategy, uniqueValueSerializationStrategy, serializationFig ).call( event );
+        new MarkCommit( logStrategy, mvccEntityStrategy, uniqueValueSerializationStrategy, serializationFig, keyspace ).call( event );
 
         //TODO: This doesn't assert anything, this needs fixed (should be a fail technically)
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersionSerializerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersionSerializerTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersionSerializerTest.java
index 42e481e..889cba9 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersionSerializerTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersionSerializerTest.java
@@ -23,6 +23,8 @@ import java.nio.ByteBuffer;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.usergrid.persistence.collection.serialization.impl.EntityVersion;
+import org.apache.usergrid.persistence.collection.serialization.impl.EntityVersionSerializer;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/FieldSerializerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/FieldSerializerTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/FieldSerializerTest.java
index e3ec59d..dcff324 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/FieldSerializerTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/FieldSerializerTest.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.usergrid.persistence.collection.serialization.impl.FieldSerializer;
 import org.apache.usergrid.persistence.model.field.Field;
 import org.apache.usergrid.persistence.model.field.IntegerField;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategyImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategyImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategyImplTest.java
index c323883..f03baba 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategyImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategyImplTest.java
@@ -18,11 +18,11 @@
 package org.apache.usergrid.persistence.collection.mvcc.stage.write;
 
 
+import java.util.Collections;
 import java.util.UUID;
 
 import org.jukito.UseModules;
 import org.junit.Assert;
-import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -33,16 +33,21 @@ import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
 import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
-import org.apache.usergrid.persistence.core.cassandra.CassandraRule;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
 import org.apache.usergrid.persistence.core.cassandra.ITRunner;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.field.Field;
 import org.apache.usergrid.persistence.model.field.IntegerField;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.google.inject.Inject;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
+
 @RunWith( ITRunner.class )
 @UseModules( TestCollectionModule.class )
 public class UniqueValueSerializationStrategyImplTest {
@@ -51,7 +56,7 @@ public class UniqueValueSerializationStrategyImplTest {
     @Inject
     @Rule
     public MigrationManagerRule migrationManagerRule;
-    
+
     @Inject
     UniqueValueSerializationStrategy strategy;
 
@@ -59,16 +64,18 @@ public class UniqueValueSerializationStrategyImplTest {
     @Test
     public void testBasicOperation() throws ConnectionException, InterruptedException {
 
-        CollectionScope scope = new CollectionScopeImpl(
-                new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
+        CollectionScope scope =
+                new CollectionScopeImpl( new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
 
         IntegerField field = new IntegerField( "count", 5 );
-        Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity");
+        Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
         UUID version = UUIDGenerator.newTimeUUID();
         UniqueValue stored = new UniqueValueImpl( scope, field, entityId, version );
         strategy.write( stored ).execute();
 
-        UniqueValue retrieved = strategy.load( scope, field );
+        UniqueValueSet fields = strategy.load( scope, Collections.<Field>singleton( field ) );
+
+        UniqueValue retrieved = fields.getValue( field.getName() );
         Assert.assertNotNull( retrieved );
         Assert.assertEquals( stored, retrieved );
     }
@@ -76,13 +83,13 @@ public class UniqueValueSerializationStrategyImplTest {
 
     @Test
     public void testWriteWithTTL() throws InterruptedException, ConnectionException {
-        
-        CollectionScope scope = new CollectionScopeImpl(
-                new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
+
+        CollectionScope scope =
+                new CollectionScopeImpl( new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
 
         // write object that lives 2 seconds
         IntegerField field = new IntegerField( "count", 5 );
-        Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity");
+        Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
         UUID version = UUIDGenerator.newTimeUUID();
         UniqueValue stored = new UniqueValueImpl( scope, field, entityId, version );
         strategy.write( stored, 2 ).execute();
@@ -90,14 +97,19 @@ public class UniqueValueSerializationStrategyImplTest {
         Thread.sleep( 1000 );
 
         // waited one sec, should be still here
-        UniqueValue retrieved = strategy.load( scope, field );
+        UniqueValueSet fields = strategy.load( scope, Collections.<Field>singleton( field ) );
+
+        UniqueValue retrieved = fields.getValue( field.getName() );
+
         Assert.assertNotNull( retrieved );
         Assert.assertEquals( stored, retrieved );
 
         Thread.sleep( 1500 );
 
         // wait another second, should be gone now
-        UniqueValue nullExpected = strategy.load( scope, field );
+        fields = strategy.load( scope, Collections.<Field>singleton( field ) );
+
+        UniqueValue nullExpected = fields.getValue( field.getName() );
         Assert.assertNull( nullExpected );
     }
 
@@ -105,18 +117,22 @@ public class UniqueValueSerializationStrategyImplTest {
     @Test
     public void testDelete() throws ConnectionException {
 
-        CollectionScope scope = new CollectionScopeImpl(
-                new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
+        CollectionScope scope =
+                new CollectionScopeImpl( new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
 
         IntegerField field = new IntegerField( "count", 5 );
-        Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity");
+        Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
         UUID version = UUIDGenerator.newTimeUUID();
         UniqueValue stored = new UniqueValueImpl( scope, field, entityId, version );
         strategy.write( stored ).execute();
 
         strategy.delete( stored ).execute();
 
-        UniqueValue nullExpected = strategy.load( scope, field );
+        UniqueValueSet fields = strategy.load( scope, Collections.<Field>singleton( field ) );
+
+        UniqueValue nullExpected = fields.getValue( field.getName() );
+
+
         Assert.assertNull( nullExpected );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
index 981a14d..b0ccb4f 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
@@ -30,6 +30,7 @@ import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
 import org.apache.usergrid.persistence.collection.mvcc.stage.AbstractMvccEntityStageTest;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
 import org.apache.usergrid.persistence.collection.mvcc.stage.TestEntityGenerator;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.model.entity.Entity;
 
 import com.netflix.astyanax.MutationBatch;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java
index 64e5e2e..6f31412 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java
@@ -36,6 +36,9 @@ import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
 import org.apache.usergrid.persistence.collection.mvcc.stage.AbstractMvccEntityStageTest;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.field.StringField;
@@ -96,20 +99,10 @@ public class WriteOptimisticVerifyTest extends AbstractMvccEntityStageTest {
         // Run the stage
         WriteOptimisticVerify newStage = new WriteOptimisticVerify( noConflictLog );
 
-        CollectionIoEvent<MvccEntity> result;
-        result = newStage.call( new CollectionIoEvent<MvccEntity>( collectionScope, mvccEntity ) );
 
-        assertSame("Context was correct", collectionScope, result.getEntityCollection()) ;
+        newStage.call( new CollectionIoEvent<>( collectionScope, mvccEntity ) );
 
-        // Verify the entity is correct
-        MvccEntity entry = result.getEvent();
 
-        // Verify UUID and version in both the MvccEntity and the entity itself. Here assertSame 
-        // is used on purpose as we want to make sure the same instance is used, not a copy.
-        // This way the caller's runtime type is retained.
-        assertSame( "Id correct", entity.getId(), entry.getId() );
-        assertSame( "Version did not not match entityId", entity.getVersion(), entry.getVersion() );
-        assertSame( "Entity correct", entity, entry.getEntity().get() );
     }
 
     @Test
@@ -159,7 +152,7 @@ public class WriteOptimisticVerifyTest extends AbstractMvccEntityStageTest {
         RollbackAction rollbackAction = new RollbackAction( mvccLog, uvstrat );
 
         try {
-            newStage.call( new CollectionIoEvent<MvccEntity>(scope, mvccEntity));
+            newStage.call( new CollectionIoEvent<>(scope, mvccEntity));
 
         } catch (WriteOptimisticVerifyException e) {
             log.info("Error", e);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyStageTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyStageTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyStageTest.java
deleted file mode 100644
index d4f6507..0000000
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyStageTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.mvcc.stage.write;
-
-
-import org.jukito.UseModules;
-
-import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.stage.AbstractMvccEntityStageTest;
-import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
-import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-
-import static org.mockito.Mockito.mock;
-
-
-/**
- * TODO: Update the test to correctly test for detecting more than 1 duplicate and exception handling correctly
- *
- * @author tnine
- */
-@UseModules( TestCollectionModule.class )
-public class WriteUniqueVerifyStageTest extends AbstractMvccEntityStageTest {
-
-    @Override
-    protected void validateStage( final CollectionIoEvent<MvccEntity> event ) {
-        UniqueValueSerializationStrategy uvstrat = mock( UniqueValueSerializationStrategy.class );
-        SerializationFig fig = mock( SerializationFig.class );
-        new WriteUniqueVerify( uvstrat, fig ).call( event );
-    }
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
index 15aff3d..ba89503 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
@@ -29,15 +29,19 @@ import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.core.cassandra.ITRunner;
 import org.apache.usergrid.persistence.model.entity.Entity;
 
 import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
 
 import static org.apache.usergrid.persistence.collection.mvcc.stage.TestEntityGenerator.fromEntity;
 import static org.apache.usergrid.persistence.collection.mvcc.stage.TestEntityGenerator.generateEntity;
 import static org.junit.Assert.assertSame;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 
 
 @RunWith( ITRunner.class )
@@ -58,43 +62,12 @@ public class WriteUniqueVerifyTest {
     private SerializationFig fig;
 
 
-    /**
-     * Standard flow
-     */
-    @Test( timeout = 5000 )
-    public void testStartStage() throws Exception {
-
-        final CollectionScope collectionScope = mock( CollectionScope.class );
-
-        // set up the mock to return the entity from the start phase
-        final Entity entity = generateEntity();
-
-        final MvccEntity mvccEntity = fromEntity( entity );
-
-        // run the stage
-        WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig );
-
-        CollectionIoEvent<MvccEntity> result = newStage.call( 
-            new CollectionIoEvent<MvccEntity>( collectionScope, mvccEntity ) )
-                .toBlocking().last();
-
-        assertSame( "Context was correct", collectionScope, result.getEntityCollection() );
-
-        // verify the entity is correct
-        MvccEntity entry = result.getEvent();
-
-        // verify uuid and version in both the MvccEntity and the entity itself. assertSame is 
-        // used on purpose.  We want to make sure the same instance is used, not a copy.
-        // this way the caller's runtime type is retained.
-        assertSame( "id correct", entity.getId(), entry.getId() );
-        assertSame( "version did not not match entityId", entity.getVersion(), entry.getVersion() );
-        assertSame( "Entity correct", entity, entry.getEntity().get() );
-    }
 
 
     @Test
     public void testNoFields() {
         final CollectionScope collectionScope = mock( CollectionScope.class );
+        final Keyspace keyspace = mock(Keyspace.class);
 
         // set up the mock to return the entity from the start phase
         final Entity entity = generateEntity();
@@ -102,13 +75,14 @@ public class WriteUniqueVerifyTest {
         final MvccEntity mvccEntity = fromEntity( entity );
 
         // run the stage
-        WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig );
+        WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace );
+
+       newStage.call(
+            new CollectionIoEvent<>( collectionScope, mvccEntity ) ) ;
 
-        CollectionIoEvent<MvccEntity> result = newStage.call( 
-            new CollectionIoEvent<MvccEntity>( collectionScope, mvccEntity ) )
-                .toBlocking().last();
+       // If we get here, it's a success: we want to verify that no exceptions are thrown.
 
-        assertSame( "Context was correct", collectionScope, result.getEntityCollection() );
+        verify(keyspace, never()).prepareMutationBatch();
     }
 
 }