You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/10/07 21:34:40 UTC
[3/4] git commit: First pass at updating interfaces
First pass at updating interfaces
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/fd841758
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/fd841758
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/fd841758
Branch: refs/heads/two-dot-o-events
Commit: fd841758a0649474337f9522d5f21a51fb439933
Parents: 17cc01d
Author: Todd Nine <to...@apache.org>
Authored: Tue Oct 7 13:33:08 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Tue Oct 7 13:33:08 2014 -0600
----------------------------------------------------------------------
.../collection/guice/CollectionModule.java | 5 +-
.../impl/EntityCollectionManagerImpl.java | 37 ++--
.../mvcc/event/PostProcessObserver.java | 39 ----
.../mvcc/stage/delete/MarkCommit.java | 153 +++++++------
.../mvcc/stage/write/EntityVersion.java | 64 ------
.../stage/write/EntityVersionSerializer.java | 71 ------
.../mvcc/stage/write/FieldSerializer.java | 107 ---------
.../mvcc/stage/write/RollbackAction.java | 3 +
.../mvcc/stage/write/UniqueValue.java | 39 ----
.../mvcc/stage/write/UniqueValueImpl.java | 124 -----------
.../write/UniqueValueSerializationStrategy.java | 66 ------
.../UniqueValueSerializationStrategyImpl.java | 194 ----------------
.../mvcc/stage/write/WriteCommit.java | 5 +-
.../mvcc/stage/write/WriteOptimisticVerify.java | 10 +-
.../mvcc/stage/write/WriteUniqueVerify.java | 206 ++++++++---------
.../collection/serialization/UniqueValue.java | 55 +++++
.../UniqueValueSerializationStrategy.java | 68 ++++++
.../serialization/UniqueValueSet.java | 32 +++
.../serialization/impl/EntityVersion.java | 64 ++++++
.../impl/EntityVersionSerializer.java | 71 ++++++
.../serialization/impl/FieldSerializer.java | 107 +++++++++
.../serialization/impl/SerializationModule.java | 3 +-
.../serialization/impl/UniqueValueImpl.java | 125 +++++++++++
.../UniqueValueSerializationStrategyImpl.java | 219 +++++++++++++++++++
.../serialization/impl/UniqueValueSetImpl.java | 85 +++++++
.../mvcc/stage/delete/MarkCommitTest.java | 10 +-
.../write/EntityVersionSerializerTest.java | 2 +
.../mvcc/stage/write/FieldSerializerTest.java | 1 +
...niqueValueSerializationStrategyImplTest.java | 50 +++--
.../mvcc/stage/write/WriteCommitTest.java | 1 +
.../stage/write/WriteOptimisticVerifyTest.java | 17 +-
.../stage/write/WriteUniqueVerifyStageTest.java | 48 ----
.../mvcc/stage/write/WriteUniqueVerifyTest.java | 48 +---
33 files changed, 1101 insertions(+), 1028 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 9d949e8..3336166 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -29,12 +29,11 @@ import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerialization
import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGenerator;
import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGeneratorImpl;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategy;
-import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategyImpl;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueSerializationStrategyImpl;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;
-import org.apache.usergrid.persistence.collection.service.UUIDService;
import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
import org.apache.usergrid.persistence.core.task.TaskExecutor;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 094baa6..6299acb 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -138,7 +138,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
// create our observable and start the write
- CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( collectionScope, entity );
+ final CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( collectionScope, entity );
Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData,writeStart );
@@ -147,15 +147,10 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
// observable = Concurrent.concurrent( observable, Schedulers.io(), new WaitZip(),
// writeVerifyUnique, writeOptimisticVerify );
- observable.doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() {
- @Override
- public void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
- //Queue future write here (verify)
- }
- } ).map( writeCommit ).doOnNext( new Action1<Entity>() {
+ observable.map( writeCommit ).doOnNext( new Action1<Entity>() {
@Override
public void call( final Entity entity ) {
- //fork background processing here (start)
+ //TODO fire a task here
//post-processing to come later. leave it empty for now.
}
@@ -175,7 +170,13 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" );
return Observable.from( new CollectionIoEvent<Id>( collectionScope, entityId ) )
- .map( markStart ).map( markCommit );
+ .map( markStart ).doOnNext( markCommit ).map( new Func1<CollectionIoEvent<MvccEntity>,
+ Void>() {
+ @Override
+ public Void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
+ return null;
+ }
+ } );
}
@@ -217,7 +218,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
//we an update, signal the fix
- //TODO T.N Change this to use request collapsing
+ //TODO T.N Change this to fire a task
Observable.from( new CollectionIoEvent<Id>(collectionScope, entityId ) ).map( load ).subscribeOn( Schedulers.io() ).subscribe();
@@ -226,27 +227,29 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
}
// fire the stages
- public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,WriteStart writeState ) {
+ public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData, WriteStart writeState ) {
- return Observable.from( writeData ).map( writeState ).flatMap(
- new Func1<CollectionIoEvent<MvccEntity>, Observable<CollectionIoEvent<MvccEntity>>>() {
+ return Observable.from( writeData ).map( writeState ).doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() {
@Override
- public Observable<CollectionIoEvent<MvccEntity>> call(
+ public void call(
final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
Observable<CollectionIoEvent<MvccEntity>> unique =
Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
- .flatMap( writeVerifyUnique );
+ .doOnNext( writeVerifyUnique );
// optimistic verification
Observable<CollectionIoEvent<MvccEntity>> optimistic =
Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
- .map( writeOptimisticVerify );
+ .doOnNext( writeOptimisticVerify );
+
+
+ //wait for both to finish
+ Observable.merge( unique, optimistic ).toBlocking().last();
- return Observable.merge( unique, optimistic).last();
}
} );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessObserver.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessObserver.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessObserver.java
deleted file mode 100644
index b06957d..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessObserver.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.collection.mvcc.event;
-
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-
-
-/**
- * @author: tnine
- */
-public interface PostProcessObserver {
-
-
- /**
- * The entity was comitted by the MVCC system. Post processing needs to occur
- *
- * @param scope The scope used in the write pipeline
- * @param entity The entity used in the write pipeline
- *
- */
- public void postCommit(CollectionScope scope, MvccEntity entity );
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
index 4e8d6d5..61a2a36 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
@@ -19,7 +19,6 @@
package org.apache.usergrid.persistence.collection.mvcc.stage.delete;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
@@ -28,7 +27,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
@@ -37,31 +35,31 @@ import org.apache.usergrid.persistence.collection.mvcc.entity.MvccValidationUtil
import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
-import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValue;
-import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
import org.apache.usergrid.persistence.core.rx.ObservableIterator;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.field.Field;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
-import rx.functions.Func1;
+import rx.functions.Action1;
/**
- * This phase should invoke any finalization, and mark the entity
- * as committed in the data store before returning
+ * This phase should invoke any finalization, and mark the entity as committed in the data store before returning
*/
@Singleton
-public class MarkCommit implements Func1<CollectionIoEvent<MvccEntity>, Void> {
+public class MarkCommit implements Action1<CollectionIoEvent<MvccEntity>> {
private static final Logger LOG = LoggerFactory.getLogger( MarkCommit.class );
@@ -69,28 +67,29 @@ public class MarkCommit implements Func1<CollectionIoEvent<MvccEntity>, Void> {
private final MvccEntitySerializationStrategy entityStrat;
private final SerializationFig serializationFig;
private final UniqueValueSerializationStrategy uniqueValueStrat;
+ private final Keyspace keyspace;
+
@Inject
public MarkCommit( final MvccLogEntrySerializationStrategy logStrat,
final MvccEntitySerializationStrategy entityStrat,
- final UniqueValueSerializationStrategy uniqueValueStrat,
- final SerializationFig serializationFig) {
+ final UniqueValueSerializationStrategy uniqueValueStrat, final SerializationFig serializationFig,
+ final Keyspace keyspace ) {
+
- Preconditions.checkNotNull(
- logStrat, "logEntrySerializationStrategy is required" );
- Preconditions.checkNotNull(
- entityStrat, "entitySerializationStrategy is required" );
+ Preconditions.checkNotNull( logStrat, "logEntrySerializationStrategy is required" );
+ Preconditions.checkNotNull( entityStrat, "entitySerializationStrategy is required" );
this.logStrat = logStrat;
this.entityStrat = entityStrat;
this.serializationFig = serializationFig;
this.uniqueValueStrat = uniqueValueStrat;
+ this.keyspace = keyspace;
}
-
@Override
- public Void call( final CollectionIoEvent<MvccEntity> idIoEvent ) {
+ public void call( final CollectionIoEvent<MvccEntity> idIoEvent ) {
final MvccEntity entity = idIoEvent.getEvent();
@@ -103,64 +102,86 @@ public class MarkCommit implements Func1<CollectionIoEvent<MvccEntity>, Void> {
final CollectionScope collectionScope = idIoEvent.getEntityCollection();
- final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version,
- Stage.COMMITTED, MvccLogEntry.State.DELETED );
+ LOG.debug("Inserting tombstone for entity {} at version {}", entityId, version );
- final MutationBatch logMutation = logStrat.write( collectionScope, startEntry );
+ final MvccLogEntry startEntry =
+ new MvccLogEntryImpl( entityId, version, Stage.COMMITTED, MvccLogEntry.State.DELETED );
+
+ final MutationBatch entityStateBatch = logStrat.write( collectionScope, startEntry );
//insert a "cleared" value into the versions. Post processing should actually delete
- MutationBatch entityMutation = entityStrat.mark( collectionScope, entityId, version );
-
- //merge the 2 into 1 mutation
- logMutation.mergeShallow( entityMutation );
-
- //set up the post processing queue
- //delete unique fields
- Observable<List<Field>> deleteFieldsObservable = Observable.create(new ObservableIterator<Field>("deleteColumns") {
- @Override
- protected Iterator<Field> getIterator() {
- Iterator<MvccEntity> entities = entityStrat.load(collectionScope, entityId, entity.getVersion(), 1);
- Iterator<Field> fieldIterator = Collections.emptyIterator();
- if (entities.hasNext()) {
- Optional<Entity> oe = entities.next().getEntity();
- if (oe.isPresent()) {
- fieldIterator = oe.get().getFields().iterator();
- }
- }
- return fieldIterator;
- }
- }).buffer(serializationFig.getBufferSize())
- .map(new Func1<List<Field>, List<Field>>() {
- @Override
- public List<Field> call(List<Field> fields) {
- for (Field field : fields) {
- try {
- UniqueValue value = uniqueValueStrat.load(collectionScope, field);
- if (value != null) {
- logMutation.mergeShallow(uniqueValueStrat.delete(value));
- }
- } catch (ConnectionException ce) {
- LOG.error("Failed to delete Unique Value", ce);
- }
- }
- return fields;
- }
- });
- deleteFieldsObservable.toBlocking().firstOrDefault(null);
try {
- logMutation.execute();
+ final MutationBatch entityBatch = entityStrat.mark( collectionScope, entityId, version );
+ entityStateBatch.mergeShallow( entityBatch );
+ entityStateBatch.execute();
}
catch ( ConnectionException e ) {
- LOG.error( "Failed to execute write asynchronously ", e );
- throw new CollectionRuntimeException( entity, collectionScope,
- "Failed to execute write asynchronously ", e );
+ throw new RuntimeException( "Unable to mark entry as deleted" );
}
- /**
- * We're done executing.
- */
- return null;
+ //TODO Refactor this logic into a a class that can be invoked from anywhere
+ //load every entity we have history of
+ Observable<List<MvccEntity>> deleteFieldsObservable =
+ Observable.create( new ObservableIterator<MvccEntity>( "deleteColumns" ) {
+ @Override
+ protected Iterator<MvccEntity> getIterator() {
+ Iterator<MvccEntity> entities =
+ entityStrat.load( collectionScope, entityId, entity.getVersion(), 100 );
+
+ return entities;
+ }
+ } ) //buffer them for efficiency
+ .buffer( serializationFig.getBufferSize() ).doOnNext(
+
+ new Action1<List<MvccEntity>>() {
+ @Override
+ public void call( final List<MvccEntity> mvccEntities ) {
+
+
+ final MutationBatch batch = keyspace.prepareMutationBatch();
+
+ for ( MvccEntity mvccEntity : mvccEntities ) {
+ if ( !mvccEntity.getEntity().isPresent() ) {
+ continue;
+ }
+
+ final UUID entityVersion = mvccEntity.getVersion();
+
+ final Entity entity = mvccEntity.getEntity().get();
+
+ //remove all unique fields from the index
+ for ( final Field field : entity.getFields() ) {
+
+ if(!field.isUnique()){
+ continue;
+ }
+
+ final UniqueValue unique = new UniqueValueImpl( collectionScope, field, entityId, entityVersion );
+
+ final MutationBatch deleteMutation = uniqueValueStrat.delete( unique );
+
+ batch.mergeShallow( deleteMutation );
+ }
+ }
+
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e1 ) {
+ throw new RuntimeException( "Unable to execute " +
+ "unique value " +
+ "delete", e1 );
+ }
+ }
+ }
+
+
+ );
+
+ final int removedCount = deleteFieldsObservable.count().toBlocking().last();
+
+ LOG.debug("Removed unique values for {} entities of entity {}", removedCount, entityId );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersion.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersion.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersion.java
deleted file mode 100644
index d62ae87..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersion.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.mvcc.stage.write;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.model.entity.Id;
-
-/**
- * Combine entity ID and entity version for use as column name for UniqueValues Column Family.
- */
-class EntityVersion {
- private final Id entityId;
- private final UUID entityVersion;
-
- public EntityVersion(Id id, UUID uuid) {
- this.entityId = id;
- this.entityVersion = uuid;
- }
-
- public Id getEntityId() {
- return entityId;
- }
-
- public UUID getEntityVersion() {
- return entityVersion;
- }
-
- public boolean equals( Object o ) {
-
- if ( o == null || !(o instanceof EntityVersion) ) {
- return false;
- }
-
- EntityVersion other = (EntityVersion)o;
-
- if ( !other.getEntityId().equals( getEntityId() )) {
- return false;
- }
-
- if ( !other.getEntityVersion().equals( getEntityVersion() )) {
- return false;
- }
-
- return true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersionSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersionSerializer.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersionSerializer.java
deleted file mode 100644
index 86b5aff..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/EntityVersionSerializer.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.mvcc.stage.write;
-
-
-import java.nio.ByteBuffer;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.model.entity.SimpleId;
-
-import com.google.common.base.Preconditions;
-import com.netflix.astyanax.model.CompositeBuilder;
-import com.netflix.astyanax.model.Composites;
-import com.netflix.astyanax.model.DynamicComposite;
-import com.netflix.astyanax.serializers.AbstractSerializer;
-import com.netflix.astyanax.serializers.StringSerializer;
-import com.netflix.astyanax.serializers.UUIDSerializer;
-
-/**
- * Serialize EntityVersion, entity ID and version, for use a column name in Unique Values Column Family.
- */
-public class EntityVersionSerializer extends AbstractSerializer<EntityVersion> {
-
- private static final Logger LOG = LoggerFactory.getLogger( EntityVersionSerializer.class );
-
- @Override
- public ByteBuffer toByteBuffer(final EntityVersion ev) {
-
- CompositeBuilder builder = Composites.newDynamicCompositeBuilder();
-
- builder.addTimeUUID( ev.getEntityVersion() );
- builder.addTimeUUID( ev.getEntityId().getUuid() );
- builder.addString( ev.getEntityId().getType() );
-
- return builder.build();
- }
-
- @Override
- public EntityVersion fromByteBuffer(final ByteBuffer byteBuffer) {
-
- // would use Composites.newDynamicCompositeParser(byteBuffer) but it is not implemented
-
- DynamicComposite composite = DynamicComposite.fromByteBuffer(byteBuffer);
- Preconditions.checkArgument(composite.size() == 3, "Composite should have 3 elements");
-
- final UUID version = composite.get( 0, UUIDSerializer.get() );
- final UUID entityId = composite.get( 1, UUIDSerializer.get() );
- final String entityType = composite.get( 2, StringSerializer.get() );
-
- return new EntityVersion( new SimpleId( entityId, entityType ), version);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/FieldSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/FieldSerializer.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/FieldSerializer.java
deleted file mode 100644
index 3719e3e..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/FieldSerializer.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.mvcc.stage.write;
-
-
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.lang3.StringUtils;
-
-import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
-import org.apache.usergrid.persistence.model.field.DoubleField;
-import org.apache.usergrid.persistence.model.field.Field;
-import org.apache.usergrid.persistence.model.field.IntegerField;
-import org.apache.usergrid.persistence.model.field.LongField;
-import org.apache.usergrid.persistence.model.field.StringField;
-import org.apache.usergrid.persistence.model.field.UUIDField;
-
-import com.netflix.astyanax.model.CompositeBuilder;
-import com.netflix.astyanax.model.CompositeParser;
-
-// TODO: replace with "real" serializer
-
-/**
- * Serialize Field for use as part of row-key in Unique Values Column Family.
- */
-public class FieldSerializer implements CompositeFieldSerializer<Field> {
-
- private static final Logger LOG = LoggerFactory.getLogger( FieldSerializer.class );
-
- public enum FieldType {
- BOOLEAN_FIELD,
- DOUBLE_FIELD,
- INTEGER_FIELD,
- LONG_FIELD,
- STRING_FIELD,
- UUID_FIELD
- };
-
- private static final FieldSerializer INSTANCE = new FieldSerializer();
-
- @Override
- public void toComposite( final CompositeBuilder builder, final Field field ) {
-
- builder.addString( field.getName() );
-
- // TODO: use the real field value serializer(s) here? Store hash instead?
- builder.addString( field.getValue().toString() );
-
- String simpleName = field.getClass().getSimpleName();
- int nameIndex = simpleName.lastIndexOf(".");
- String fieldType = simpleName.substring(nameIndex + 1, simpleName.length() - "Field".length());
- fieldType = StringUtils.upperCase( fieldType + "_FIELD" );
-
- builder.addString( fieldType );
- }
-
- @Override
- public Field fromComposite( final CompositeParser composite ) {
-
- final String name = composite.readString();
- final String value = composite.readString();
- final String typeString = composite.readString();
-
- final FieldType fieldType = FieldType.valueOf( typeString );
-
- switch (fieldType) {
- case DOUBLE_FIELD:
- return new DoubleField(name, Double.parseDouble(value));
- case INTEGER_FIELD:
- return new IntegerField(name, Integer.parseInt(value));
- case LONG_FIELD:
- return new LongField(name, Long.parseLong(value));
- case STRING_FIELD:
- return new StringField(name, value);
- case UUID_FIELD:
- return new UUIDField(name, UUID.fromString(value));
- default:
- throw new RuntimeException("Unknown unique field type");
- }
- }
-
-
- /**
- * Get the singleton serializer
- */
- public static FieldSerializer get() {
- return INSTANCE;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
index 7448816..dfccb34 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
@@ -25,6 +25,9 @@ import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.field.Field;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValue.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValue.java
deleted file mode 100644
index de1594a..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValue.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.mvcc.stage.write;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.field.Field;
-
-/**
- * Represents a Unique Value of a field within a collection.
- */
-public interface UniqueValue {
-
- public CollectionScope getCollectionScope();
-
- public Id getEntityId();
-
- public Field getField();
-
- public UUID getEntityVersion();
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueImpl.java
deleted file mode 100644
index 7c86491..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueImpl.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.mvcc.stage.write;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.field.Field;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Represents a Unique Value of a field within a collection.
- */
-public class UniqueValueImpl implements UniqueValue {
- private final CollectionScope collectionScope;
- private final Field field;
- private final Id entityId;
- private final UUID entityVersion;
-
- public UniqueValueImpl(
- final CollectionScope scope, final Field field, Id entityId, final UUID version ) {
-
- Preconditions.checkNotNull( scope, "scope is required" );
- Preconditions.checkNotNull( field, "field is required" );
-// Preconditions.checkNotNull( version, "version is required" );
- Preconditions.checkNotNull( entityId, "entityId is required" );
-
- this.collectionScope = scope;
- this.field = field;
- this.entityVersion = version;
- this.entityId = entityId;
- }
-
- @Override
- public CollectionScope getCollectionScope() {
- return collectionScope;
- }
-
- @Override
- public Field getField() {
- return field;
- }
-
- @Override
- public UUID getEntityVersion() {
- return entityVersion;
- }
-
- @Override
- public Id getEntityId() {
- return entityId;
- }
-
-
- @Override
- public boolean equals( final Object o ) {
- if ( this == o ) {
- return true;
- }
- if ( o == null || getClass() != o.getClass() ) {
- return false;
- }
-
- final UniqueValueImpl that = ( UniqueValueImpl ) o;
-
- if ( !getCollectionScope().equals( that.getCollectionScope() ) ) {
- return false;
- }
-
- if ( !getField().equals( that.getField()) ) {
- return false;
- }
-
- if ( !getEntityVersion().equals( that.getEntityVersion() ) ) {
- return false;
- }
-
- if ( !getEntityId().equals( that.getEntityId() ) ) {
- return false;
- }
-
- return true;
- }
-
-
- @Override
- public int hashCode() {
- int result = 31 * getCollectionScope().hashCode();
- result = 31 * result + getField().hashCode();
- result = 31 * result + getEntityVersion().hashCode();
- result = 31 * result + getEntityId().hashCode();
- return result;
- }
-
-
- @Override
- public String toString() {
- return "UniqueValueImpl{" +
- ", collectionScope =" + collectionScope.getName() +
- ", field =" + field +
- ", entityVersion=" + entityVersion +
- ", entityId =" + entityId +
- '}';
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategy.java
deleted file mode 100644
index 9773644..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategy.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.mvcc.stage.write;
-
-
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.model.field.Field;
-
-
-/**
- * Reads and writes to UniqueValues column family.
- */
-public interface UniqueValueSerializationStrategy {
-
- /**
- * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds.
- *
- * @param uniqueValue Object to be written
- * @return MutatationBatch that encapsulates operation, caller may or may not execute.
- */
- public MutationBatch write( UniqueValue uniqueValue );
-
- /**
- * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds.
- *
- * @param uniqueValue Object to be written
- * @param timeToLive How long object should live in seconds
- * @return MutatationBatch that encapsulates operation, caller may or may not execute.
- */
- public MutationBatch write( UniqueValue uniqueValue, Integer timeToLive );
-
- /**
- * Load UniqueValue that matches field from collection or null if that value does not exist.
- *
- * @param colScope Collection scope in which to look for field name/value
- * @param field Field name/value to search for
- * @return UniqueValue or null if not found
- * @throws ConnectionException on error connecting to Cassandra
- */
- public UniqueValue load( CollectionScope colScope, Field field ) throws ConnectionException;
-
- /**
- * Delete the specified Unique Value from Cassandra.
- *
- * @param uniqueValue Object to be deleted.
- * @return MutatationBatch that encapsulates operation, caller may or may not execute.
- */
- public MutationBatch delete( UniqueValue uniqueValue );
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategyImpl.java
deleted file mode 100644
index b7e113e..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/UniqueValueSerializationStrategyImpl.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.mvcc.stage.write;
-
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.netflix.astyanax.ColumnListMutation;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
-import com.netflix.astyanax.model.ColumnList;
-import com.netflix.astyanax.util.RangeBuilder;
-import java.util.Collections;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.serialization.impl.CollectionScopedRowKeySerializer;
-import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-import org.apache.usergrid.persistence.core.migration.Migration;
-import org.apache.usergrid.persistence.model.field.Field;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Reads and writes to UniqueValues column family.
- */
-public class UniqueValueSerializationStrategyImpl
- implements UniqueValueSerializationStrategy, Migration {
-
- private static final Logger log =
- LoggerFactory.getLogger( UniqueValueSerializationStrategyImpl.class );
-
- // TODO: use "real" field serializer here instead once it is ready
- private static final CollectionScopedRowKeySerializer<Field> ROW_KEY_SER =
- new CollectionScopedRowKeySerializer<Field>( FieldSerializer.get() );
-
- private static final EntityVersionSerializer ENTITY_VERSION_SER = new EntityVersionSerializer();
-
- private static final MultiTennantColumnFamily<CollectionScope, Field, EntityVersion>
- CF_UNIQUE_VALUES =
- new MultiTennantColumnFamily<CollectionScope, Field, EntityVersion>( "Unique_Values",
- ROW_KEY_SER,
- ENTITY_VERSION_SER );
-
- protected final Keyspace keyspace;
-
-
- /**
- * Construct serialization strategy for keyspace.
- * @param keyspace Keyspace in which to store Unique Values.
- */
- @Inject
- public UniqueValueSerializationStrategyImpl( final Keyspace keyspace ) {
- this.keyspace = keyspace;
- }
-
-
- @Override
- public java.util.Collection getColumnFamilies() {
-
- MultiTennantColumnFamilyDefinition cf = new MultiTennantColumnFamilyDefinition(
- CF_UNIQUE_VALUES,
- BytesType.class.getSimpleName(),
- ColumnTypes.DYNAMIC_COMPOSITE_TYPE,
- BytesType.class.getSimpleName(), MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
-
- return Collections.singleton( cf );
- }
-
-
- public MutationBatch write( UniqueValue uniqueValue ) {
- return write( uniqueValue, Integer.MAX_VALUE );
- }
-
-
- @Override
- public MutationBatch write( UniqueValue value, Integer timeToLive ) {
-
- Preconditions.checkNotNull( value, "value is required" );
- Preconditions.checkNotNull( timeToLive, "timeToLive is required" );
-
- log.debug("Writing unique value scope={} id={} version={} name={} value={} ttl={} ",
- new Object[] {
- value.getCollectionScope().getName(),
- value.getEntityId(),
- value.getEntityVersion(),
- value.getField().getName(),
- value.getField().getValue(),
- timeToLive
- });
-
- final EntityVersion ev = new EntityVersion( value.getEntityId(), value.getEntityVersion() );
-
- final Integer ttl;
- if ( timeToLive.equals( Integer.MAX_VALUE )) {
- ttl = null;
- } else {
- ttl = timeToLive;
- }
-
- return doWrite( value.getCollectionScope(), value.getField(),
- new UniqueValueSerializationStrategyImpl.RowOp() {
-
- @Override
- public void doOp( final ColumnListMutation<EntityVersion> colMutation ) {
- colMutation.putColumn( ev, 0x0, ttl );
- }
- } );
- }
-
-
- @Override
- public MutationBatch delete(UniqueValue value) {
-
- Preconditions.checkNotNull( value, "value is required" );
-
- final EntityVersion ev = new EntityVersion( value.getEntityId(), value.getEntityVersion() );
-
- return doWrite( value.getCollectionScope(), value.getField(),
- new UniqueValueSerializationStrategyImpl.RowOp() {
-
- @Override
- public void doOp( final ColumnListMutation<EntityVersion> colMutation ) {
- colMutation.deleteColumn(ev);
- }
- } );
- }
-
-
- /**
- * Do the column update or delete for the given column and row key
- * @param context We need to use this when getting the keyspace
- */
- private MutationBatch doWrite( CollectionScope context, Field field, RowOp op ) {
- final MutationBatch batch = keyspace.prepareMutationBatch();
- op.doOp( batch.withRow( CF_UNIQUE_VALUES, ScopedRowKey.fromKey( context, field ) ) );
- return batch;
- }
-
-
- @Override
- public UniqueValue load( CollectionScope colScope, Field field ) throws ConnectionException {
-
- Preconditions.checkNotNull( field, "field is required" );
-
- ColumnList<EntityVersion> result;
- try {
- result = keyspace.prepareQuery( CF_UNIQUE_VALUES )
- .getKey( ScopedRowKey.fromKey( colScope, field ) )
- .withColumnRange(new RangeBuilder().setLimit(1).build())
- .execute()
- .getResult();
- }
- catch ( NotFoundException nfe ) {
- return null;
- }
-
- if ( result.isEmpty() ) {
- return null;
- }
-
- EntityVersion ev = result.getColumnByIndex(0).getName();
-
- return new UniqueValueImpl( colScope, field, ev.getEntityId(), ev.getEntityVersion() );
- }
-
-
- /**
- * Simple callback to perform puts and deletes with a common row setup code
- */
- private static interface RowOp {
- void doOp( ColumnListMutation<EntityVersion> colMutation );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index 0004ddb..49e967f 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -33,6 +33,9 @@ import org.apache.usergrid.persistence.collection.mvcc.entity.MvccValidationUtil
import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.collection.util.EntityUtils;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.model.entity.Entity;
@@ -110,7 +113,7 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Entity>
if ( field.isUnique() ) {
- UniqueValue written = new UniqueValueImpl( ioEvent.getEntityCollection(), field,
+ UniqueValue written = new UniqueValueImpl( ioEvent.getEntityCollection(), field,
mvccEntity.getEntity().get().getId(), mvccEntity.getEntity().get().getVersion());
MutationBatch mb = uniqueValueStrat.write( written );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
index b5e5873..dcdb408 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
@@ -38,6 +38,7 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import rx.functions.Action1;
import rx.functions.Func1;
@@ -46,7 +47,7 @@ import rx.functions.Func1;
*/
@Singleton
public class WriteOptimisticVerify
- implements Func1<CollectionIoEvent<MvccEntity>, CollectionIoEvent<MvccEntity>> {
+ implements Action1<CollectionIoEvent<MvccEntity>> {
private static final Logger log = LoggerFactory.getLogger( WriteOptimisticVerify.class );
@@ -59,7 +60,7 @@ public class WriteOptimisticVerify
@Override
- public CollectionIoEvent<MvccEntity> call( final CollectionIoEvent<MvccEntity> ioevent ) {
+ public void call( final CollectionIoEvent<MvccEntity> ioevent ) {
MvccValidationUtils.verifyMvccEntityWithEntity( ioevent.getEvent() );
// If the version was included on the entity write operation (delete or write) we need
@@ -74,7 +75,7 @@ public class WriteOptimisticVerify
CollectionScope collectionScope = ioevent.getEntityCollection();
if(entity.getVersion() == null){
- return ioevent;
+ return;
}
try {
@@ -98,8 +99,7 @@ public class WriteOptimisticVerify
"Error reading entity log", e );
}
- // No op, just emit the value
- return ioevent;
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index 713703f..e6d37fc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -18,36 +18,46 @@
package org.apache.usergrid.persistence.collection.mvcc.stage.write;
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccValidationUtils;
import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.field.Field;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
import rx.Observable;
-import rx.functions.Func1;
-import rx.functions.FuncN;
-import rx.schedulers.Schedulers;
+import rx.functions.Action1;
/**
* This phase execute all unique value verification on the MvccEntity.
*/
@Singleton
-public class WriteUniqueVerify implements
- Func1<CollectionIoEvent<MvccEntity>, Observable<? extends CollectionIoEvent<MvccEntity>>> {
+public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>> {
private static final Logger LOG = LoggerFactory.getLogger( WriteUniqueVerify.class );
@@ -55,16 +65,16 @@ public class WriteUniqueVerify implements
protected final SerializationFig serializationFig;
+ protected final Keyspace keyspace;
+
@Inject
- public WriteUniqueVerify(
- final UniqueValueSerializationStrategy uniqueValueSerializiationStrategy,
- final SerializationFig serializationFig ) {
+ public WriteUniqueVerify( final UniqueValueSerializationStrategy uniqueValueSerializiationStrategy,
+ final SerializationFig serializationFig, final Keyspace keyspace ) {
+ this.keyspace = keyspace;
- Preconditions.checkNotNull( uniqueValueSerializiationStrategy,
- "uniqueValueSerializationStrategy is required" );
- Preconditions.checkNotNull( serializationFig,
- "serializationFig is required" );
+ Preconditions.checkNotNull( uniqueValueSerializiationStrategy, "uniqueValueSerializationStrategy is required" );
+ Preconditions.checkNotNull( serializationFig, "serializationFig is required" );
this.uniqueValueStrat = uniqueValueSerializiationStrategy;
this.serializationFig = serializationFig;
@@ -72,8 +82,7 @@ public class WriteUniqueVerify implements
@Override
- public Observable<? extends CollectionIoEvent<MvccEntity>>
- call(final CollectionIoEvent<MvccEntity> ioevent ) {
+ public void call( final CollectionIoEvent<MvccEntity> ioevent ) {
MvccValidationUtils.verifyMvccEntityWithEntity( ioevent.getEvent() );
@@ -81,131 +90,100 @@ public class WriteUniqueVerify implements
final Entity entity = mvccEntity.getEntity().get();
- // use simple thread pool to verify fields in parallel
+ final Id entityId = entity.getId();
+
+ final UUID entityVersion = entity.getVersion();
- // We want to use concurrent to fork all validations this way they're wrapped by timeouts
- // and Hystrix thread pools for JMX operations. See the WriteCommand in the
- // EntityCollectionManagerImpl.
+ final CollectionScope scope = ioevent.getEntityCollection();
+
+ // use simple thread pool to verify fields in parallel
- // TODO: still needs to be added to the Concurrent utility class?
+ final Collection<Field> entityFields = entity.getFields();
- final List<Observable<FieldUniquenessResult>> fields =
- new ArrayList<Observable<FieldUniquenessResult>>();
+ //allocate our max size, worst case
+ final List<Field> uniqueFields = new ArrayList<>( entityFields.size() );
+ final MutationBatch batch = keyspace.prepareMutationBatch();
//
// Construct all the functions for verifying we're unique
//
- for ( final Field field : entity.getFields() ) {
+ for ( final Field field : entityFields ) {
// if it's unique, create a function to validate it and add it to the list of
// concurrent validations
if ( field.isUnique() ) {
- Observable<FieldUniquenessResult> result = Observable.from( field )
- .subscribeOn( Schedulers.io() )
- .map(new Func1<Field, FieldUniquenessResult>() {
-
- @Override
- public FieldUniquenessResult call(Field field ) {
-
- // use write-first then read strategy
- UniqueValue written = new UniqueValueImpl(
- ioevent.getEntityCollection(), field, entity.getId(), mvccEntity.getVersion() );
-
- // use TTL in case something goes wrong before entity is finally committed
- MutationBatch mb = uniqueValueStrat.write( written, serializationFig.getTimeout() );
-
- try {
- mb.execute();
- }
- catch ( ConnectionException ex ) {
- throw new RuntimeException("Unable to write to cassandra", ex );
- }
-
- // does the database value match what we wrote?
- UniqueValue loaded;
- try {
- loaded = uniqueValueStrat.load( ioevent.getEntityCollection(), field );
- }
- catch ( ConnectionException ex ) {
- throw new RuntimeException("Unable to read from cassandra", ex );
- }
-
- return new FieldUniquenessResult(
- field, loaded.getEntityId().equals( written.getEntityId() ) );
- }
- } );
-
- fields.add(result);
+
+ // use write-first then read strategy
+ final UniqueValue written = new UniqueValueImpl( scope, field, entityId, entityVersion );
+
+ // use TTL in case something goes wrong before entity is finally committed
+ final MutationBatch mb = uniqueValueStrat.write( written, serializationFig.getTimeout() );
+
+ batch.mergeShallow( mb );
+
+
+ uniqueFields.add( field );
}
}
- //short circuit. If we zip up nothing, we block forever.
- if(fields.size() == 0){
- return Observable.from(ioevent );
+ //short circuit nothing to do
+ if ( uniqueFields.size() == 0 ) {
+ return;
}
- //
- // Zip the results up
- //
- final FuncN<CollectionIoEvent<MvccEntity>> zipFunction =
- new FuncN<CollectionIoEvent<MvccEntity>>() {
-
- @Override
- public CollectionIoEvent<MvccEntity> call( final Object... args ) {
-
- Map<String, Field> uniquenessVioloations = new HashMap<String, Field>();
-
- for ( Object resultObj : args ) {
- FieldUniquenessResult result = ( FieldUniquenessResult ) resultObj;
- if ( !result.isUnique() ) {
- Field field = result.getField();
- uniquenessVioloations.put( field.getName(), field );
- }
- }
-
- if ( !uniquenessVioloations.isEmpty() ) {
- throw new WriteUniqueVerifyException(
- mvccEntity, ioevent.getEntityCollection(), uniquenessVioloations );
- }
-
- //return the original event
- return ioevent;
- }
- };
- return Observable.zip( fields, zipFunction );
- }
+ //perform the write
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException ex ) {
+ throw new RuntimeException( "Unable to write to cassandra", ex );
+ }
- static class FieldUniquenessResult {
- private Field field;
- private Boolean unique;
+ //now get the set of fields back
+ final UniqueValueSet uniqueValues;
- public FieldUniquenessResult( Field f, Boolean u ) {
- this.field = f;
- this.unique = u;
+ try {
+ uniqueValues = uniqueValueStrat.load( scope, uniqueFields );
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to read from cassandra", e );
}
- public Boolean isUnique() {
- return unique;
- }
+ final Map<String, Field> uniquenessViolations = new HashMap<>( uniqueFields.size());
- public void setUnique( Boolean isUnique ) {
- this.unique = isUnique;
- }
+ //loop through each field that was unique
+ for ( final Field field : uniqueFields ) {
+
+ final UniqueValue uniqueValue = uniqueValues.getValue( field.getName() );
+
+ if ( uniqueValue == null ) {
+ throw new RuntimeException(
+ String.format( "Could not retrieve unique value for field %s, unable to verify",
+ field.getName() ) );
+ }
+
+ final Id returnedEntityId = uniqueValue.getEntityId();
- public Field getField() {
- return field;
+
+ if ( !entityId.equals( returnedEntityId ) ) {
+ uniquenessViolations.put( field.getName(), field );
+ }
}
- public void setField( Field field ) {
- this.field = field;
+ //We have violations, throw an exception
+ if ( !uniquenessViolations.isEmpty() ) {
+ throw new WriteUniqueVerifyException( mvccEntity, ioevent.getEntityCollection(), uniquenessViolations );
}
+
+
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValue.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValue.java
new file mode 100644
index 0000000..9749101
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValue.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.collection.serialization;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.field.Field;
+
+/**
+ * Represents a Unique Value of a field within a collection.
+ */
+public interface UniqueValue {
+
+ /**
+ * The scope of this value
+ * @return
+ */
+ public CollectionScope getCollectionScope();
+
+ /**
+ * The entity Id that owns this value
+ * @return
+ */
+ public Id getEntityId();
+
+ /**
+ * The field value
+ * @return
+ */
+ public Field getField();
+
+ /**
+ * The version of the entity that owns this value
+ * @return
+ */
+ public UUID getEntityVersion();
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
new file mode 100644
index 0000000..efcc60d
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.collection.serialization;
+
+
+import java.util.Collection;
+
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.field.Field;
+
+
+/**
+ * Reads and writes to UniqueValues column family.
+ */
+public interface UniqueValueSerializationStrategy {
+
+ /**
+ * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds.
+ *
+ * @param uniqueValue Object to be written
+ * @return MutatationBatch that encapsulates operation, caller may or may not execute.
+ */
+ public MutationBatch write( UniqueValue uniqueValue );
+
+ /**
+ * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds.
+ *
+ * @param uniqueValue Object to be written
+ * @param timeToLive How long object should live in seconds
+ * @return MutatationBatch that encapsulates operation, caller may or may not execute.
+ */
+ public MutationBatch write( UniqueValue uniqueValue, Integer timeToLive );
+
+ /**
+ * Load UniqueValue that matches field from collection or null if that value does not exist.
+ *
+ * @param colScope Collection scope in which to look for field name/value
+ * @param fields Field name/value to search for
+ * @return UniqueValueSet containing fields from the collection that exist in cassandra
+ * @throws ConnectionException on error connecting to Cassandra
+ */
+ public UniqueValueSet load( CollectionScope colScope, Collection<Field> fields ) throws ConnectionException;
+
+ /**
+ * Delete the specified Unique Value from Cassandra.
+ *
+ * @param uniqueValue Object to be deleted.
+ * @return MutatationBatch that encapsulates operation, caller may or may not execute.
+ */
+ public MutationBatch delete( UniqueValue uniqueValue );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSet.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSet.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSet.java
new file mode 100644
index 0000000..5436a11
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSet.java
@@ -0,0 +1,32 @@
+package org.apache.usergrid.persistence.collection.serialization;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+/**
+ * A read only view of unique values
+ */
+public interface UniqueValueSet extends Iterable<UniqueValue> {
+
+ /**
+ * Get the unique value for the field
+ * @param fieldName
+ * @return
+ */
+ public UniqueValue getValue(final String fieldName);
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersion.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersion.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersion.java
new file mode 100644
index 0000000..274cf5d
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersion.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.model.entity.Id;
+
+/**
+ * Combine entity ID and entity version for use as column name for UniqueValues Column Family.
+ */
+public class EntityVersion {
+ private final Id entityId;
+ private final UUID entityVersion;
+
+ public EntityVersion(Id id, UUID uuid) {
+ this.entityId = id;
+ this.entityVersion = uuid;
+ }
+
+ public Id getEntityId() {
+ return entityId;
+ }
+
+ public UUID getEntityVersion() {
+ return entityVersion;
+ }
+
+ public boolean equals( Object o ) {
+
+ if ( o == null || !(o instanceof EntityVersion) ) {
+ return false;
+ }
+
+ EntityVersion other = (EntityVersion)o;
+
+ if ( !other.getEntityId().equals( getEntityId() )) {
+ return false;
+ }
+
+ if ( !other.getEntityVersion().equals( getEntityVersion() )) {
+ return false;
+ }
+
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd841758/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersionSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersionSerializer.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersionSerializer.java
new file mode 100644
index 0000000..810a1fc
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersionSerializer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import com.google.common.base.Preconditions;
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.Composites;
+import com.netflix.astyanax.model.DynamicComposite;
+import com.netflix.astyanax.serializers.AbstractSerializer;
+import com.netflix.astyanax.serializers.StringSerializer;
+import com.netflix.astyanax.serializers.UUIDSerializer;
+
+/**
+ * Serialize EntityVersion, entity ID and version, for use a column name in Unique Values Column Family.
+ */
+public class EntityVersionSerializer extends AbstractSerializer<EntityVersion> {
+
+ private static final Logger LOG = LoggerFactory.getLogger( EntityVersionSerializer.class );
+
+ @Override
+ public ByteBuffer toByteBuffer(final EntityVersion ev) {
+
+ CompositeBuilder builder = Composites.newDynamicCompositeBuilder();
+
+ builder.addTimeUUID( ev.getEntityVersion() );
+ builder.addTimeUUID( ev.getEntityId().getUuid() );
+ builder.addString( ev.getEntityId().getType() );
+
+ return builder.build();
+ }
+
+ @Override
+ public EntityVersion fromByteBuffer(final ByteBuffer byteBuffer) {
+
+ // would use Composites.newDynamicCompositeParser(byteBuffer) but it is not implemented
+
+ DynamicComposite composite = DynamicComposite.fromByteBuffer(byteBuffer);
+ Preconditions.checkArgument(composite.size() == 3, "Composite should have 3 elements");
+
+ final UUID version = composite.get( 0, UUIDSerializer.get() );
+ final UUID entityId = composite.get( 1, UUIDSerializer.get() );
+ final String entityType = composite.get( 2, StringSerializer.get() );
+
+ return new EntityVersion( new SimpleId( entityId, entityType ), version);
+ }
+
+}