You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/13 16:21:05 UTC

[17/17] git commit: New RollbackAction and exception handling changes.

New RollbackAction and exception handling changes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8c4d8ddf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8c4d8ddf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8c4d8ddf

Branch: refs/heads/optimistic-tx-semantics
Commit: 8c4d8ddf7a8dc6debcc4db03271550122d24ead4
Parents: f7a5072
Author: Dave Johnson <dm...@apigee.com>
Authored: Thu Mar 13 10:35:25 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Thu Mar 13 10:35:25 2014 -0400

----------------------------------------------------------------------
 .../exception/CollectionRuntimeException.java   |  40 ++++--
 .../exception/WriteCommitException.java         |  22 +--
 .../WriteOptimisticVerifyException.java         |  24 ++--
 .../exception/WriteStartException.java          |  21 +--
 .../exception/WriteUniqueVerifyException.java   |  22 +--
 .../collection/guice/CollectionModule.java      |   3 +
 .../impl/EntityCollectionManagerImpl.java       |  23 +--
 .../mvcc/stage/delete/MarkCommit.java           |   3 +-
 .../collection/mvcc/stage/delete/MarkStart.java |   6 +-
 .../mvcc/stage/write/RollbackAction.java        | 142 +++++++++++++++++++
 .../mvcc/stage/write/WriteCommit.java           |  24 ++--
 .../mvcc/stage/write/WriteOptimisticVerify.java |  92 +-----------
 .../collection/mvcc/stage/write/WriteStart.java |   3 +-
 .../mvcc/stage/write/WriteUniqueVerify.java     |   9 +-
 .../MvccEntitySerializationStrategyImpl.java    |   4 +-
 .../stage/write/WriteOptimisticVerifyTest.java  |   6 +
 .../mvcc/stage/write/WriteUniqueVerifyIT.java   |   2 -
 17 files changed, 274 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/CollectionRuntimeException.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/CollectionRuntimeException.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/CollectionRuntimeException.java
index 468a4cb..d8b4e53 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/CollectionRuntimeException.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/CollectionRuntimeException.java
@@ -17,31 +17,66 @@
  */
 package org.apache.usergrid.persistence.collection.exception;
 
-/**
- * @author tnine
- */
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+
+/**
+ * Base runtime exception for collection operations. Every instance records the entity
+ * (possibly null) and the collection scope of the operation that failed.
+ */
 public class CollectionRuntimeException extends RuntimeException {
-    public CollectionRuntimeException() {
+
+    private final Entity entity;
+    private final CollectionScope collectionScope;
+
+
+    public CollectionRuntimeException( Entity entity, CollectionScope scope ) {
+        this.entity = entity;
+        this.collectionScope = scope;
     }
 
 
-    public CollectionRuntimeException( final String message ) {
+    public CollectionRuntimeException( Entity entity, CollectionScope scope, final String message ) {
         super( message );
+        this.entity = entity;
+        this.collectionScope = scope;
     }
 
 
-    public CollectionRuntimeException( final String message, final Throwable cause ) {
+    public CollectionRuntimeException( Entity entity, CollectionScope scope, final String message, final Throwable cause ) {
         super( message, cause );
+        this.entity = entity;
+        this.collectionScope = scope;
     }
 
 
-    public CollectionRuntimeException( final Throwable cause ) {
+    public CollectionRuntimeException( Entity entity, CollectionScope scope, final Throwable cause ) {
         super( cause );
+        this.entity = entity;
+        this.collectionScope = scope;
     }
 
 
-    public CollectionRuntimeException( final String message, final Throwable cause, final boolean enableSuppression,
-                                       final boolean writableStackTrace ) {
+    public CollectionRuntimeException( Entity entity, CollectionScope scope, 
+            final String message, final Throwable cause, final boolean enableSuppression,
+            final boolean writableStackTrace ) {
         super( message, cause, enableSuppression, writableStackTrace );
+        this.entity = entity;
+        this.collectionScope = scope;
     }
+
+
+    /** Collection scope passed to the constructor of this exception. */
+    public CollectionScope getCollectionScope() {
+        return collectionScope;
+    }
+
+    /**
+     * Entity involved in operation.
+     * @return Entity or null if entity not instantiated yet in operation. 
+     */
+    public Entity getEntity() {
+        return entity;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteCommitException.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteCommitException.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteCommitException.java
index 14c1615..e3f753e 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteCommitException.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteCommitException.java
@@ -17,28 +17,28 @@
  */
 package org.apache.usergrid.persistence.collection.exception;
 
-public class WriteCommitException extends CollectionRuntimeException {
-    public WriteCommitException() {
-    }
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
 
+public class WriteCommitException extends CollectionRuntimeException {
 
-    public WriteCommitException( final String message ) {
-        super( message );
+    public WriteCommitException( Entity entity, CollectionScope scope, final String message ) {
+        super( entity, scope, message );
     }
 
 
-    public WriteCommitException( final String message, final Throwable cause ) {
-        super( message, cause );
+    public WriteCommitException( Entity entity, CollectionScope scope, final String message, final Throwable cause ) {
+        super( entity, scope, message, cause );
     }
 
 
-    public WriteCommitException( final Throwable cause ) {
-        super( cause );
+    public WriteCommitException( Entity entity, CollectionScope scope, final Throwable cause ) {
+        super( entity, scope, cause );
     }
 
 
-    public WriteCommitException( final String message, final Throwable cause, final boolean enableSuppression,
+    public WriteCommitException( Entity entity, CollectionScope scope, final String message, final Throwable cause, final boolean enableSuppression,
                                        final boolean writableStackTrace ) {
-        super( message, cause, enableSuppression, writableStackTrace );
+        super( entity, scope, message, cause, enableSuppression, writableStackTrace );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteOptimisticVerifyException.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteOptimisticVerifyException.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteOptimisticVerifyException.java
index b8b8f4f..7895cb9 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteOptimisticVerifyException.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteOptimisticVerifyException.java
@@ -17,28 +17,30 @@
  */
 package org.apache.usergrid.persistence.collection.exception;
 
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
 public class WriteOptimisticVerifyException extends CollectionRuntimeException {
-    public WriteOptimisticVerifyException() {
-    }
 
 
-    public WriteOptimisticVerifyException( final String message ) {
-        super( message );
+    public WriteOptimisticVerifyException( Entity entity, CollectionScope scope, final String message ) {
+        super( entity, scope, message );
     }
 
 
-    public WriteOptimisticVerifyException( final String message, final Throwable cause ) {
-        super( message, cause );
+    public WriteOptimisticVerifyException( Entity entity, CollectionScope scope, final String message, final Throwable cause ) {
+        super( entity, scope, message, cause );
     }
 
 
-    public WriteOptimisticVerifyException( final Throwable cause ) {
-        super( cause );
+    public WriteOptimisticVerifyException( Entity entity, CollectionScope scope, final Throwable cause ) {
+        super( entity, scope, cause );
     }
 
 
-    public WriteOptimisticVerifyException( final String message, final Throwable cause, final boolean enableSuppression,
-                                       final boolean writableStackTrace ) {
-        super( message, cause, enableSuppression, writableStackTrace );
+    public WriteOptimisticVerifyException( Entity entity, CollectionScope scope, 
+            final String message, final Throwable cause, final boolean enableSuppression,
+            final boolean writableStackTrace ) {
+        super( entity, scope, message, cause, enableSuppression, writableStackTrace );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteStartException.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteStartException.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteStartException.java
index a4a5f9e..f422ebd 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteStartException.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteStartException.java
@@ -17,28 +17,29 @@
  */
 package org.apache.usergrid.persistence.collection.exception;
 
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
 public class WriteStartException extends CollectionRuntimeException {
-    public WriteStartException() {
-    }
 
 
-    public WriteStartException( final String message ) {
-        super( message );
+    public WriteStartException( Entity entity, CollectionScope scope, final String message ) {
+        super( entity, scope, message );
     }
 
 
-    public WriteStartException( final String message, final Throwable cause ) {
-        super( message, cause );
+    public WriteStartException( Entity entity, CollectionScope scope, final String message, final Throwable cause ) {
+        super( entity, scope, message, cause );
     }
 
 
-    public WriteStartException( final Throwable cause ) {
-        super( cause );
+    public WriteStartException( Entity entity, CollectionScope scope, final Throwable cause ) {
+        super( entity, scope, cause );
     }
 
 
-    public WriteStartException( final String message, final Throwable cause, final boolean enableSuppression,
+    public WriteStartException( Entity entity, CollectionScope scope, final String message, final Throwable cause, final boolean enableSuppression,
                                        final boolean writableStackTrace ) {
-        super( message, cause, enableSuppression, writableStackTrace );
+        super( entity, scope, message, cause, enableSuppression, writableStackTrace );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java
index 66b3076..503bf4f 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java
@@ -18,6 +18,8 @@
 package org.apache.usergrid.persistence.collection.exception;
 
 import java.util.Map;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.field.Field;
 
 
@@ -28,25 +30,27 @@ public class WriteUniqueVerifyException extends CollectionRuntimeException {
     private Map<String, Field> violations;
 
     
-    public WriteUniqueVerifyException( Map<String, Field> violations ) {
-        super( "Error: one or more duplicate fields detected");
+    public WriteUniqueVerifyException( Entity entity, CollectionScope scope, Map<String, Field> violations ) {
+        super( entity, scope, "Error: one or more duplicate fields detected");
         this.violations = violations;
     }
 
 
-    public WriteUniqueVerifyException( final String message, final Throwable cause ) {
-        super( message, cause );
+    public WriteUniqueVerifyException( Entity entity, CollectionScope scope, 
+            final String message, final Throwable cause ) {
+        super( entity, scope, message, cause );
     }
 
 
-    public WriteUniqueVerifyException( final Throwable cause ) {
-        super( cause );
+    public WriteUniqueVerifyException( Entity entity, CollectionScope scope, final Throwable cause ) {
+        super( entity, scope, cause );
     }
 
 
-    public WriteUniqueVerifyException( final String message, final Throwable cause, final boolean enableSuppression,
-                                       final boolean writableStackTrace ) {
-        super( message, cause, enableSuppression, writableStackTrace );
+    public WriteUniqueVerifyException( Entity entity, CollectionScope scope, 
+            final String message, final Throwable cause, final boolean enableSuppression,
+            final boolean writableStackTrace ) {
+        super( entity, scope, message, cause, enableSuppression, writableStackTrace );
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 2837255..a954a78 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -28,6 +28,7 @@ import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerSy
 import org.apache.usergrid.persistence.collection.migration.MigrationManagerFig;
 import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategyImpl;
 import org.apache.usergrid.persistence.collection.rx.CassandraThreadScheduler;
@@ -71,5 +72,7 @@ public class CollectionModule extends AbstractModule {
                 .build( EntityCollectionManagerFactory.class ) );
 
         bind( Scheduler.class ).toProvider( CassandraThreadScheduler.class );
+
+        bind( RollbackAction.class );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 2cec4b1..9685114 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -30,6 +30,7 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
 import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
 import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
 import org.apache.usergrid.persistence.collection.mvcc.stage.load.Load;
+import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
@@ -56,7 +57,7 @@ import rx.util.functions.FuncN;
  */
 public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
-    private static final Logger logger = LoggerFactory.getLogger(EntityCollectionManagerImpl.class);
+    private static final Logger log = LoggerFactory.getLogger(EntityCollectionManagerImpl.class);
 
     private final CollectionScope collectionScope;
     private final UUIDService uuidService;
@@ -68,6 +69,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     private final WriteUniqueVerify writeVerifyUnique;
     private final WriteOptimisticVerify writeOptimisticVerify;
     private final WriteCommit writeCommit;
+    private final RollbackAction rollback;
 
     //load stages
     private final Load load;
@@ -86,6 +88,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         final WriteUniqueVerify writeVerifyUnique,
         final WriteOptimisticVerify writeOptimisticVerify,
         final WriteCommit writeCommit, 
+        final RollbackAction rollback,
         final Load load, 
         final MarkStart markStart,
         final MarkCommit markCommit,
@@ -98,6 +101,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         this.writeVerifyUnique = writeVerifyUnique;
         this.writeOptimisticVerify = writeOptimisticVerify;
         this.writeCommit = writeCommit;
+        this.rollback = rollback;
         this.load = load;
         this.markStart = markStart;
         this.markCommit = markCommit;
@@ -141,7 +145,10 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( collectionScope, entity );
 
         Observable<CollectionIoEvent<MvccEntity>> observable =
-            Observable.from( writeData ).subscribeOn( scheduler ).map( writeStart ).flatMap(
+            Observable.from( writeData )
+                    .subscribeOn( scheduler )
+                    .map( writeStart )
+                    .flatMap(
                 new Func1<CollectionIoEvent<MvccEntity>, Observable<CollectionIoEvent<MvccEntity>>>() {
 
                     @Override
@@ -156,7 +163,6 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                         // WriteVerifyUnique stage to execute multiple verification steps in 
                         // parallel and zip the results
 
-
                         Observable<CollectionIoEvent<MvccEntity>> unique =
                             Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( scheduler )
                                 .flatMap( writeVerifyUnique);
@@ -167,13 +173,13 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                             Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( scheduler )
                                 .map( writeOptimisticVerify );
 
-
                         // zip the results
                         // TODO: Should the zip only return errors here, and if errors are present, 
                         // we throw during the zip phase?  I couldn't find "
 
                         return Observable.zip( unique, optimistic, new Func2<CollectionIoEvent<MvccEntity>,
                                 CollectionIoEvent<MvccEntity>, CollectionIoEvent<MvccEntity>>() {
+
                             @Override
                             public CollectionIoEvent<MvccEntity> call(
                                 final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent,
@@ -181,18 +187,17 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
                             return mvccEntityCollectionIoEvent;
                            }
-                       } );
+                        });
                     }
-                } );
-
+                });
 
         // execute all validation stages concurrently.  Needs refactored when this is done.  
         // https://github.com/Netflix/RxJava/issues/627
         // observable = Concurrent.concurrent( observable, scheduler, new WaitZip(), 
         //                  writeVerifyUnique, writeOptimisticVerify );
 
-        //return the commit result.
-        return observable.map( writeCommit );
+        // return the commit result.
+        return observable.map(writeCommit); // TODO: .doOnError( rollback );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
index 9c1a432..e5e1eef 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
@@ -101,7 +101,8 @@ public class MarkCommit implements Func1<CollectionIoEvent<MvccEntity>, Void> {
         }
         catch ( ConnectionException e ) {
             LOG.error( "Failed to execute write asynchronously ", e );
-            throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
+            throw new CollectionRuntimeException( entity.getEntity().get(), collectionScope, 
+                    "Failed to execute write asynchronously ", e );
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java
index 4b0fae5..fe61469 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java
@@ -99,12 +99,14 @@ public class MarkStart implements Func1<CollectionIoEvent<Id>, CollectionIoEvent
         }
         catch ( ConnectionException e ) {
             LOG.error( "Failed to execute write asynchronously ", e );
-            throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
+            throw new CollectionRuntimeException( null, collectionScope, 
+                    "Failed to execute write asynchronously ", e );
         }
 
 
         //create the mvcc entity for the next stage
-        final MvccEntityImpl nextStage = new MvccEntityImpl(entityId, version, MvccEntity.Status.COMPLETE, Optional.<Entity>absent() );
+        final MvccEntityImpl nextStage = new MvccEntityImpl(
+            entityId, version, MvccEntity.Status.COMPLETE, Optional.<Entity>absent() );
 
 
         return new CollectionIoEvent<MvccEntity>( collectionScope, nextStage );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
new file mode 100644
index 0000000..3761741
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.collection.mvcc.stage.write;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
+import org.apache.usergrid.persistence.collection.guice.CollectionModule;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.field.Field;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Scheduler;
+import rx.util.functions.Action1;
+import rx.util.functions.Func1;
+import rx.util.functions.FuncN;
+
+
+/**
+ * Reverses any changes made on behalf of the specified entity. When an exception is thrown
+ * during a write operation this action is called to roll back any changes made.
+ */
+public class RollbackAction implements Action1<Throwable> {
+
+    private static final Logger log = LoggerFactory.getLogger(RollbackAction.class);
+
+    private final Scheduler scheduler;
+    private final UniqueValueSerializationStrategy uniqueValueStrat;
+
+    // NOTE(review): each instance builds its own injector; constructor injection may fit better.
+    public RollbackAction() {
+        Injector injector = Guice.createInjector( new CollectionModule() );
+        scheduler = injector.getInstance( Scheduler.class );
+        uniqueValueStrat = injector.getInstance( UniqueValueSerializationStrategy.class );
+    }
+
+    /**
+     * Deletes, concurrently, the unique values of the entity carried by the given error and
+     * blocks until the deletes finish. Errors that are not a CollectionRuntimeException, or
+     * that carry no entity, are ignored.
+     *
+     * @param t the error that aborted the write
+     */
+    @Override
+    public void call( final Throwable t ) {
+        if ( t instanceof CollectionRuntimeException ) {
+            CollectionRuntimeException cre = (CollectionRuntimeException)t;
+            final Entity entity = cre.getEntity();
+            final CollectionScope scope = cre.getCollectionScope();
+
+            // getEntity() may return null (entity not instantiated yet); without an entity
+            // there are no unique values to look up, and getFields() below would throw.
+            if ( entity == null ) {
+                return;
+            }
+
+            // Delete all unique values of entity, and do it concurrently
+            List<Observable<FieldDeleteResult>> results
+                = new ArrayList<Observable<FieldDeleteResult>>();
+
+            for (final Field field : entity.getFields()) {
+                // if it's unique, create a function to delete it
+                if (field.isUnique()) {
+                    Observable<FieldDeleteResult> result = Observable.from(field)
+                            .subscribeOn(scheduler).map(new Func1<Field, FieldDeleteResult>() {
+                                @Override
+                                public FieldDeleteResult call(Field field) {
+                                    UniqueValue toDelete = new UniqueValueImpl(
+                                        scope, field, entity.getId(), entity.getVersion());
+
+                                    MutationBatch mb = uniqueValueStrat.delete(toDelete);
+                                    try {
+                                        mb.execute();
+                                    } catch (ConnectionException ex) {
+                                        throw new WriteUniqueVerifyException(
+                                            entity, scope, "Rollback error deleting unique value " + field.toString(), ex);
+                                    }
+                                    return new FieldDeleteResult(field.getName());
+                                }
+                            });
+                    results.add(result);
+                }
+            }
+
+            if (!results.isEmpty()) {
+                final FuncN<Boolean> zipFunction = new FuncN<Boolean>() {
+                    @Override
+                    public Boolean call(final Object... args) {
+                        for (Object resultObj : args) {
+                            FieldDeleteResult result = (FieldDeleteResult) resultObj;
+                            log.debug("Rollback deleted unique value from entity: {} version: {} name: {}",
+                                 new String[]{
+                                     entity.getId().toString(),
+                                     entity.getVersion().toString(),
+                                     result.getName()
+                                 });
+                        }
+                        return true;
+                    }
+                };
+
+                // "zip" up the concurrent results
+                Observable.zip(results, zipFunction).toBlockingObservable().last();
+            }
+        }
+    }
+
+    /** Carries the name of a unique field whose value was deleted. */
+    static class FieldDeleteResult {
+        private final String name;
+
+        public FieldDeleteResult(String name) {
+            this.name = name;
+        }
+
+        public String getName() {
+            return this.name;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index 32bc540..0f34d37 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -80,32 +80,27 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Entity>
     @Override
     public Entity call( final CollectionIoEvent<MvccEntity> ioEvent ) {
 
-        final MvccEntity entity = ioEvent.getEvent();
-
-
-        ValidationUtils.verifyMvccEntityWithEntity( entity );
-
-        final Id entityId = entity.getId();
-        final UUID version = entity.getVersion();
+        final MvccEntity mvccEntity = ioEvent.getEvent();
+        ValidationUtils.verifyMvccEntityWithEntity( mvccEntity );
 
+        final Id entityId = mvccEntity.getId();
+        final UUID version = mvccEntity.getVersion();
         final CollectionScope collectionScope = ioEvent.getEntityCollection();
 
-
         final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, Stage.COMMITTED );
-
         MutationBatch logMutation = logEntryStrat.write( collectionScope, startEntry );
 
         // now get our actual insert into the entity data
-        MutationBatch entityMutation = entityStrat.write( collectionScope, entity );
+        MutationBatch entityMutation = entityStrat.write( collectionScope, mvccEntity );
 
         // merge the 2 into 1 mutation
         logMutation.mergeShallow( entityMutation );
 
         // re-write the unique values but this time with no TTL
-        for ( Field field : entity.getEntity().get().getFields() ) {
+        for ( Field field : mvccEntity.getEntity().get().getFields() ) {
             if ( field.isUnique() ) {
                 UniqueValue written  = new UniqueValueImpl( 
-                        ioEvent.getEntityCollection(), field, entity.getId(), entity.getVersion());
+                        ioEvent.getEntityCollection(), field, mvccEntity.getId(), mvccEntity.getVersion());
                 MutationBatch mb = uniqueValueStrat.write( written );
 
                 // merge into our existing mutation batch
@@ -119,9 +114,10 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Entity>
         }
         catch ( ConnectionException e ) {
             LOG.error( "Failed to execute write asynchronously ", e );
-            throw new WriteCommitException( "Failed to execute write asynchronously ", e );
+            throw new WriteCommitException( mvccEntity.getEntity().get(), collectionScope, 
+                "Failed to execute write asynchronously ", e );
         }
 
-        return entity.getEntity().get();
+        return mvccEntity.getEntity().get();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
index 9b04bd1..3bc096d 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
@@ -20,29 +20,22 @@ package org.apache.usergrid.persistence.collection.mvcc.stage.write;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import java.util.ArrayList;
 import java.util.List;
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
 import org.apache.usergrid.persistence.collection.exception.WriteOptimisticVerifyException;
-import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
 import org.apache.usergrid.persistence.collection.mvcc.entity.ValidationUtils;
-import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
 import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.field.Field;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import rx.Observable;
 import rx.Scheduler;
 import rx.util.functions.Func1;
-import rx.util.functions.FuncN;
 
 
 /**
@@ -96,96 +89,19 @@ public class WriteOptimisticVerify
                 log.debug("Conflict writing entity id {} version {}", 
                     entity.getId().toString(), entity.getVersion().toString());
             
-                // We're not the first writer, cleanup and throw exception
-
                 logEntryStrat.delete( collectionScope, entity.getId(), entity.getVersion() );
-
-                // Delete all unique values of entity, and do it concurrently 
-
-                List<Observable<FieldDeleteResult>> results = 
-                    new ArrayList<Observable<FieldDeleteResult>>();
-
-                int uniqueFieldCount = 0;
-                for ( final Field field : entity.getFields() ) {
-
-                    // if it's unique, create a function to delete it
-                    if ( field.isUnique() ) {
-
-                        uniqueFieldCount++;
-
-                        Observable<FieldDeleteResult> result =  Observable.from( field )
-                            .subscribeOn( scheduler ).map(new Func1<Field,  FieldDeleteResult>() {
-
-                            @Override
-                            public FieldDeleteResult call(Field field ) {
-
-                                UniqueValue toDelete = new UniqueValueImpl( 
-                                    ioevent.getEntityCollection(), field, 
-                                    entity.getId(), entity.getVersion() );
-
-                                MutationBatch mb = uniqueValueStrat.delete( toDelete );
-                                try {
-                                    mb.execute();
-                                }
-                                catch ( ConnectionException ex ) {
-                                    throw new WriteUniqueVerifyException( 
-                                        "Error deleting unique value " + field.toString(), ex );
-                                }
-                                return new FieldDeleteResult( field.getName() );
-                            }
-                        });
-
-                        results.add( result ); 
-                    }
-                }
-
-                if ( uniqueFieldCount > 0 ) {
-
-                    final FuncN<Boolean> zipFunction = new FuncN<Boolean>() {
-
-                        @Override
-                        public Boolean call( final Object... args ) {
-                            for ( Object resultObj : args ) {
-
-                                FieldDeleteResult result = ( FieldDeleteResult ) resultObj;
-                                log.debug("Rollback deleted field from entity: {} version: {} name: {}",
-                                    new String[] { 
-                                        entity.getId().toString(), 
-                                        entity.getVersion().toString(),
-                                        result.getName()
-                                    });
-                            }
-                            return true;
-                        }
-                    };
-   
-                    // "zip" up the concurrent results
-                    Observable.zip( results, zipFunction ).toBlockingObservable().last();
-                }
-
-                throw new WriteOptimisticVerifyException("Change conflict, not first writer");
+                throw new WriteOptimisticVerifyException( entity, collectionScope, 
+                        "Change conflict, not first writer");
             }
 
-
         } catch ( ConnectionException e ) {
             log.error( "Error reading entity log", e );
-            throw new CollectionRuntimeException( "Error reading entity log", e );
+            throw new CollectionRuntimeException( entity, collectionScope, 
+                    "Error reading entity log", e );
         }
 
         // No op, just emit the value
         return ioevent;
     }
 
-
-    class FieldDeleteResult {
-        private final String name;
-
-        public FieldDeleteResult( String name ) {
-            this.name = name;
-        }
-
-        public String getName() {
-            return this.name;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
index c603f57..93b2da2 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
@@ -73,7 +73,8 @@ public class WriteStart implements Func1<CollectionIoEvent<Entity>, CollectionIo
             }
             catch ( ConnectionException e ) {
                 LOG.error( "Failed to execute write ", e );
-                throw new WriteStartException( "Failed to execute write ", e );
+                throw new WriteStartException( entity, collectionScope, 
+                        "Failed to execute write ", e );
             }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index ca99c0c..26af8cd 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -122,7 +122,8 @@ public class WriteUniqueVerify implements
                         }
                         catch ( ConnectionException ex ) {
                             throw new WriteUniqueVerifyException( 
-                                "Error writing unique value " + field.toString(), ex );
+                                entity, ioevent.getEntityCollection(), 
+                                    "Error writing unique value " + field.toString(), ex );
                         }
 
                         // does the database value match what we wrote?
@@ -131,7 +132,8 @@ public class WriteUniqueVerify implements
                             loaded = uniqueValueStrat.load( ioevent.getEntityCollection(), field );
                         }
                         catch ( ConnectionException ex ) {
-                            throw new WriteUniqueVerifyException( "Error verifying write", ex );
+                            throw new WriteUniqueVerifyException( entity, ioevent.getEntityCollection(), 
+                                    "Error verifying write", ex );
                         }
 
                         return new FieldUniquenessResult( field, loaded.equals( written ) );
@@ -167,7 +169,8 @@ public class WriteUniqueVerify implements
                 }
 
                 if ( !uniquenessVioloations.isEmpty() ) {
-                    throw new WriteUniqueVerifyException( uniquenessVioloations );
+                    throw new WriteUniqueVerifyException( 
+                        entity, ioevent.getEntityCollection(), uniquenessVioloations );
                 }
                     
                 //return the original event

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
index ebb6edb..2af030f 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
@@ -128,7 +128,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat
             return null;
         }
         catch ( ConnectionException e ) {
-            throw new CollectionRuntimeException( "An error occurred connecting to cassandra", e );
+            throw new CollectionRuntimeException( null, collectionScope, "An error occurred connecting to cassandra", e );
         }
 
 
@@ -153,7 +153,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat
                             .withColumnRange( version, null, false, maxSize ).execute().getResult();
         }
         catch ( ConnectionException e ) {
-            throw new CollectionRuntimeException( "An error occurred connecting to cassandra", e );
+            throw new CollectionRuntimeException( null, collectionScope, "An error occurred connecting to cassandra", e );
         }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java
index e9b2c7f..b396909 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.exception.WriteOptimisticVerifyException;
 import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
+import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerImpl;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
@@ -46,12 +47,16 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import rx.Scheduler;
 
 
 @UseModules( TestCollectionModule.class )
 public class WriteOptimisticVerifyTest extends AbstractMvccEntityStageTest {
 
+    private static final Logger log = LoggerFactory.getLogger(WriteOptimisticVerifyTest.class);
+
     @Override
     protected void validateStage( final CollectionIoEvent<MvccEntity> event ) {
 
@@ -168,6 +173,7 @@ public class WriteOptimisticVerifyTest extends AbstractMvccEntityStageTest {
             newStage.call( new CollectionIoEvent<MvccEntity>(scope, mvccEntity));
 
         } catch (WriteOptimisticVerifyException e) {
+            log.info("Error", e);
             conflictDetected = true;
         }
         assertTrue( conflictDetected );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8c4d8ddf/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java
index 3deb235..0793ac0 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java
@@ -89,9 +89,7 @@ public class WriteUniqueVerifyIT {
             // verify two unique value violations
             assertEquals( 2, e.getVioliations().size() );
         }
-
     }
-
 }