You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/10/09 22:01:44 UTC

[2/4] git commit: merge and get fielded

merge and get fielded


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/0d508f98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/0d508f98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/0d508f98

Branch: refs/heads/collection_multiget
Commit: 0d508f981db9d509ef3250cbf75cc2f5384dd491
Parents: c8fe156 7af7e70
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Oct 9 13:57:49 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Oct 9 13:57:49 2014 -0600

----------------------------------------------------------------------
 .../collection/EntityCollectionManager.java     |  16 +-
 .../persistence/collection/EntitySet.java       |   2 -
 .../persistence/collection/MvccEntity.java      |  68 ++++++
 .../persistence/collection/MvccLogEntry.java    |  94 +++++++++
 .../persistence/collection/VersionSet.java      |  67 ++++++
 .../exception/CollectionRuntimeException.java   |   2 +-
 .../exception/WriteCommitException.java         |   4 +-
 .../WriteOptimisticVerifyException.java         |   4 +-
 .../exception/WriteStartException.java          |   4 +-
 .../exception/WriteUniqueVerifyException.java   |   2 +-
 .../collection/guice/CollectionModule.java      |   2 +-
 .../impl/EntityCollectionManagerImpl.java       | 211 ++++++++++---------
 .../impl/EntityVersionCleanupTask.java          |   2 +-
 .../mvcc/MvccEntitySerializationStrategy.java   |   3 +-
 .../mvcc/MvccLogEntrySerializationStrategy.java |  17 +-
 .../mvcc/changelog/ChangeLogGenerator.java      |   5 +-
 .../mvcc/changelog/ChangeLogGeneratorImpl.java  |   2 +-
 .../collection/mvcc/entity/MvccEntity.java      |  68 ------
 .../collection/mvcc/entity/MvccLogEntry.java    |  93 --------
 .../mvcc/entity/MvccValidationUtils.java        |   1 +
 .../mvcc/entity/impl/MvccEntityDeleteEvent.java |   2 +-
 .../mvcc/entity/impl/MvccEntityEvent.java       |   2 +-
 .../mvcc/entity/impl/MvccEntityImpl.java        |   2 +-
 .../mvcc/entity/impl/MvccEntityWriteEvent.java  |   3 +-
 .../mvcc/entity/impl/MvccLogEntryImpl.java      |   2 +-
 .../mvcc/stage/EntityUpdateEvent.java           |   1 -
 .../mvcc/stage/delete/MarkCommit.java           |   4 +-
 .../collection/mvcc/stage/delete/MarkStart.java |   4 +-
 .../mvcc/stage/write/RollbackAction.java        |   2 +-
 .../mvcc/stage/write/WriteCommit.java           |   4 +-
 .../mvcc/stage/write/WriteOptimisticVerify.java |  44 ++--
 .../collection/mvcc/stage/write/WriteStart.java |   5 +-
 .../mvcc/stage/write/WriteUniqueVerify.java     |   2 +-
 .../collection/serialization/EntityRepair.java  |   2 +-
 .../serialization/OptimisticUpdate.java         |   2 +-
 .../serialization/impl/EntityRepairImpl.java    |   2 +-
 .../serialization/impl/EntitySetImpl.java       |   3 +-
 .../serialization/impl/LogEntryIterator.java    |   2 +-
 .../MvccEntitySerializationStrategyImpl.java    |   3 +-
 .../MvccLogEntrySerializationStrategyImpl.java  | 129 ++++++++----
 .../serialization/impl/VersionSetImpl.java      |  80 +++++++
 .../collection/EntityCollectionManagerIT.java   | 155 +++++++++-----
 .../changelog/ChangeLogGeneratorImplTest.java   |   3 +-
 .../mvcc/entity/impl/MvccEntityImplTest.java    |   2 +-
 .../mvcc/entity/impl/MvccLogEntryImplTest.java  |   2 +-
 .../mvcc/stage/AbstractMvccEntityStageTest.java |   2 +-
 .../mvcc/stage/TestEntityGenerator.java         |   2 +-
 .../mvcc/stage/delete/MarkCommitTest.java       |   6 +-
 .../mvcc/stage/delete/MarkStartTest.java        |   4 +-
 .../mvcc/stage/write/WriteCommitTest.java       |   4 +-
 .../stage/write/WriteOptimisticVerifyTest.java  |   4 +-
 .../mvcc/stage/write/WriteStartTest.java        |   4 +-
 .../mvcc/stage/write/WriteUniqueVerifyTest.java |   2 +-
 .../serialization/EntityRepairImplTest.java     |   2 +-
 .../impl/LogEntryIteratorTest.java              |   2 +-
 ...MvccEntitySerializationStrategyImplTest.java |   2 +-
 .../impl/MvccLESSTransientTest.java             |   7 +-
 ...ccLogEntrySerializationStrategyImplTest.java |  19 +-
 .../util/InvalidMvccEntityGenerator.java        |   2 +-
 .../util/InvalidValueGeneratorTest.java         |   2 +-
 .../collection/util/LogEntryMock.java           |   4 +-
 61 files changed, 724 insertions(+), 473 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0d508f98/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index 2625078,41d19e6..f976cb5
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@@ -25,6 -25,6 +25,7 @@@ import java.util.UUID
  import org.apache.usergrid.persistence.model.entity.Entity;
  import org.apache.usergrid.persistence.model.entity.Id;
  
++import org.apache.usergrid.persistence.model.field.Field;
  import rx.Observable;
  
  
@@@ -55,12 -55,11 +56,18 @@@ public interface EntityCollectionManage
      public Observable<Entity> load( Id entityId );
  
      /**
-      * Return the latest version of the specified entity.
+      * Return the latest versions of the specified entityIds
       */
-     public Observable<UUID> getLatestVersion( Id entityId );
-     //TODO TN Change load to use multiget and return multiple entities.  Only supports loading 1k per load operation.
+     public Observable<VersionSet> getLatestVersion( Collection<Id> entityId );
+ 
+     /**
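++     * Gets the Id of the entity that owns the stored unique value for the given field.
++     * @param field the unique field to look up
++     * @return an observable emitting the owning entity's Id, or null when no unique value is stored; load failures arrive as onError events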
++     */
++    public Observable<Id> getIdField(final Field field);
 +
 +    /**
       * Load all the entityIds into the observable entity set
       * @param entityIds
       * @return

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0d508f98/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 61f436f,c47bf1a..fab4269
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@@ -19,10 -19,11 +19,18 @@@
  package org.apache.usergrid.persistence.collection.impl;
  
  
++import java.net.ConnectException;
 +import java.util.*;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.List;
 -import java.util.UUID;
  
++import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
++import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 +import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
++import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
++import org.apache.usergrid.persistence.model.entity.SimpleId;
 +import org.apache.usergrid.persistence.model.field.Field;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -33,7 -35,9 +42,8 @@@ import org.apache.usergrid.persistence.
  import org.apache.usergrid.persistence.collection.guice.Write;
  import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
  import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
- import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+ import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+ import org.apache.usergrid.persistence.collection.MvccEntity;
 -import org.apache.usergrid.persistence.collection.MvccLogEntry;
  import org.apache.usergrid.persistence.collection.mvcc.entity.MvccValidationUtils;
  import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
  import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
@@@ -64,13 -66,12 +72,12 @@@ import rx.schedulers.Schedulers
  
  /**
   * Simple implementation.  Should perform  writes, delete and load.
-- *
++ * <p/>
   * TODO: maybe refactor the stage operations into their own classes for clarity and organization?
   */
  public class EntityCollectionManagerImpl implements EntityCollectionManager {
  
--    private static final Logger log = LoggerFactory.getLogger( EntityCollectionManagerImpl.class );
++    private static final Logger log = LoggerFactory.getLogger(EntityCollectionManagerImpl.class);
  
      private final CollectionScope collectionScope;
      private final UUIDService uuidService;
@@@ -91,28 -92,26 +98,31 @@@
  
      private final TaskExecutor taskExecutor;
  
+     private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
      private final MvccEntitySerializationStrategy entitySerializationStrategy;
 +    private UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
  
+ 
      @Inject
--    public EntityCollectionManagerImpl( final UUIDService uuidService, @Write final WriteStart writeStart,
--                                        @WriteUpdate final WriteStart writeUpdate,
--                                        final WriteUniqueVerify writeVerifyUnique,
--                                        final WriteOptimisticVerify writeOptimisticVerify,
--                                        final WriteCommit writeCommit, final RollbackAction rollback,
--                                        final MarkStart markStart, final MarkCommit markCommit,
--                                        final MvccEntitySerializationStrategy entitySerializationStrategy,
-                                         final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
--                                        @CollectionTaskExecutor final TaskExecutor taskExecutor,
-                                         @Assisted final CollectionScope collectionScope
-                                          ) {
 -                                        final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
 -                                        @Assisted final CollectionScope collectionScope ) {
++    public EntityCollectionManagerImpl(final UUIDService uuidService, @Write final WriteStart writeStart,
++                                       @WriteUpdate final WriteStart writeUpdate,
++                                       final WriteUniqueVerify writeVerifyUnique,
++                                       final WriteOptimisticVerify writeOptimisticVerify,
++                                       final WriteCommit writeCommit, final RollbackAction rollback,
++                                       final MarkStart markStart, final MarkCommit markCommit,
++                                       final MvccEntitySerializationStrategy entitySerializationStrategy,
++                                       final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
++                                       final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
++                                       @CollectionTaskExecutor final TaskExecutor taskExecutor,
++                                       @Assisted final CollectionScope collectionScope
++    ) {
 +        this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
 +        this.entitySerializationStrategy = entitySerializationStrategy;
  
  
--        Preconditions.checkNotNull( uuidService, "uuidService must be defined" );
++        Preconditions.checkNotNull(uuidService, "uuidService must be defined");
  
--        MvccValidationUtils.validateCollectionScope( collectionScope );
++        MvccValidationUtils.validateCollectionScope(collectionScope);
  
          this.writeStart = writeStart;
          this.writeUpdate = writeUpdate;
@@@ -129,175 -127,170 +138,189 @@@
          this.uuidService = uuidService;
          this.collectionScope = collectionScope;
          this.taskExecutor = taskExecutor;
 -        this.entitySerializationStrategy = entitySerializationStrategy;
+         this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
      }
  
  
      @Override
--    public Observable<Entity> write( final Entity entity ) {
++    public Observable<Entity> write(final Entity entity) {
  
          //do our input validation
-         Preconditions.checkNotNull( entity, 
-             "Entity is required in the new stage of the mvcc write" );
 -        Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
++        Preconditions.checkNotNull(entity, "Entity is required in the new stage of the mvcc write");
  
          final Id entityId = entity.getId();
  
--        ValidationUtils.verifyIdentity( entityId );
++        ValidationUtils.verifyIdentity(entityId);
  
  
          // create our observable and start the write
--        final CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( collectionScope, entity );
++        final CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>(collectionScope, entity);
  
-         Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData,writeStart );
 -        Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeStart );
++        Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner(writeData, writeStart);
  
          // execute all validation stages concurrently.  Needs refactored when this is done.  
          // https://github.com/Netflix/RxJava/issues/627
          // observable = Concurrent.concurrent( observable, Schedulers.io(), new WaitZip(), 
          //                  writeVerifyUnique, writeOptimisticVerify );
  
--        observable.map( writeCommit ).doOnNext( new Action1<Entity>() {
++        observable.map(writeCommit).doOnNext(new Action1<Entity>() {
              @Override
--            public void call( final Entity entity ) {
++            public void call(final Entity entity) {
                  //TODO fire a task here
  
                  //post-processing to come later. leave it empty for now.
              }
--        } ).doOnError( rollback );
++        }).doOnError(rollback);
  
  
          // return the commit result.
--        return observable.map( writeCommit ).doOnError( rollback );
++        return observable.map(writeCommit).doOnError(rollback);
      }
  
  
      @Override
--    public Observable<Void> delete( final Id entityId ) {
 -
 -        Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
 -        Preconditions.checkNotNull( entityId.getUuid(), "Entity id is required in this stage" );
 -        Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" );
 -
 -        return Observable.from( new CollectionIoEvent<Id>( collectionScope, entityId ) ).map( markStart )
 -                         .doOnNext( markCommit ).map( new Func1<CollectionIoEvent<MvccEntity>, Void>() {
 -                            @Override
 -                            public Void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
 -                                return null;
 -                            }
 -                        } );
++    public Observable<Void> delete(final Id entityId) {
 +
-         Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
-         Preconditions.checkNotNull( entityId.getUuid(), "Entity id is required in this stage" );
-         Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" );
++        Preconditions.checkNotNull(entityId, "Entity id is required in this stage");
++        Preconditions.checkNotNull(entityId.getUuid(), "Entity id is required in this stage");
++        Preconditions.checkNotNull(entityId.getType(), "Entity type is required in this stage");
 +
-         return Observable.from( new CollectionIoEvent<Id>( collectionScope, entityId ) )
-                          .map( markStart ).doOnNext( markCommit ).map( new Func1<CollectionIoEvent<MvccEntity>,
-                         Void>() {
++        return Observable.from(new CollectionIoEvent<Id>(collectionScope, entityId)).map(markStart)
++                .doOnNext(markCommit).map(new Func1<CollectionIoEvent<MvccEntity>, Void>() {
 +                    @Override
-                     public Void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
++                    public Void call(final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent) {
 +                        return null;
 +                    }
-                 } );
++                });
      }
  
  
      @Override
--    public Observable<Entity> load( final Id entityId ) {
++    public Observable<Entity> load(final Id entityId) {
  
--        Preconditions.checkNotNull( entityId, "Entity id required in the load stage" );
--        Preconditions.checkNotNull( entityId.getUuid(), "Entity id uuid required in load stage" );
--        Preconditions.checkNotNull( entityId.getType(), "Entity id type required in load stage" );
++        Preconditions.checkNotNull(entityId, "Entity id required in the load stage");
++        Preconditions.checkNotNull(entityId.getUuid(), "Entity id uuid required in load stage");
++        Preconditions.checkNotNull(entityId.getType(), "Entity id type required in load stage");
  
-         return load( Collections.singleton(entityId) ).map( new Func1<EntitySet, Entity>() {
 -        return load( Collections.singleton( entityId ) ).map( new Func1<EntitySet, Entity>() {
++        return load(Collections.singleton(entityId)).map(new Func1<EntitySet, Entity>() {
              @Override
--            public Entity call( final EntitySet entitySet ) {
--                final MvccEntity entity = entitySet.getEntity( entityId );
++            public Entity call(final EntitySet entitySet) {
++                final MvccEntity entity = entitySet.getEntity(entityId);
  
-                 if(entity == null){
 -                if ( entity == null ) {
++                if (entity == null) {
                      return null;
                  }
  
                  return entity.getEntity().orNull();
              }
--        } );
++        });
      }
  
  
      @Override
--    public Observable<EntitySet> load( final Collection<Id> entityIds ) {
++    public Observable<EntitySet> load(final Collection<Id> entityIds) {
  
--        Preconditions.checkNotNull( entityIds, "entityIds cannot be null" );
++        Preconditions.checkNotNull(entityIds, "entityIds cannot be null");
  
-         final EntitySet
-                 results = entitySerializationStrategy.load( collectionScope, entityIds, UUIDGenerator.newTimeUUID() );
+         final EntitySet results =
 -                entitySerializationStrategy.load( collectionScope, entityIds, UUIDGenerator.newTimeUUID() );
++                entitySerializationStrategy.load(collectionScope, entityIds, UUIDGenerator.newTimeUUID());
 +
-         return Observable.just( results );
++        return Observable.just(results);
 +    }
  
 -        return Observable.just( results );
 +    @Override
-        public Observable<Id> getIdField(Field field){
-         List<Field> fields = new ArrayList<>(1);
-         fields.add(field);
-         List list = Collections.singletonList(field);
-         rx.Observable.from(list).su
-         return null;
++    public Observable<Id> getIdField(final Field field) {
++        final List<Field> fields = Collections.singletonList(field);
++        rx.Observable<Id> o = rx.Observable.from(fields).map(new Func1<Field, Id>() {
++            @Override
++            public Id call(Field field) {
++                try {
++                    UniqueValueSet set = uniqueValueSerializationStrategy.load(collectionScope, fields);
++                    UniqueValue value = set.getValue(field.getName());
++                    Id id = value == null ? null :value.getEntityId();
++                    return id;
++                } catch (ConnectionException e) {
++                    log.error("Failed to getIdField", e);
++                    throw new RuntimeException(e);
++                }
++            }
++        });
++        return o;
      }
  
  
      @Override
--    public Observable<Entity> update( final Entity entity ) {
++    public Observable<Entity> update(final Entity entity) {
  
--        log.debug( "Starting update process" );
++        log.debug("Starting update process");
  
          //do our input validation
--        Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
++        Preconditions.checkNotNull(entity, "Entity is required in the new stage of the mvcc write");
  
          final Id entityId = entity.getId();
  
  
--        ValidationUtils.verifyIdentity( entityId );
++        ValidationUtils.verifyIdentity(entityId);
  
          // create our observable and start the write
--        CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( collectionScope, entity );
++        CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>(collectionScope, entity);
  
  
-         Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData,writeUpdate );
 -        Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeUpdate );
++        Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner(writeData, writeUpdate);
  
  
--        return observable.map( writeCommit ).doOnNext( new Action1<Entity>() {
++        return observable.map(writeCommit).doOnNext(new Action1<Entity>() {
              @Override
--            public void call( final Entity entity ) {
--                log.debug( "sending entity to the queue" );
++            public void call(final Entity entity) {
++                log.debug("sending entity to the queue");
  
-                //we an update, signal the fix
+                 //we are an update, signal the fix
  
                  //TODO T.N Change this to fire a task
- //                Observable.from( new CollectionIoEvent<Id>(collectionScope, entityId ) ).map( load ).subscribeOn( Schedulers.io() ).subscribe();
+                 //                Observable.from( new CollectionIoEvent<Id>(collectionScope,
+                 // entityId ) ).map( load ).subscribeOn( Schedulers.io() ).subscribe();
  
  
              }
--        } ).doOnError( rollback );
++        }).doOnError(rollback);
      }
  
-     // fire the stages
-     public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData, WriteStart writeState ) {
- 
-         return Observable.from( writeData ).map( writeState ).doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() {
  
-                     @Override
-                     public void call(
-                             final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
+     // fire the stages
 -    public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,
 -                                                                  WriteStart writeState ) {
++    public Observable<CollectionIoEvent<MvccEntity>> stageRunner(CollectionIoEvent<Entity> writeData,
++                                                                 WriteStart writeState) {
  
-                         Observable<CollectionIoEvent<MvccEntity>> unique =
-                                 Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
-                                           .doOnNext( writeVerifyUnique );
 -        return Observable.from( writeData ).map( writeState ).doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() {
++        return Observable.from(writeData).map(writeState).doOnNext(new Action1<CollectionIoEvent<MvccEntity>>() {
  
+             @Override
 -            public void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
++            public void call(final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent) {
  
-                         // optimistic verification
-                         Observable<CollectionIoEvent<MvccEntity>> optimistic =
-                                 Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
-                                           .doOnNext( writeOptimisticVerify );
+                 Observable<CollectionIoEvent<MvccEntity>> unique =
 -                        Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
 -                                  .doOnNext( writeVerifyUnique );
++                        Observable.from(mvccEntityCollectionIoEvent).subscribeOn(Schedulers.io())
++                                .doOnNext(writeVerifyUnique);
  
  
-                         //wait for both to finish
-                         Observable.merge( unique, optimistic ).toBlocking().last();
+                 // optimistic verification
+                 Observable<CollectionIoEvent<MvccEntity>> optimistic =
 -                        Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
 -                                  .doOnNext( writeOptimisticVerify );
++                        Observable.from(mvccEntityCollectionIoEvent).subscribeOn(Schedulers.io())
++                                .doOnNext(writeOptimisticVerify);
  
  
-                     }
-                 } );
+                 //wait for both to finish
 -                Observable.merge( unique, optimistic ).toBlocking().last();
++                Observable.merge(unique, optimistic).toBlocking().last();
+             }
 -        } );
++        });
      }
  
  
      @Override
-     public Observable<UUID> getLatestVersion(Id entityId) {
-         return Observable.from( 
-                 new CollectionIoEvent<Id>( collectionScope, entityId ) ).map(getVersion);
-     }
+     public Observable<VersionSet> getLatestVersion(
 -            Collection<Id> entityIds ) {
++            Collection<Id> entityIds) {
+ 
 -        VersionSet logEntries = mvccLogEntrySerializationStrategy.load( collectionScope, entityIds,
 -                UUIDGenerator.newTimeUUID() );
++        VersionSet logEntries = mvccLogEntrySerializationStrategy.load(collectionScope, entityIds,
++                UUIDGenerator.newTimeUUID());
  
+ 
+         return Observable.just(logEntries);
+     }
  }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0d508f98/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
index a7f9ea2,c01e022..6afad33
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
@@@ -22,6 -23,6 +23,7 @@@ import java.util.Arrays
  import java.util.List;
  import java.util.UUID;
  
++import org.apache.usergrid.persistence.model.field.Field;
  import org.jukito.UseModules;
  import org.junit.Rule;
  import org.junit.Test;
@@@ -277,6 -278,6 +279,36 @@@ public class EntityCollectionManagerIT 
          assertNotNull( collectionScope3 );
      }
  
++    @Test
++    public void writeAndGetField() {
++
++
++        CollectionScope collectionScope1 =
++                new CollectionScopeImpl(new SimpleId("organization"), new SimpleId("test1"), "test1");
++
++        Entity newEntity = new Entity(new SimpleId("test"));
++        Field field = new StringField("testField", "unique", true);
++        newEntity.setField(field);
++
++        EntityCollectionManager manager = factory.createCollectionManager(collectionScope1);
++
++        Observable<Entity> observable = manager.write(newEntity);
++
++        Entity createReturned = observable.toBlocking().lastOrDefault(null);
++
++
++        assertNotNull("Id was assigned", createReturned.getId());
++        assertNotNull("Version was assigned", createReturned.getVersion());
++
++        Id id = manager.getIdField(field).toBlocking().lastOrDefault(null);
++        assertNotNull(id);
++
++        Field fieldNull = new StringField("testFieldNotThere", "uniquely", true);
++        id = manager.getIdField(fieldNull).toBlocking().lastOrDefault(null);
++        assertNull(id);
++
++    }
++
  
      @Test
      public void partialUpdate() {